7.3 Hash Based Shuffle

本节讲解Hash Based Shuffle,包括Hash Based Shuffle概述、Hash Based Shuffle内核、Hash Based Shuffle的数据读写的源码解析等内容。

7.3.1 概述

在Spark 1.1之前,Spark中只实现了一种Shuffle方式,即基于Hash的Shuffle。在Spark 1.1版本中引入了基于Sort的Shuffle实现方式,并且在Spark 1.2版本之后,默认的实现方式从基于Hash的Shuffle,修改为基于Sort的Shuffle实现方式,即使用的ShuffleManager从默认的hash修改为sort。说明在Spark 2.0版本中,Hash的Shuffle方式已经不再使用。

Spark之所以一开始就提供基于Hash的Shuffle实现机制,其主要目的之一就是为了避免不需要的排序(这也是Hadoop Map Reduce被人诟病的地方,将Sort作为固定步骤,导致许多不必要的开销)。但基于Hash的Shuffle实现机制在处理超大规模数据集的时候,由于过程中会产生大量的文件,导致过度的磁盘I/O开销和内存开销,会极大地影响性能。

但在一些特定的应用场景下,采用基于Hash的实现Shuffle机制的性能会超过基于Sort的Shuffle实现机制。关于基于Hash与基于Sort的Shuffle实现机制的性能测试方面,可以参考Spark创始人之一的ReynoldXin给的测试:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing”。

相关数据可以参考https://issues.apache.org/jira/browse/SPARK-3280。

因此,在Spark 1.2版本中修改为默认基于Sort的Shuffle实现机制时,同时也给出了特定应用场景下回退的机制。

7.3.2 Hash Based Shuffle内核

1.基于Hash的Shuffle实现机制的内核框架

基于Hash的Shuffle实现,ShuffleManager的具体实现子类为HashShuffleManager,对应的具体实现机制如图7-3所示。

图7-3 基于哈希算法的Shuffle实现机制的内核框架

其中,HashShuffleManager是ShuffleManager的基于哈希算法实现方式的具体实现子类。数据块的读写分别由BlockStoreShuffleReader与HashShuffleWriter实现;数据块的文件解析器则由具体子类FileShuffleBlockResolver实现;BaseShuffleHandle是ShuffleHandle接口的基本实现,保存Shuffle注册的信息。

HashShuffleManager继承自ShuffleManager,对应实现了各个抽象接口。基于Hash的Shuffle,内部使用的各组件的具体子类如下所示。

(1)BaseShuffleHandle:携带了Shuffle最基本的元数据信息,包括shuffleId、numMaps和dependency。

(2)BlockStoreShuffleReader:负责写入的Shuffle数据块的读操作。

(3)FileShuffleBlockResolver:负责管理,为Shuffle任务分配基于磁盘的块数据的Writer。每个ShuffleShuffle任务为每个Reduce分配一个文件。

(4)HashShuffleWriter:负责Shuffle数据块的写操作。

在此与解析整个Shuffle过程一样,以HashShuffleManager类作为入口进行解析。

首先看一下HashShuffleManager具体子类的注释,如下所示。

Spark 1.6.0版本的HashShuffleManager.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。

1.  /**
      *使用Hash的ShuffleManager具体实现子类,针对每个Mapper都会为各个Reduce分
      *区构建一个输出文件(也可能是多个任务复用文件)
2.    */
3.  private[spark]      class    HashShuffleManager(conf:         SparkConf)   extends
    ShuffleManager with Logging {
4.  ......
2.基于Hash的Shuffle实现方式一

为了避免Hadoop中基于Sort方式的Shuffle所带来的不必要的排序开销,Spark在开始时采用了基于Hash的Shuffle方式。但这种方式存在不少缺陷,这些缺陷大部分是由于在基于Hash的Shuffle实现过程中创建了太多的文件所造成的。在这种方式下,每个Mapper端的Task运行时都会为每个Reduce端的Task生成一个文件,具体如图7-4所示。

图7-4 基于Hash的Shuffle实现方式——文件的输出细节图

Executor-Mapper表示执行Mapper端的Tasks的工作点,可以分布到集群中的多台机器节点上,并且可以以不同的形式出现,如以Spark Standalone部署模式中的Executor出现,也可以以Spark On Yarn部署模式中的容器形式出现,关键是它代表了实际执行Mapper端的Tasks的工作点的抽象概念。其中,M表示Mapper端的Task的个数,R表示Reduce端的Task的个数。

对应在右侧的本地文件系统是在该工作点上所生成的文件,其中R表示Reduce端的分区个数。生成的文件名格式为:shuffle_shuffleId_mapId_reduceId,其中的shuffle_shuffleId_1_1表示mapId为1,同时reduceId也为1。

在Mapper端,每个分区对应启动一个Task,而每个Task会为每个Reducer端的Task生成一个文件,因此最终生成的文件个数为M×R。

由于这种实现方式下,对应生成文件个数仅与Mapper端和Reducer端各自的分区数有关,因此图中将Mapper端的全部M个Task抽象到一个Executor-Mapper中,实际场景中通常是分布到集群中的各个工作点中。

生成的各个文件位于本地文件系统的指定目录中,该目录地址由配置属性spark.local.dir设置。说明:分区数与Task数,一个是静态的数据分块个数,一个是数据分块对应执行的动态任务个数,因此,在特定的、描述个数的场景下,两者是一样的。

3.基于Hash的Shuffle实现方式二

为了减少Hash所生成的文件个数,对基于Hash的Shuffle实现方式进行了优化,引入文件合并的机制,该机制设置的开关为配置属性spark.shuffle.consolidateFiles。在引入文件合并的机制后,当设置配置属性为true,即启动文件合并时,在Mapper端的输出文件会进行合并,在一定程度上可以大量减少文件的生成,降低不必要的开销。文件合并的实现方式可以参考图7-5。

图7-5 基于Hash的Shuffle的合并文件机制的输出细节图

Executor-Mapper表示集群中分配的某个工作点,其中,C表示在该工作点上所分配到的内核(Core)个数,T表示在该工作点上为每个Task分配的内核个数。C/T表示在该工作点上调度时最大的Task并行个数。

右侧的本地文件系统是在该工作点上所生成的文件,其中R表示Reduce端的分区个数。生成的文件名格式为:merged_shuffle_shuffleId_bucketId_fileId,其中的merged_shuffle_ shuffleId_1_1表示bucketId为1,同时fileId也为1。

在Mapper端,Task会复用文件组,由于最大并行个数为C/T,因此文件组最多分配C/T个,当某个Task运行结束后,会释放该文件组,之后调度的Task则复用前一个Task所释放的文件组,因此会复用同一个文件。最终在该工作点上生成的文件总数为C/T*R,如果设工作点个数为E,则总的文件数为E*C/T*R。

4.基于Hash的Shuffle机制的优缺点

1)优点

 可以省略不必要的排序开销。

 避免了排序所需的内存开销。

2)缺点

 生成的文件过多,会对文件系统造成压力。

 大量小文件的随机读写会带来一定的磁盘开销。

 数据块写入时所需的缓存空间也会随之增加,会对内存造成压力。

7.3.3 Hash Based Shuffle数据读写的源码解析

1.基于Hash的Shuffle实现方式一的源码解析

下面针对Spark 1.6版本中的基于Hash的Shuffle实现在数据写方面进行源码解析(Spark2.0版本中已无Hash的Shuffle实现方式)。在基于Hash的Shuffle实现机制中,采用HashShuffleWriter作为数据写入器。在HashShuffleWriter中控制Shuffle写数据的关键代码如下所示。

Spark 1.6.0版本的HashShuffleWriter.scala的源码(Spark 2.2版本已无HashShuffle-Manager方式)如下。

1.  private[spark] class HashShuffleWriter[K, V](
2.      shuffleBlockResolver: FileShuffleBlockResolver,
3.      handle: BaseShuffleHandle[K, V, _],
4.  mapId: Int,
5.      context: TaskContext)
6.    extends ShuffleWriter[K, V] with Logging {
7.
8.    //控制每个Writer输出时的切片个数,对应分区个数
9.    private val dep = handle.dependency
10.   private val numOutputSplits = dep.partitioner.numPartitions
11.
12.   ......
13.   //获取数据读写的块管理器
14.   private val blockManager = SparkEnv.get.blockManager
15.   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
16.
17. //从  FileShuffleBlockResolver      的 forMapTask方法中获取指定的    shuffleId  对应
    //的mapId
18. //对应分区个数构建的数据块写的ShuffleWriterGroup实例
19.   private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId,
      mapId, numOutputSplits, ser, writeMetrics)
20.
21.
22.   /** Task输出时一组记录的写入 */
23.
24.   override def write(records: Iterator[Product2[K, V]]): Unit = {
25. //判断在写时是否需要先聚合,即定义了Map端Combine时,先对数据进行聚合再写入,否则
    //直接返回需要写入的一批记录
26.
27.   val iter = if (dep.aggregator.isDefined) {
28.       if (dep.mapSideCombine) {
29.         dep.aggregator.get.combineValuesByKey(records, context)
30.       } else {
31.         records
32.       }
33.     } else {
34.       require(!dep.mapSideCombine, "Map-side combine without Aggregator
          specified!")
35.       records
36.     }
37.
38.     //根据分区器,获取每条记录对应的bucketId(即所在Reduce序号),根据bucketId
        //从FileShuffleBlockResolver构建的ShuffleWriterGroup中,获取DiskBlock-
        //ObjectWriter实例,对应磁盘数据块的数据写入器
39.     for (elem<- iter) {
40.       val bucketId = dep.partitioner.getPartition(elem._1)
41.       shuffle.writers(bucketId).write(elem._1, elem._2)
42.     }
43.   }
44.      ......
45. }

当需要在Map端进行聚合时,使用的是聚合器(Aggregator)的combineValuesByKey方法,在该方法中使用ExternalAppendOnlyMap类对记录集进行处理,处理时如果内存不足,会引发Spill操作。早期的实现会直接缓存到内存,在数据量比较大时容易引发内存泄漏。

在HashShuffleManager中,ShuffleBlockResolver特质使用的具体子类为FileShuffleBlock-Resolver,即指定了具体如何从一个逻辑Shuffle块标识信息来获取一个块数据,对应为下面第7行调用的forMapTask方法,具体代码如下所示:

Spark 1.6.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。

1.  /**
2.    *针对给定的 Map Task,指定一个ShuffleWriterGroup实例,在数据块写入器成功
3.    *关闭时,会注册为完成状态
4.    */
5.
6.
7.   def forMapTask(shuffleId: Int, mapId: Int, numReduces: Int, serializer:
     Serializer,
8.  writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
9.      new ShuffleWriterGroup {
10.       //在FileShuffleBlockResolver中维护着当前Map Task对应shuffleId标识的
          //Shuffle中,指定numReduces个数的Reduce的各个状态
11.
12. shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReduces))
13.       private val shuffleState = shuffleStates(shuffleId)
14.
15. ......
16.           //根据Reduce端的任务个数,构建元素类型为DiskBlockObjectWriter的数组,
              //DiskBlockObjectWriter负责具体数据的磁盘写入
17.    //原则上,Shuffle的输出可以存放在各种提供存储机制的系统上,但为了容错性等方面的
       //考虑,目前的Shuffle实行机制都会写入到磁盘中
18.
19.  val writers: Array[DiskBlockObjectWriter] = {
20.         //这里的逻辑Bucket的Id值即对应的Reduce的任务序号,或者说分区ID
21.         Array.tabulate[DiskBlockObjectWriter](numReduces) { bucketId =>
22.         //针对每个Map端分区的Id与Bucket的Id构建数据块的逻辑标识
23.           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
24.           val blockFile = blockManager.diskBlockManager.getFile(blockId)
25.           val tmp = Utils.tempFileWith(blockFile)
26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize,
    writeMetrics)
27.         }
28.       }
29. ......
30.       //任务完成时回调的释放写入器方法
31.       override def releaseWriters(success: Boolean) {
32. shuffleState.completedMapTasks.add(mapId)
33.       }
34.     }
35.   }

其中,ShuffleBlockId实例构建的源码如下所示。

1.  case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
    extends BlockId {
2.    override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_"
      + reduceId
3.  }

从name方法的重载上可以看出,后续构建的文件与代码中的mapId、reduceId的关系。当然,所有同一个Shuffle的输出数据块,都会带上shuffleId这个唯一标识的,因此全局角度上,逻辑数据块name不会重复(针对一些推测机制或失败重试机制之类的场景而已,逻辑name没有带上时间信息,因此缺少多次执行的输出区别,但在管理这些信息时会维护一个时间作为有效性判断)。

2.基于Hash的Shuffle实现方式二的源码解析

下面通过详细解析FileShuffleBlockResolver源码来加深对文件合并机制的理解。

由于在Spark 1.6中,文件合并机制已经删除,因此下面基于Spark 1.5版本的代码对文件合并机制的具体实现细节进行解析。以下代码位于FileShuffleBlockResolver类中。

合并机制的关键控制代码如下所示。

Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。

1.  /**
2.    *获取一个针对特定Map Task的ShuffleWriterGroup
3.    */
4.
5.
6.    def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer:
      Serializer,
7.  writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
8.      new ShuffleWriterGroup {
9.  ......
10.       val writers: Array[DiskBlockObjectWriter] = if
         (consolidateShuffleFiles) {
11. //获取未使用的文件组
12. fileGroup = getUnusedFileGroup()
13.         Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
14. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
15. //注意获取磁盘写入器时,传入的第二个参数与未使用文件合并机制时的差异
16. //fileGroup(bucketId):构造器方式调用,对应apply的方法调用
17.blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance,
    bufferSize,
18. writeMetrics)
19.         }
20.       } else {
21.         Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
22. val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
23. //根据ShuffleBlockId信息获取文件名
24.           val blockFile = blockManager.diskBlockManager.getFile(blockId)
25.           val tmp = Utils.tempFileWith(blockFile)
26.blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize,
    writeMetrics)
27.         }
28.       }
29. ......
30. writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
31.       override def releaseWriters(success: Boolean) {
32. //带文件合并机制时,写入器在释放后的处理
33. //3个关键信息mapId、offsets、lengths
34.         if (consolidateShuffleFiles) {
35.           if (success) {
36.             val offsets = writers.map(_.fileSegment().offset)
37.             val lengths = writers.map(_.fileSegment().length)
38. fileGroup.recordMapOutput(mapId, offsets, lengths)
39.           }
40. //回收文件组,便于后续复用
41. recycleFileGroup(fileGroup)
42.         } else {
43. shuffleState.completedMapTasks.add(mapId)
44.         }
45.       }

其中,第10行中的consolidateShuffleFiles变量,是判断是否设置了文件合并机制,当设置consolidateShuffleFiles为true后,会继续调用getUnusedFileGroup方法,在该方法中会获取未使用的文件组,即重新分配或已经释放可以复用的文件组。

获取未使用的文件组(ShuffleFileGroup)的相关代码getUnusedFileGroup如下所示。

Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。

1.  private def getUnusedFileGroup(): ShuffleFileGroup = {
2.  //获取已经构建但未使用的文件组,如果获取失败,则重新构建一个文件组
3.          val fileGroup = shuffleState.unusedFileGroups.poll()
4.          if (fileGroup != null) fileGroup else newFileGroup()
5.        }
6.  //重新构建一个文件组的源码
7.        private def newFileGroup(): ShuffleFileGroup = {
8.  //构建后会对文件编号进行递增,该文件编号最终用在生成的文件名中
9.          val fileId = shuffleState.nextFileId.getAndIncrement()
10.         val files = Array.tabulate[File](numBuckets) { bucketId =>
11. //最终的文件名,可以通过文件名的组成及取值细节,加深对实现细节在文件个数上的差异的理解
12.
13.           val filename = physicalFileName(shuffleId, bucketId, fileId)
14. blockManager.diskBlockManager.getFile(filename)
15.         }
16. //构建并添加到shuffleState中,便于后续复用
17.         val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
18. shuffleState.allFileGroups.add(fileGroup)
19. fileGroup
20.       }

其中,第13行代码对应生成的文件名,即物理文件名,相关代码如下所示。

Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.2版本已无HashShuffleManager方式)如下。

1.  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
2.      "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
3.    }

可以看到,与未使用文件合并时的基于Hash的Shuffle实现方式不同的是,在生成的文件名中没有对应的mapId,取而代之的是与文件组相关的fileId,而fileId则是多个Mapper端的Task所共用的,在此仅从生成的物理文件名中也可以看出文件合并的某些实现细节。

另外,对应生成的文件组既然是复用的,当一个Mapper端的Task执行结束后,便会释放该文件组(ShuffleFileGroup),之后继续调度时便会复用该文件组。对应地,调度到某个Executor工作点上同时运行的Task最大个数,就对应了最多分配的文件组个数。

而在TaskSchedulerImpl调度Task时,各个Executor工作点上Task调度控制的源码说明了在各个Executor工作点上调度并行的Task数,具体代码如下所示。

1.  private def resourceOfferSingleTaskSet(
2.  taskSet: TaskSetManager,
3.  maxLocality: TaskLocality,
4.  shuffledOffers: Seq[WorkerOffer],
5.  availableCpus: Array[Int],
6.      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
7.  var launchedTask = false
8.    for (i <- 0 until shuffledOffers.size) {
9.  val execId = shuffledOffers(i).executorId
10. val host = shuffledOffers(i).host
11. //判断当前Executor工作点上可用的内核个数是否满足Task所需的内核个数
12. //CPUS_PER_TASK:表示设置的每个Task所需的内核个数
13. if (availableCpus(i) >= CPUS_PER_TASK) {
14. try {
15. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
16. ......
17. launchedTask = true
18. }
19.      } catch {
20. ......
21.       }
22.     }
23.   }
24. return launchedTask
25. }

其中,设置每个Task所需的内核个数的配置属性如下所示:

1.  //每个任务请求的CPU个数
2.  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

对于这些会影响Executor中并行执行的任务数的配置信息,设置时需要多方面考虑,包括内核个数与任务个数的合适比例,在内存模型中,为任务分配内存的具体策略等。任务分配内存的具体策略可以参考Spark官方给出的具体设计文档,以及文档中各种设计方式的权衡等内容。