7.5 Tungsten Sorted Based Shuffle

本节讲解Tungsten Sorted Based Shuffle,包括Tungsten Sorted Based Shuffle概述、Tungsten Sorted Based内核、Tungsten Sorted Based数据读写的源码解析等内容。

7.5.1 概述

基于Tungsten Sort的Shuffle实现机制主要是借助Tungsten项目所做的优化来高效处理Shuffle。

Spark提供了配置属性,用于选择具体的Shuffle实现机制,但需要说明的是,虽然默认情况下Spark默认开启的是基于Sort的Shuffle实现机制(对应spark.shuffle.manager的默认值),但实际上,参考Shuffle的框架内核部分可知基于Sort的Shuffle实现机制与基于Tungsten Sort的Shuffle实现机制都是使用SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的。对应非基于Tungsten Sort时,通过SortShuffleWriter. shouldBypassMergeSort方法判断是否需要回退到Hash风格的Shuffle实现机制,当该方法返回的条件不满足时,则通过SortShuffleManager.canUseSerializedShuffle方法判断是否需要采用基于Tungsten Sort的Shuffle实现机制,而当这两个方法返回都为false,即都不满足对应的条件时,会自动采用常规意义上的基于Sort的Shuffle实现机制。

因此,当设置了spark.shuffle.manager=tungsten-sort时,也不能保证就一定采用基于Tungsten Sort的Shuffle实现机制。有兴趣的读者可以参考Spark 1.5及之前的注册方法的实现,该实现中SortShuffleManager的注册方法仅构建了BaseShuffleHandle实例,同时对应的getWriter中也只对应构建了BaseShuffleHandle实例。

7.5.2 Tungsten Sorted Based Shuffle内核

基于Tungsten Sort的Shuffle实现机制的入口点仍然是SortShuffleManager类,与同样在SortShuffleManager类控制下的其他两种实现机制不同的是,基于Tungsten Sort的Shuffle实现机制使用的ShuffleHandle与ShuffleWriter分别为SerializedShuffleHandle与UnsafeShuffleWriter。因此,对应的具体实现机制可以用图7-12来表示,对应如下。

在Sorted Based Shuffle中,SortShuffleManager根据内部采用的不同实现细节,分别给出两种排序模式,而基于TungstenSort的Shuffle实现机制对应的就是序列化排序模式。

从图7-12中可以看到基于Sort的Shuffle实现机制,具体的写入器的选择与注册得到的ShuffleHandle类型有关,参考SortShuffleManager类的registerShuffle方法。

registerShuffle方法中会判断是否满足序列化模式的条件,如果满足,则使用基于TungstenSort的Shuffle实现机制,对应在代码中,表现为使用类型为SerializedShuffleHandle的ShuffleHandle。上述代码进一步说明了在spark.shuffle.manager设置为sort时,内部会自动选择具体的实现机制。对应代码的先后顺序,就是选择的先后顺序。

对应的序列化排序(Serialized sorting)模式需要满足的条件如下所示。

图7-12 基于TungstenSort的Shuffle实现机制的框架类图

(1)Shuffle依赖中不带聚合操作或没有对输出进行排序的要求。

(2)Shuffle的序列化器支持序列化值的重定位(当前仅支持KryoSerializer以及Spark SQL子框架自定义的序列化器)。

(3)Shuffle过程中的输出分区个数少于16 777 216个。

实际上,使用过程中还有其他一些限制,如引入那个Page形式的内存管理模型后,内部单条记录的长度不能超过128 MB(具体内存模型可以参考PackedRecordPointer类)。另外,分区个数的限制也是该内存模型导致的(同样参考PackedRecordPointer类)。

所以,目前使用基于TungstenSort的Shuffle实现机制条件还是比较苛刻的。

7.5.3 Tungsten Sorted Based Shuffle数据读写的源码解析

对应这种SerializedShuffleHandle及其相关的Shuffle数据写入器类型的相关代码,可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。

SortShuffleManager.scala的源码如下。

1.        /** 为指定的分区提供一个数据写入器。该方法在Map端的Tasks中调用*/
2.  override def getWriter[K, V](
3.        handle: ShuffleHandle,
4.        mapId: Int,
5.        context: TaskContext): ShuffleWriter[K, V] = {
6.      numMapsForShuffle.putIfAbsent(
7.        handle.shuffleId,        handle.asInstanceOf[BaseShuffleHandle[_,  _,
          _]].numMaps)
8.      val env = SparkEnv.get
9.      handle match {
10.   //SerializedShuffleHandle对应的写入器为UnsafeShuffleWriter
      //使用的数据块逻辑与物理映射关系仍然为IndexShuffleBlockResolver,对应
      //SortShuffleManager中的变量,因此相同
11.      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V
         @unchecked] =>
12.        new UnsafeShuffleWriter(
13.          env.blockManager,
14.          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
15.          context.taskMemoryManager(),
16.          unsafeShuffleHandle,
17.          mapId,
18.          context,
19.          env.conf)
20.      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K
@         unchecked, V @unchecked] =>
21.        new BypassMergeSortShuffleWriter(
22.          env.blockManager,
23.          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
24.          bypassMergeSortHandle,
25.          mapId,
26.          context,
27.          env.conf)
28.      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
29.        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
30
31.    }
32.  }

数据写入器类UnsafeShuffleWriter中使用SortShuffleManager实例中的变量shuffleBlockResolver来对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用的IndexShuffleBlockResolver。

UnsafeShuffleWriter构建时传入了一个与其他两种基于Sorted的Shuffle实现机制不同的参数:context.taskMemoryManager(),在此构建了一个TaskMemoryManager实例并传入UnsafeShuffleWriter。TaskMemoryManager与Task是一对一的关系,负责管理分配给Task的内存。

下面开始解析写数据块的UnsafeShuffleWriter类的源码实现。首先来看其write的方法。

UnsafeShuffleWriter.scala的源码如下。

1.      public void write(scala.collection.Iterator<Product2<K, V>> records)
        throws IOException {
2.   ......
3.     boolean success = false;
4.     try {
5.     //对输入的记录集 records,循环将每条记录插入到外部排序器
6.       while (records.hasNext()) {
7.         insertRecordIntoSorter(records.next());
8.       }
9.       closeAndWriteOutput();
10.      //生成最终的两个结果文件,和Sorted Based Shuffle的实现机制一样,每个Map
         //端的任务对应生成一个数据(Data)文件和对应的索引(Index)文件
11.
12.
13.      success = true;
14.    } finally {
15.      if (sorter != null) {
16.     try {
17.       sorter.cleanupResources();
18.     } catch (Exception e) {
19.  .......

写过程的关键步骤有三步。

(1)通过insertRecordIntoSorter(records.next())方法将每条记录插入外部排序器。

(2)closeAndWriteOutput方法写数据文件与索引文件,在写的过程中,会先合并外部排序器在插入过程中生成的Spill中间文件。

(3)sorter.cleanupResources()最后释放外部排序器的资源。

首先查看将每条记录插入外部排序器(ShuffleExternalSorter)时所使用的insertRecordIntoSorter方法,其关键代码如下所示。

UnsafeShuffleWriter.scala的源码如下。

1.        void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
2.      assert(sorter != null);
3.    //对于多次访问的Key值,使用局部变量,可以避免多次函数调用
4.      final K key = record._1();
5.      final int partitionId = partitioner.getPartition(key);
6.   //先复位存放每条记录的缓冲区,内部使用ByteArrayOutputStream存放每条记录,容量
     //为1MB
7.
8.
9.      serBuffer.reset();
10.  //进一步使用序列化器从serBuffer缓冲区构建序列化输出流,将记录写入到缓冲区
11.     serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
12.     serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
13.     serOutputStream.flush();
14.
15.     final int serializedRecordSize = serBuffer.size();
16.     assert (serializedRecordSize > 0);
17.    //将记录插入到外部排序器中,serBuffer是一个字节数组,内部数据存放的偏移量为
       //Platform.BYTE_ARRAY_OFFSET
18.
19.
20.     sorter.insertRecord(
21.       serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize,
          partitionId);
22.   }

下面继续查看第二步写数据文件与索引文件的closeAndWriteOutput方法,其关键代码如下所示。

closeAndWriteOutput的源码如下。

1.   void closeAndWriteOutput() throws IOException {
2.    assert(sorter != null);
3.    updatePeakMemoryUsed();
4.    //设为null,用于GC垃圾回收
5.    serBuffer = null;
6.    serOutputStream = null;
7.    //关闭外部排序器,并获取全部Spill信息
8.    final SpillInfo[] spills = sorter.closeAndGetSpills();
9.    sorter = null;
10.   final long[] partitionLengths;
11.     //通过块解析器获取输出文件名
12.     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
13.     //在后续合并Spill文件时先使用临时文件名,最终再重命名为真正的输出文件名,
        //即在writeIndexFileAndCommit方法中会重复通过块解析器获取输出文件名
14.     final File tmp = Utils.tempFileWith(output);
15.     try {
16.       try {
17.         partitionLengths = mergeSpills(spills, tmp);
18.       } finally {
19.         for (SpillInfo spill : spills) {
20.           if (spill.file.exists() && ! spill.file.delete()) {
21.             logger.error("Error while deleting spill file {}",
                spill.file.getPath());
22.           }
23.         }
24.       }
25.
26.  //将合并Spill后获取的分区及其数据量信息写入索引文件,并将临时数据文件重命名为
     //真正的数据文件名
27.
28.       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId,
          partitionLengths, tmp);
29.     } finally {
30.       if (tmp.exists() && !tmp.delete()) {
31.         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
32.       }
33.     }
34.    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),
       partitionLengths);
35.   }

closeAndWriteOutput方法主要有以下三步。

(1)触发外部排序器,获取Spill信息。

(2)合并中间的Spill文件,生成数据文件,并返回各个分区对应的数据量信息。

(3)根据各个分区的数据量信息生成数据文件对应的索引文件。

writeIndexFileAndCommit方法和Sorted Based Shuffle机制的实现一样,在此仅分析过程中不同的Spill文件合并步骤,即mergeSpills方法的具体实现。

UnsafeShuffleWriter.scala的mergeSpills方法的源码如下。

1.   /**
       * 合并 0 个或多个Spill的中间文件,基于Spills的个数以及I/O压缩码选择最
       * 快速的合并策略。返回包含合并文件中各个分区的数据长度的数组。
2.     * /
3.
4.
5.  private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws
    IOException {
6.
7.
8.  //获取Shuffle的压缩配置信息
9.      final    boolean   compressionEnabled       =  sparkConf.getBoolean("spark.
        shuffle.compress", true);
10.     final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.
        createCodec(sparkConf);
11.   //获取是否启动unsafe的快速合并
12.    final boolean fastMergeEnabled =
13.      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
14.
15.  //没有压缩或者当压缩码支持序列化流合并时,支持快速合并
16.    final boolean fastMergeIsSupported = !compressionEnabled ||
17.      CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams
         (compressionCodec);
18.    final boolean encryptionEnabled = blockManager.serializerManager().
       encryptionEnabled();
19.    try {
20.
21.        //没有中间的Spills文件时,创建一个空文件,并返回包含分区数据长度的
           //空数组。后续读取时会过滤掉空文件
22.
23.
24.      if (spills.length == 0) {
25.        new FileOutputStream(outputFile).close(); //Create an empty file
26.        return new long[partitioner.numPartitions()];
27.      } else if (spills.length == 1) {
28.
29.        //最后一个Spills文件已经更新 metrics 信息,因此不需要重复更新,直接
           //重命名Spills的中间临时文件为目标输出的数据文件,同时将该Spills中间
           //文件的各分区数据长度的数组返回即可
30.        Files.move(spills[0].file, outputFile);
31.        return spills[0].partitionLengths;
32.      } else {
33.        final long[] partitionLengths;
34.       //当存在多个Spill 中间文件时,根据不同的条件,采用不同的文件合并策略
35.
36.        if (fastMergeEnabled && fastMergeIsSupported) {
37.         //由spark.file.transferTo配置属性控制,默认为true
38.          if (transferToEnabled && !encryptionEnabled) {
39.            logger.debug("Using transferTo-based fast merge");
40.
41.            //通过 NIO 的方式合并各个Spills的分区字节数据
42.            //仅在 I/O 压缩码和序列化器支持序列化流的合并时安全
43.
44.            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
45.          } else {
46.            logger.debug("Using fileStream-based fast merge");
47.            //使用 Java FileStreams文件流的方式进行合并
48.            partitionLengths = mergeSpillsWithFileStream(spills, outputFile,
               null);
49.          }
50.        } else {
51.          logger.debug("Using slow merge");
52.         partitionLengths = mergeSpillsWithFileStream(spills, outputFile,
            compressionCodec);
53.        }
54.        //更新Shuffle写数据的度量信息
55.        writeMetrics.decBytesWritten(spills[spills.length - 1].file.
           length());
56.        writeMetrics.incBytesWritten(outputFile.length());
57.        return partitionLengths;
58.      }
59.    } catch (IOException e) {
60.      if (outputFile.exists() && !outputFile.delete()) {
61.
62.        logger.error("Unable to delete output file {}", outputFile.
           getPath());
63.      }
64.      throw e;
65.    }
66.  }

各种合并策略在性能上具有一定差异,会根据具体的条件采用,主要有基于Java NIO(New I/O)和基于普通文件流合并文件的方式。下面简单描述一下基于文件合并流的处理过程,代码如下所示:

UnsafeShuffleWriter.scala的mergeSpillsWithFileStream方法的源码如下。

1.    /** 使用 Java FileStreams文件流的方式合并*/
2.  private long[] mergeSpillsWithFileStream(
3.        SpillInfo[] spills,
4.        File outputFile,
5.        @Nullable CompressionCodec compressionCodec) throws IOException {
6.      assert (spills.length >= 2);
7.      final int numPartitions = partitioner.numPartitions();
8.      final long[] partitionLengths = new long[numPartitions];
9.
10. //对应打开的输入流的个数为Spills的临时文件个数
11.     final InputStream[] spillInputStreams = new FileInputStream
        [spills.length];
12.
13.    //使用计数输出流避免关闭基础文件并询问文件系统在每个分区写入文件大小
14.
15.     final CountingOutputStream mergedFileOutputStream = new
        CountingOutputStream(
16.       new FileOutputStream(outputFile));
17.
18.     boolean threwException = true;
19.     try {
20.       //为每个Spills中间文件打开文件输入流
21.       for (int i = 0; i < spills.length; i++) {
22.         spillInputStreams[i] = new FileInputStream(spills[i].file);
23.       }
24.       //遍历分区
25.       for (int partition = 0; partition < numPartitions; partition++) {
26.         final long initialFileLength = mergedFileOutputStream.getByteCount();
27.         //屏蔽底层输出流的close()调用,以便能够关闭高层流,以确保所有数据都真正刷
            //新并清除内部状态
28.
29.         OutputStream partitionOutput = new CloseShieldOutputStream(
30.           new TimeTrackingOutputStream(writeMetrics, mergedFileOutputStream));
31.         partitionOutput = blockManager.serializerManager().wrapForEncryption
            (partitionOutput);
32.         if (compressionCodec != null) {
33.           partitionOutput = compressionCodec.compressedOutputStream
              (partitionOutput);
34.         }
35.
36.         //依次从各个 Spills 输入流中读取当前分区的数据长度指定个数的字节,到各个分
            //区对应的输出文件流中
37.         for (int i = 0; i < spills.length; i++) {
38.           final long partitionLengthInSpill = spills[i].partitionLengths
              [partition];
39.           if (partitionLengthInSpill > 0) {
40.             InputStream partitionInputStream = new LimitedInputStream
                (spillInputStreams[i],
41.               partitionLengthInSpill, false);
42.             try {
43.               partitionInputStream = blockManager.serializerManager()
                  .wrapForEncryption(
44.                 partitionInputStream);
45.               if (compressionCodec != null) {
46.                 partitionInputStream = compressionCodec.compressedInputStream
                    (partitionInputStream);
47.               }
48.               ByteStreams.copy(partitionInputStream, partitionOutput);
49.             } finally {
50.               partitionInputStream.close();
51.             }
52.           }
53.         }
54.         partitionOutput.flush();
55.         partitionOutput.close();
56.  //将当前写入的数据长度存入返回的数组中
57.         partitionLengths[partition] = (mergedFileOutputStream
            .getByteCount() - initialFileLength);
58.       }
59.       threwException = false;
60.     } finally {
61.       //为了避免屏蔽异常以后导致过早进入finally块的异常处理,只能在清理过程中抛出
          //异常
62.
63.       for (InputStream stream : spillInputStreams) {
64.         Closeables.close(stream, threwException);
65.       }
66.       Closeables.close(mergedFileOutputStream, threwException);
67.     }
68.     return partitionLengths;
69.   }

基于NIO的文件合并流程基本类似,只是底层采用NIO的技术实现。