7.4 Sorted Based Shuffle

在历史的发展中,为什么Spark最终还是放弃了HashShuffle,使用了Sorted-Based Shuffle,而且作为后起之秀的Tungsten-based Shuffle到底是在什么样的背景下产生的。Tungsten-Sort Shuffle已经并入了Sorted-Based Shuffle,Spark的引擎会自动识别程序需要的是Sorted-Based Shuffle,还是Tungsten-Sort Shuffle,Spark会检查相对的应用程序有没有Aggregrate的操作。Sorted-Based Shuffle也有缺点,其缺点反而是它排序的特性,它强制要求数据在Mapper端必须先进行排序(注意,这里没有说对计算结果进行排序),所以导致它排序的速度有点慢。而Tungsten-Sort Shuffle对它的排序算法进行了改进,优化了排序的速度。

Spark会根据宽依赖把它一系列的算子划分成不同的Stage,Stage的内部会进行Pipeline、Stage与Stage之间进行Shuffle。Shuffle的过程包含三部分,如图7-6所示。

图7-6 Shuffle的过程示意图

第一部分是Shuffle的Writer;第二部分是网络传输;第三部分是Shuffle的Read,这三大部分设置了内存操作、磁盘I/O、网络I/O以及JVM的管理。而这些东西是影响了Spark应用程序95%以上效率的唯一原因。假设程序代码本身非常好,性能的95%都消耗在Shuffle阶段的本地写磁盘文件、网络传输数据以及抓取数据这样的生命周期中,如图7-7所示。

图7-7 Shuffle示意图

在Shuffle写数据的时候,内存中有一个缓存区叫Buffer,可以将其想像成一个Map,同时在本地磁盘有对应的本地文件。如果本地磁盘有文件,在内存中肯定也需要有对应的管理句柄。也就是说,单从ShuffleWriter内存占用的角度讲,已经有一部分内存空间用在存储Buffer数据,另一部分内存空间是用来管理文件句柄的,回顾HashShuffle所产生小文件的个数是Mapper分片数量×Reducer分片数量(M×R)。例如,Mapper端有1000个数据分片,Reducer端也有1000个数据分片,在HashShuffle的机制下,它在本地内存空间中会产生1000 ×1000=1000000个小文件,结果可想而知,这么多的I/O,这么多的内存消耗、这么容易产生OOM,以及这么沉重的CG负担。再说,如果Reducer端去读取Mapper端的数据时,Mapper端有这么多的小文件,要打开很多网络通道去读数据,打开1000000端口不是一件很轻松的事。这会导致一个非常经典的错误:Reducer端下一个Stage通过Driver去抓取上一个Stage属于它自己的数据的时候,说文件找不到。其实,这个时候不是真的在磁盘上找不到文件,而是程序不响应,因为它在进行垃圾回收(GC)操作。

Spark最根本要优化和迫切要解决的问题是:减少Mapper端ShuffleWriter产生的文件数量,这样便可以让Spark从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模(一个Task背后可能是一个Core去运行,也可能是多个Core去运行,但默认情况下是用一个Core去运行一个Task)。

减少Mapper端的小文件带来的好处是:

(1)Mapper端的内存占用变少了。

(2)Spark不仅仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。

(3)Reducer端抓取数据的次数变少了。

(4)网络通道的句柄变少了。

(5)不仅仅减少了数据级别内存的消耗,更极大减少了Spark框架运行时必须消耗Reducer的内容。

7.4.1 概述

Sorted-Based Shuffle的出现,最显著的优势是把Spark从只能处理中小规模数据的平台,变成可以处理无限大规模数据的平台。集群规模意味着Spark处理数据的规模,也意味着Spark的运算能力。

Sorted-Based Shuffle不会为每个Reducer中的Task生产一个单独的文件,相反,Sorted-Based Shuffle会把Mapper中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask中的数据会被分类,所以Sort-based Shuffle使用了index文件,存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息。基于Sort-based Shuffle会在Mapper中的每个ShuffleMapTask中产生两个文件(并发度的个数×2),如图7-8所示。

图7-8 Sorted-Based Shuffle示意图

图7-8会产生一个Data文件和一个Index文件。其中,Data文件是存储当前Task的Shuffle输出的,而Index文件则存储了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所需要抓取的上一个Stage中ShuffleMapTask所产生的数据。

假设现在Mapper端有1000个数据分片,Reducer端也有1000个数据分片,它的并发度是100,使用Sorted-Based Shuffle会产生多少个Mapper端的小文件,答案是100×2 = 200个。它的MapTask会独自运行,每个MapTask在运行时写两个文件,运行成功后就不需要这个MapTask的文件句柄,无论是文件本身的句柄,还是索引的句柄,都不需要,所以如果它的并发度是100个Core,每次运行100个任务,它最终只会占用200个文件句柄,这与HashShuffle的机制不一样,HashShuffle最差的情况是Hashed句柄存储在内存中。

图7-9中,Sorted-Based Shuffle主要在Mapper阶段,这个跟Reducer端没有任何关系,在Mapper阶段,Sorted-Based Shuffle要进行排序,可以认为是二次排序,它的原理是有两个Key进行排序,第一个是PartitionId进行排序,第二个是本身数据的Key进行排序。它会把PartitionId分成3个,索引分别为0、1、2,这个在Mapper端进行排序的过程其实是让Reducer去抓取数据的时候变得更高效。例如,第一个Reducer,它会到Mapper端的索引为0的数据分片中抓取数据。具体而言,Reducer首先找Driver去获取父Stage中每个ShuffleMapTask输出的位置信息,根据位置信息获取Index文件,解析Index文件,从解析的Index文件中获取Data文件中属于自己的那部分内容。

图7-9 Sorted-Based Shuffle流程图

一个Mapper任务除了有一个数据文件外,它也会有一个索引文件,Map Task把数据写到文件磁盘的顺序是根据自身的Key写进去的,同时也是按照Partition写进去的,因为它是顺序写数据,记录每个Partition的大小。

Sort-Based Shuffle的弱点如下。

(1)如果Mapper中Task的数量过大,依旧会产生很多小文件,此时在Shuffle传数据的过程中到Reducer端,Reducer会需要同时大量地记录进行反序列化,导致大量内存消耗和GC负担巨大,造成系统缓慢,甚至崩溃!

(2)强制了在Mapper端必须要排序,这里的前提是数据本身不需要排序。

(3)如果在分片内也需要进行排序,此时需要进行Mapper端和Reducer端的两次排序。

(4)它要基于记录本身进行排序,这就是Sort-Based Shuffle最致命的性能消耗。

7.4.2 Sorted Based Shuffle内核

Sorted-Based Shuffle的核心是借助于ExternalSorter把每个ShuffleMapTask的输出排序到一个文件中(FileSegmentGroup),为了区分下一个阶段Reducer Task不同的内容,它还需要有一个索引文件(Index)来告诉下游Stage的并行任务,那一部分是属于下游Stage的,如图7-10所示。

图7-10 Sorted-Based Shuffle的核心示意图

图7-10中,在Reducer端有4个Reducer Task,它会产生一组File Group和一个索引文件,File Group里的FileSegement会进行排序,下游的Task很容易根据索引(index)定位到这个File中的那一部分。FileSegement是属于下游的,相当于一个指针,下游的Task要向Driver去确定文件在哪里,然后到这个File文件所在的地方,实际上会与BlockManager进行沟通,BlockManager首先会读一个Index文件,根据它的命名规则进行解析。例如,下一个阶段的第一个Task,一般就是抓取第一个Segment,这是一个指针定位的过程。

Sort-Based Shuffle最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容,划分成不同FileSegment构成的单一文件File;另外一个是索引文件Index。图7-10中,Sort-Based Shuffle展示了一个Sort and Spill的过程(它是Spill到磁盘的时候再进行排序的)。

7.4.3 Sorted Based Shuffle数据读写的源码解析

Sorted Based Shuffle,即基于Sorted的Shuffle实现机制,在该Shuffle过程中,Sorted体现在输出的数据会根据目标的分区Id(即带Shuffle过程的目标RDD中各个分区的Id值)进行排序,然后写入一个单独的Map端输出文件中。相应地,各个分区内部的数据并不会再根据Key值进行排序,除非调用带排序目的的方法,在方法中指定Key值的Ordering实例,才会在分区内部根据该Ordering实例对数据进行排序。当Map端的输出数据超过内存容纳大小时,会将各个排序结果Spill到磁盘上,最终再将这些Spill的文件合并到一个最终的文件中。在Spark的各种计算算子中到处体现了一种惰性的理念,在此也类似,在需要提升性能时,引入根据分区Id排序的设计,同时仅在指定分区内部排序的情况下,才会全局去排序。而Hadoop的MapReduce相比之下带有一定的学术气息,中规中矩,严格设计Shuffle阶段中的各个步骤。

基于Hash的Shuffle实现,ShuffleManager的具体实现子类为HashShuffleManager,对应的具体实现机制如7-11所示。

图7-11 基于Sorted的Shuffle实现机制的框架类图

在图7-11中,各个不同的ShuffleHandle与不同的具体Shuffle写入器实现子类是一一对应的,可以认为是通过注册时生成的不同ShuffleHandle设置不同的Shuffle写入器实现子类。

从ShuffleManager注册的配置属性与具体实现子类的映射关系,即前面提及的在SparkEnv中实例化的代码,可以看出sort与tungsten-sort对应的具体实现子类都是org.apache.spark.shuffle.sort.SortShuffleManager。也就是当前基于Sort的Shuffle实现机制与使用Tungsten项目的Shuffle实现机制都是通过SortShuffleManager类来提供接口,两种实现机制的区别在于,该类中使用了不同的Shuffle数据写入器。

SortShuffleManager根据内部采用的不同实现细节,对应有两种不同的构建Map端文件输出的写方式,分别为序列化排序模式与反序列化排序模式。

(1)序列化排序(Serialized sorting)模式:这种方式对应了新引入的基于Tungsten项目的方式。

(2)反序列化排序(Deserialized sorting)模式:这种方式对应除了前面这种方式之外的其他方式。

基于Sort的Shuffle实现机制采用的是反序列化排序模式。下面分析该实现机制下的数据写入器的实现细节。

基于Sort的Shuffle实现机制,具体的写入器的选择与注册得到的ShuffleHandle类型有关,参考SortShuffleManager类的registerShuffle方法,相关代码如下所示。

Spark 2.1.1版本的SortShuffleManager.scala的源码如下。

1.  override def registerShuffle[K, V, C](
2.       shuffleId: Int,
3.       numMaps: Int,
4.        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
5.     //通过shouldBypassMergeSort方法判断是否满足回退到Hash风格的Shuffle条件
6.      if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf,
        dependency)) {
7.    //如果当前的分区个数小于设置的配置属性:
      //spark.shuffle.sort.bypassMergeThreshold,同时不需要在Map对数据进行聚合,
      //此时可以直接写文件,并在最后将文件合并
8.
9.   new BypassMergeSortShuffleHandle[K, V](
10.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K,
            V, V]])
11.     } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
12.       //否则,试图Map输出缓冲区的序列化形式,因为这杨更高效
13.       new SerializedShuffleHandle[K, V](
14.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K,
            V, V]])
15.     } else {
16.       //否则,缓冲在反序列化形式Map输出
17.       new BaseShuffleHandle(shuffleId, numMaps, dependency)
18.     }
19.   }

Spark 2.2.0版本的SortShuffleManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行shouldBypassMergeSort方法的第一个传入参数SparkEnv.get.conf微调为conf。

1.    ......
2.      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
3.  ......

Sorted Based Shuffle写数据的源码解析如下。

基于Sort的Shuffle实现机制中相关的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。对应这两种ShuffleHandle及其相关的Shuffle数据写入器类型的相关代码可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。

SortShuffleManager的getWriter的源码如下。

1.       override def getWriter[K, V](
2.        handle: ShuffleHandle,
3.        mapId: Int,
4.        context: TaskContext): ShuffleWriter[K, V] = {
5.      numMapsForShuffle.putIfAbsent(
6.        handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _,
          _]].numMaps)
7.      val env = SparkEnv.get
8.   //通过对ShuffleHandle类型的模式匹配,构建具体的数据写入器
9.      handle match {
10.       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V
          @unchecked] =>
11.         new UnsafeShuffleWriter(
12.           env.blockManager,
13.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
14.           context.taskMemoryManager(),
15.           unsafeShuffleHandle,
16.           mapId,
17.           context,
18.           env.conf)
19.       case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K
         @unchecked, V @unchecked] =>
20.        new BypassMergeSortShuffleWriter(
21.          env.blockManager,
22.          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
23.          bypassMergeSortHandle,
24.          mapId,
25.          context,
26.          env.conf)
27.      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
28.        new SortShuffleWriter(shuffleBlockResolver, other, mapId,
           context)
29.    }
30.  }

在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用的IndexShuffleBlockResolver。

下面开始解析这两种写数据块方式的源码实现。

1.BypassMergeSortShuffleWriter写数据的源码解析

该类实现了带Hash风格的基于Sort的Shuffle机制,为每个Reduce端的任务构建一个输出文件,将输入的每条记录分别写入各自对应的文件中,并在最后将这些基于各个分区的文件合并成一个输出文件。

在Reducer端任务数比较少的情况下,基于Hash的Shuffle实现机制明显比基于Sort的Shuffle实现机制要快,因此基于Sort的Shuffle实现机制提供了一个fallback方案,对于Reducer端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带Hash风格的fallback计划,由BypassMergeSortShuffleWriter具体实现。

使用该写入器的条件如下所示:

(1)不能指定Ordering,从前面数据读取器的解析可以知道,当指定Ordering时,会对分区内部的数据进行排序。因此,对应的BypassMergeSortShuffleWriter写入器避免了排序开销。

(2)不能指定Aggregator。

(3)分区个数小于spark.shuffle.sort.bypassMergeThreshold配置属性指定的个数。

和其他ShuffleWriter的具体子类一样,BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下所示。

BypassMergeSortShuffleWriter.scala的write的源码如下。

1.     public void write(Iterator<Product2<K, V>> records) throws IOException {
2.   //为每个Reduce端的分区打开的DiskBlockObjectWriter存放于partitionWriters,
     //需要根据具体Reduce端的分区个数进行构建
3.
4.
5.      assert (partitionWriters == null);
6.      if (!records.hasNext()) {
7.        partitionLengths = new long[numPartitions];
8.    //初始化索引文件的内容,此时对应各个分区的数据量或偏移量需要在后续获取分区的真实
      //数据量时重写
9.
10.       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId,
          partitionLengths, null);
11.      //下面代码的调用形式是对应在Java类中调用Scala提供的object中的apply方法
         //的形式,是由编译器编译Scala中的object得到的结果来决定的
12
13.
14.       mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),
          partitionLengths);
15.       return;
16.     }
17.     final SerializerInstance serInstance = serializer.newInstance();
18.     final long openStartTime = System.nanoTime();
19.   //对应每个分区各配置一个磁盘写入器DiskBlockObjectWriter
20.     partitionWriters = new DiskBlockObjectWriter[numPartitions];
21.     partitionWriterSegments = new FileSegment[numPartitions];
22.     //注意,在该写入方式下,会同时打开numPartitions个DiskBlockObjectWriter,
        //因此对应的分区数不应设置过大,避免带来过大的内存开销目前对应      DiskBlock-
        //ObjectWriter的缓存大小默认配置为32KB,比早先的100KB降低了很多,但也说明
        //不适合同时打开太多的DiskBlockObjectWriter实例
23.     for (int i = 0; i < numPartitions; i++) {
24.       final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
25.         blockManager.diskBlockManager().createTempShuffleBlock();
26.       final File file = tempShuffleBlockIdPlusFile._2();
27.       final BlockId blockId = tempShuffleBlockIdPlusFile._1();
28.       partitionWriters[i] =
29.         blockManager.getDiskWriter(blockId, file, serInstance,
            fileBufferSize, writeMetrics);
30.     }
31.
32.     //创建文件写入和创建磁盘写入器都涉及与磁盘的交互,当打开许多文件时,磁盘写会花费
        //很长时间,所以磁盘写入时间应包含在Shuffle写入时间内
33.
34.     writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
35.     //读取每条记录,并根据分区器将该记录交由分区对应的DiskBlockObjectWriter,
        //写入各自对应的临时文件中
36.
37.     while (records.hasNext()) {
38.       final Product2<K, V> record = records.next();
39.       final K key = record._1();
40.       partitionWriters[partitioner.getPartition(key)].write(key,
          record._2());
41.     }
42.
43.     for (int i = 0; i < numPartitions; i++) {
44.       final DiskBlockObjectWriter writer = partitionWriters[i];
45.       partitionWriterSegments[i] = writer.commitAndGet();
46.       writer.close();
47.     }
48.      //获取最终合并后的文件名,对应格式为:"shuffle_" + shuffleId + "_" + mapId
        // + "_" + reduceId + ".index", 并且其中的       reduceId   为0,对应的含义就是
        //该文件包含所有为Reduce端输出的数据
49.     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
50.     File tmp = Utils.tempFileWith(output);
51.     try {
52.     //在此合并前面生成的各个中间临时文件,并获取各个分区对应的数据量,由数据量可以得
        //到对应的偏移量
53.
54.       partitionLengths = writePartitionedFile(tmp);
55.  //主要是根据前面获取的数据量,重写Index文件中的偏移量信息
56.       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId,
          partitionLengths, tmp);
57.     } finally {
58.       if (tmp.exists() && !tmp.delete()) {
59.         logger.error("Error while deleting temp file {}",
            tmp.getAbsolutePath());
60.       }
61.     }
62.  //封装并返回任务结果
63.    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),
       partitionLengths);
64.   }

其中调用的createTempShuffleBlock方法描述了各个分区生成的中间临时文件的格式与对应的BlockId,具体代码如下所示。

DiskBlockManager的createTempShuffleBlock的源码如下。

1.  /**中间临时文件名的格式由前缀temp_shuffle_与randomUUID组成,可以唯一标识
    BlockId*/
2.  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
3.     var blockId = new TempShuffleBlockId(UUID.randomUUID())
4.     while (getFile(blockId).exists()) {
5.       blockId = new TempShuffleBlockId(UUID.randomUUID())
6.     }
7.     (blockId, getFile(blockId))
8.   }

从上面的分析中可以知道,每个Map端的任务最终会生成两个文件,即数据(Data)文件和索引(Index)文件。

另外,使用DiskBlockObjectWriter写记录时,是以32条记录批次写入的,不会占用太大的内存。但由于对应不能指定聚合器(Aggregator),写数据时也是直接写入记录,因此对应后续的网络I/O的开销也会很大。

2.SortShuffleWriter写数据的源码解析

前面BypassMergeSortShuffleWriter的写数据是在Reducer端的分区个数较少的情况下提供的一种优化方式,但当数据集规模非常大时,使用该写数据方式不合适时,就需要使用SortShuffleWriter来写数据块。

和其他ShuffleWriter的具体子类一样,SortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下所示。

SortShuffleWriter的write的源码如下。

1.      override def write(records: Iterator[Product2[K, V]]): Unit = {
2.  //当需要在Map端进行聚合操作时,此时将会指定聚合器(Aggregator)
3.  //将Key值的Ordering传入到外部排序器ExternalSorter中
4.      sorter = if (dep.mapSideCombine) {
5.        require(dep.aggregator.isDefined, "Map-side combine without
          Aggregator specified!")
6.        new ExternalSorter[K, V, C](
7.          context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering,
            dep.serializer)
8.      } else {
9.     //没有指定Map端使用聚合时,传入ExternalSorter的聚合器(Aggregator)
       //与Key值的Ordering都设为None,即不需要传入,对应在Reduce端读取数据
       //时才根据聚合器分区数据进行聚合,并根据是否设置Ordering而选择是否对分区
       //数据进行排序
10.       new ExternalSorter[K, V, V](
11.         context, aggregator = None, Some(dep.partitioner), ordering = None,
            dep.serializer)
12.     }
13.      //将写入的记录集全部放入外部排序器
14.     sorter.insertAll(records)
15.
16.     //不要费心在Shuffle写时间中,包括打开合并输出文件的时间,因为它只打开一个文件,
        //所以通常太快,无法精确测量(见Spark-3570)
17.     //和BypassMergeSortShuffleWriter一样,获取输出文件名和BlockId
18.     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
19.     val tmp = Utils.tempFileWith(output)
20.     try {
21.       val blockId = ShuffleBlockId(dep.shuffleId, mapId,
          IndexShuffleBlockResolver.NOOP_REDUCE_ID)
22.  //将分区数据写入文件,返回各个分区对应的数据量
23.       val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
24.    //和BypassMergeSortShuffleWriter一样,更新索引文件的偏移量信息
25.      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId,
         partitionLengths, tmp)
26.       mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
27.     } finally {
28.       if (tmp.exists() && !tmp.delete()) {
29.         logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
30.       }
31.     }
32.   }

在这种基于Sort的Shuffle实现机制中引入了外部排序器(ExternalSorter)。ExternalSorter继承了Spillable,因此内存使用达到一定阈值时,会Spill到磁盘,可以减少内存带来的开销。

外部排序器的insertAll方法内部在处理完(包含聚合和非聚合两种方式)每条记录时,都会检查是否需要Spill。内部各种细节比较多,这里以Spill条件判断为主线,简单描述一下条件相关的代码。具体判断是否需要Spill的相关代码可以参考Spillable类中的maybeSpill方法(该方法的简单调用流程为:ExternalSorter #insterAll–>ExternalSorter #maybeSpillCollection ->Spillable#maybeSpill),关键代码如下所示。

Spillable的maybeSpill的源码如下。

1.     protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
2.   //判断是否需要Spill
3.     var shouldSpill = false
4.   //1. 检查当前记录数是否是32的倍数——即对小批量的记录集进行Spill
5.   //2. 同时,当前需要的内存大小是否达到或超过了当前分配的内存阈值
6.     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
7.       //从Shuffle内存池中获取当前内存的两倍
8.       val amountToRequest = 2 * currentMemory - myMemoryThreshold
9.   //实际上会先申请内存,然后再次判断,最后决定是否Spill
10.      val granted = acquireMemory(amountToRequest)
11.      myMemoryThreshold += granted
12.
13.      //内存很少时,如果准许内存进一步增长(tryToAcquire返回0,或者比
         //myMemoryThreshold更多的内存),当前的collection将会溢出
14.      shouldSpill = currentMemory >= myMemoryThreshold
15.    }
16.    //当满足下列条件之一时,需要Spill,条件如下所示:
17.    //1. 当前判断结果为true
18.    //2. 从上次Spill之后所读取的记录数超过配置的阈值时
19.    //配置属性为:spark.shuffle.spill.numElementsForceSpillThreshold
20.    shouldSpill = shouldSpill || _elementsRead >
       numElementsForceSpillThreshold
21.    //Actually spill
22.    if (shouldSpill) {
23.      _spillCount += 1
24.      logSpillage(currentMemory)
25.      spill(collection)
26.      _elementsRead = 0
27.      _memoryBytesSpilled += currentMemory
28.      releaseMemory()
29.    }
30.    shouldSpill
31.  }

对于外部排序器,除了insertAll方法外,它的writePartitionedFile方法也非常重要。

ExternalSorter.scala的writePartitionedFile的源码如下。

1.  def writePartitionedFile(
2.  blockId: BlockId,
3.  outputFile: File): Array[Long] = {

其中,BlockId是数据块的逻辑位置,File参数是对应逻辑位置的物理存储位置。这两个参数值的获取方法和使用BypassMergeSortShuffleHandle及其对应的ShuffleWriter是一样的。

在该方法中,有一个容易混淆的地方,与Shuffle的度量(Metric)信息有关,对应代码如下所示。

1.  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
2.  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)

其中,第1行对应修改了Spilled的数据在内存中的字节大小,第2行则对应修改了Spilled的数据在磁盘中的字节大小。在内存中时,数据是以反序列化形式存放的,而存储到磁盘(默认会序列化)时,会对数据进行序列化。反序列化后的数据会远远大于序列化后的数据(也可以通过UI界面查看这两个度量信息的大小差异来确认,具体差异的大小和数据以及选择的序列化器有关,有兴趣的读者可以参考各序列器间的性能等比较文档)。

从这一点也可以看出,如果在内存中使用反序列化的数据,会大大增加内存的开销(也意味着增加GC负载),并且反序列化也会增加CPU的开销,因此引入了利用Tungsten项目的基于Tungsten Sort的Shuffle实现机制。Tungsten项目的优化主要有三个方面,这里从避免反序列化的数据量会极大消耗内存这方面考虑,主要是借助Tungsten项目的内存管理模型,可以直接处理序列化的数据;同时,CPU开销方面,直接处理序列化数据,可以避免数据反序列化的这部分处理开销。