7.6 Shuffle与Storage模块间的交互

在Spark中,存储模块被抽象成Storage。顾名思义,Storage是存储的意思,代表着Spark中的数据存储系统,负责管理和实现数据块(Block)的存放。其中存取数据的最小单元是Block,数据由不同的Block组成,所有操作都是以Block为单位进行的。从本质上讲,RDD中的Partition和Storage中的Block是等价的,只是所处的模块不同,看待的角度不一样而已。

Storage抽象模块的实现分为两个层次,如图7-13所示。

(1)通信层:通信层是典型的Master-Slave结构,Master和Slave之间传输控制和状态信息。通信层主要由BlockManager、BlockManagerMaster、BlockManagerMasterEndpoint、BlockManagerSlaveEndpoint等类实现。

(2)存储层:负责把数据存储到内存、磁盘或者堆外内存中,有时还需要为数据在远程节点上生成副本,这些都由存储层提供的接口实现。Spark 2.2.0具体的存储层的实现类有DiskStore和MemoryStore。

图7-13 Storage存储模块

Shuffle模块若要和Storage模块进行交互,需要通过调用统一的操作类BlockManager来完成。如果把整个存储模块看成一个黑盒,BlockManager就是黑盒上留出的一个供外部调用的接口。

7.6.1 Shuffle注册的交互

Spark中BlockManager在Driver端的创建,在SparkContext创建的时候会根据具体的配置创建SparkEnv对象,源码如下所示。

SparkContext.scala的源码如下。

1.   _env = createSparkEnv(_conf, isLocal, listenerBus)
2.      SparkEnv.set(_env)
3.  .......
4.  private[spark] def createSparkEnv(
5.        conf: SparkConf,
6.        isLocal: Boolean,
7.        listenerBus: LiveListenerBus): SparkEnv = {
8.  //创建Driver端的运行环境
9.      SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext
        .numDriverCores(master))
10.   }

createSparkEnv方法中,传入SparkConf配置对象、isLocal标志,以及LiveListenerBus,方法中使用SparkEnv对象的createDriverEnv方法创建SparkEnv并返回。在SparkEnv的createDriverEvn方法中,将会创建BlockManager、BlockManagerMaster等对象,完成Storage在Driver端的部署。

SparkEnv中创建BlockManager、BlockManagerMaster关键源码如下所示。

SparkEnv.scala的源码如下。

1.     val blockTransferService =
2.        new NettyBlockTransferService(conf, securityManager, bindAddress,
          advertiseAddress, blockManagerPort, numUsableCores)
3.
4.  //创建BlockManagerMasterEndpoint
5.      val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
6.        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
7.    //创建BlockManagerMasterEndpoint
8.        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
9.        conf, isDriver)
10.  //创建BlockManager
11.     //注:blockManager无效,直到initialize()被调用
12.     val blockManager = new BlockManager(executorId, rpcEnv,
        blockManagerMaster,
13.       serializerManager, conf, memoryManager, mapOutputTracker,
          shuffleManager,
14.       blockTransferService, securityManager, numUsableCores)

使用new关键字实例化出BlockManagerMaster,传入BlockManager的构造函数,实例化出BlockManager对象。这里的BlockManagerMaster和BlockManager属于聚合关系。BlockManager主要对外提供统一的访问接口,BlockManagerMaster主要对内提供各节点之间的指令通信服务。

构建BlockManager时,传入shuffleManager参数,shuffleManager是在SparkEnv中创建的,将shuffleManager传入到BlockManager中,BlockManager就拥有shuffleManager的成员变量,从而可以调用shuffleManager的相关方法。

BlockManagerMaster在Driver端和Executors中的创建稍有差别。首先来看在Driver端创建的情形。创建BlockManagerMaster传入的isDriver参数,isDriver为true,表示在Driver端创建,否则视为在Slave节点上创建。

当SparkContext中执行_env.blockManager.initialize(_applicationId)代码时,会调用Driver端BlockManager的initialize方法。Initialize方法的源码如下所示。

SparkContext.scala的源码如下。

1.  _env.blockManager.initialize(_applicationId)

Spark 2.1.1版本的BlockManager.scala的源码如下。

1.   def initialize(appId: String): Unit = {
2.   //调用blockTransferService的init方法,blockTransferService用于在不同节点
     //fetch数据、传送数据
3.     blockTransferService.init(this)
4.   //shuffleClient用于读取其他Executor上的shuffle files
5.      shuffleClient.init(appId)
6.
7.      blockReplicationPolicy = {
8.        val priorityClass = conf.get(
9.          "spark.storage.replication.policy", classOf
            [RandomBlockReplicationPolicy].getName)
10.       val clazz = Utils.classForName(priorityClass)
11.       val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
12.       logInfo(s"Using $priorityClass for block replication policy")
13.       ret
14.     }
15.
16.     val id =
17.       BlockManagerId(executorId, blockTransferService.hostName,
          blockTransferService.port, None)
18.
19.   //向blockManagerMaster注册BlockManager。在registerBlockManager方法中传
      //入了  slaveEndpoint,slaveEndpoint      为  BlockManager   中的 RPC  对象,用于和
      //blockManagerMaster通信
20.     val idFromMaster = master.registerBlockManager(
21.       id,
22.       maxMemory,
23.       slaveEndpoint)
24.   //得到blockManagerId
25.     blockManagerId = if (idFromMaster != null) idFromMaster else id
26.
27. //得到shuffleServerId
28.     shuffleServerId = if (externalShuffleServiceEnabled) {
29.       logInfo(s"external shuffle service port = $externalShuffleServicePort")
30.       BlockManagerId(executorId, blockTransferService.hostName,
          externalShuffleServicePort)
31.     } else {
32.       blockManagerId
33.     }
34.  //注册shuffleServer
35.     //如果存在,将注册Executors配置与本地shuffle服务
36.     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
37.       registerWithExternalShuffleServer()
38.     }
39.
40.     logInfo(s"Initialized BlockManager: $blockManagerId")
41.   }

Spark 2.2.0版本的BlockManager.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第22行删除maxMemory。

 上段代码中第22行之后新增参数maxOnHeapMemory:最大的堆内存大小。

 上段代码中第22行之后新增参数maxOffHeapMemory:最大的堆外存大小。

1.  .....
2.        maxOnHeapMemory,
3.        maxOffHeapMemory,
4.  ...

如上面的源码所示,initialize方法使用appId初始化BlockManager,主要完成以下工作。

(1)初始化BlockTransferService。

(2)初始化ShuffleClient。

(3)创建BlockManagerId。

(4)将BlockManager注册到BlockManagerMaster上。

(5)若ShuffleService可用,则注册ShuffleService。

在BlockManager的initialize方法上右击Find Usages,可以看到initialize方法在两个地方得到调用:一个是SparkContext;另一个是Executor。启动Executor时,会调用BlockManager的initialize方法。Executor中调用initialize方法的源码如下所示。

Executor.scala的源码如下。

1.  //CoarseGrainedExecutorBackend中实例化Executor,isLocal设置成false,即
     //Executor中isLocal始终为false
2.
3.   if (!isLocal) {
4.  //向度量系统注册
5.    //env.metricsSystem.registerSource(executorSource)
6.   //调用BlockManager的initialize方法,initialize方法将向BlockManagerMaster
     //注册,完成Executor中的BlockManager向Driver中的BlockManager注册
7.      env.blockManager.initialize(conf.getAppId)
8.    }

上面代码中调用了env.blockManager.initialize方法。在initialize方法中,完成BlockManger向Master端BlockManagerMaster的注册。使用方法master.registerBlockManager (id,maxMemory,slaveEndpoint)完成注册,registerBlockManager方法中传入Id、maxMemory、salveEndPoint引用,分别表示Executor中的BlockManager、最大内存、BlockManager中的BlockMangarSlaveEndpoint。BlockManagerSlaveEndpoint是一个RPC端点,使用它完成同BlockManagerMaster的通信。BlockManager收到注册请求后将Executor中注册的BlockManagerInfo存入哈希表中,以便通过BlockManagerSlaveEndpoint向Executor发送控制命令。

ShuffleManager是一个用于shuffle系统的可插拔接口。在Driver端SparkEnv中创建ShuffleManager,在每个Executor上也会创建。基于spark.shuffle.manager进行设置。Driver使用ShuffleManager注册到shuffles系统,Executors(或Driver在本地运行的任务)可以请求读取和写入数据。这将被SparkEnv的SparkConf和isDriver布尔值作为参数。

ShuffleManager.scala的源码如下。

1.   private[spark] trait ShuffleManager {
2.
3.    /**
4.      *注册一个shuffle管理器,获取一个句柄传递给任务
5.      */
6.    def registerShuffle[K, V, C](
7.        shuffleId: Int,
8.        numMaps: Int,
9.        dependency: ShuffleDependency[K, V, C]): ShuffleHandle
10.
11.   /**为给定分区获取一个写入器。Executors节点通过Map任务调用*/
12.   def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:
      TaskContext): ShuffleWriter[K, V]
13.
14.   /**
        *获取读取器汇聚一定范围的分区(从   startPartition     到 endPartition-1)。在
        *Executors节点,通过reduce 任务调用
15.     */
16.
17.   def getReader[K, C](
18.       handle: ShuffleHandle,
19.       startPartition: Int,
20.       endPartition: Int,
21.       context: TaskContext): ShuffleReader[K, C]
22.
23.   /**
24.     *从ShuffleManager移除一个shuffle的元数据
25.     * @return如果元数据成功删除,则返回true,否则返回false
26.     */
27.   def unregisterShuffle(shuffleId: Int): Boolean
28.
29.   /**
        * 返回一个能够根据块坐标来检索shuffle 块数据的解析器
30.     */
31.   def shuffleBlockResolver: ShuffleBlockResolver
32.
33.   /** 关闭ShuffleManager */
34.   def stop(): Unit
35. }

Spark Shuffle Pluggable框架ShuffleBlockManager在Spark 1.6.0之后改成了ShuffleBlockResolver。ShuffleBlockResolver具体读取shuffle数据,是一个trait。在ShuffleBlockResolver中已无getBytes方法。getBlockData(blockId: ShuffleBlockId)方法返回的是ManagedBuffer,这是核心。

ShuffleBlockResolver的源码如下。

1.     trait ShuffleBlockResolver {
2.    type ShuffleId = Int
3.
4.    /**
        *为指定的块检索数据。如果块数据不可用,则抛出一个未指明的异常
5.      */
6.    def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
7.
8.    def stop(): Unit
9.  }

Spark 2.0版本中通过IndexShuffleBlockResolver来具体实现ShuffleBlockResolver (SortBasedShuffle方式),已无FileShuffleBlockManager(Hashshuffle方式)。IndexShuffle-BlockResolver创建和维护逻辑块和物理文件位置之间的shuffle blocks映射关系。来自于相同map task任务的shuffle blocks数据存储在单个合并数据文件中;数据文件中的数据块的偏移量存储在单独的索引文件中。将shuffleBlockId + reduce ID set to 0 + ".后缀"作为数据shuffle data的shuffleBlockId名字。其中,文件名后缀为".data"的是数据文件;文件名后缀为".index"的是索引文件。

7.6.2 Shuffle写数据的交互

基于Sort的Shuffle实现的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。两种ShuffleHandle写数据的方法可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。

SortShuffleManager的getWriter的源码如下。

1.     override def getWriter[K, V](
2.  .......
3.      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K
        @unchecked, V @unchecked] =>
4.        new BypassMergeSortShuffleWriter(
5.           env.blockManager,
6.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
7.         ........
8.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
9.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
10.    }
11.  }

在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析。BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,其中调用的createTempShuffleBlock方法描述了各个分区所生成的中间临时文件的格式与对应的BlockId。SortShuffleWriter写数据的具体实现位于实现的write方法中。

7.6.3 Shuffle读数据的交互

SparkEnv.get.shuffleManager.getReader是SortShuffleManager的getReader,是获取数据的阅读器,getReader方法中创建了一个BlockStoreShuffleReader实例。SortShuffleManager. scala的read()方法的源码如下。

1.  override def getReader[K, C](
2.      handle: ShuffleHandle,
3.      startPartition: Int,
4.      endPartition: Int,
5.      context: TaskContext): ShuffleReader[K, C] = {
6.    new BlockStoreShuffleReader(
7.      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition,
        endPartition, context)
8.  }

BlockStoreShuffleReader实例的read方法,首先实例化new ShuffleBlockFetcherIterator。ShuffleBlockFetcherIterator是一个阅读器,里面有一个成员blockManager。blockManager是内存和磁盘上数据读写的统一管理器;ShuffleBlockFetcherIterator.scala的initialize方法中splitLocalRemoteBlocks()划分本地和远程的blocks,Utils.randomize(remoteRequests)把远程请求通过随机的方式添加到队列中,fetchUpToMaxBytes()发送远程请求获取我们的blocks,fetchLocalBlocks()获取本地的blocks。

7.6.4 BlockManager架构原理、运行流程图和源码解密

BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础上进行数据读写。由于Spark是分布式的,所以BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等。BlockManager是另外一个非常重要的模块。BlockManager本身的源码量非常大。本节从BlockManager原理流程对BlockManager做深刻地讲解。在Shuffle读写数据的时候,我们需要读写BlockManager。因此,BlockManager是至关重要的内容。

编写一个业务代码WordCount.scala,通过观察WordCount运行时BlockManager的日志来理解BlockManager的运行。

WordCount.scala的代码如下。

1.  package com.dt.spark.sparksql
2.
3.  import org.apache.log4j.{Level, Logger}
4.  import org.apache.spark.SparkConf
5.  import org.apache.spark.SparkContext
6.  import org.apache.spark.internal.config
7.  import org.apache.spark.rdd.RDD
8.
9.  /**
10.   * 使用Scala开发本地测试的Spark WordCount程序
11.   *
12.   * @author DT大数据梦工厂
13.   *         新浪微博:http://weibo.com/ilovepains/
14.   */
15. object WordCount {
16.   def main(args: Array[String]) {
17.     Logger.getLogger("org").setLevel(Level.ALL)
18.
19.     /**
20.       *第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
21.       *例如,通过setMaster设置程序要链接的Spark集群的Master的URL,如果设置
22.       *为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(如只有
23.       *1GB的内存)的初学者       *
24.       */
25.     val conf = new SparkConf() //创建SparkConf对象
26.     conf.setAppName("Wow,My First Spark App!") //设置应用程序的名称,在程序
                                                            //运行的监控界面可以看到名称
27.     conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群
28.     /**
29.       * 第2步:创建SparkContext对象
30.       * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、
          * R等都必须有一个SparkContext
31.       * SparkContext    核心作用:初始化  Spark  应用程序运行所需要的核心组件,包括
          * DAGScheduler、TaskScheduler、SchedulerBackend
32.       * 同时还会负责Spark程序往Master注册程序等
33.       * SparkContext是整个Spark应用程序中最重要的一个对象
34.       */
35.     val sc = new SparkContext(conf)
                                 //创建SparkContext对象,通过传入SparkConf
                                 //实例来定制Spark运行的具体参数和配置信息
36.     /**
37.       * 第 3  步:根据具体的数据来源(如  HDFS、HBase、Local FS、DB、S3      等)通过
          * SparkContext创建RDD
38.       * RDD的创建基本有三种方式:根据外部的数据来源(如HDFS)、根据Scala集合、由
          * 其他的RDD操作
39.       * 数据会被RDD划分为一系列的Partitions,分配到每个Partition的数据属于一
          * 个Task的处理范畴
40.       */
41.     //val lines: RDD[String] = sc.textFile("D://Big_Data_Software spark-
        1.6.0-bin-hadoop2.6README.md", 1) //读取本地文件并设置为一个Partition
42.     // val lines = sc.textFile("D://Big_Data_Software spark-1.6.0-bin-
        hadoop2.6//README.md", 1) //读取本地文件并设置为一个Partition
43.
44.     val lines = sc.textFile("data/wordcount/helloSpark.txt", 1)
                                          //读取本地文件并设置为一个Partition
45.     /**
46.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
47.       * 第4.1步:将每一行的字符串拆分成单个单词
48.       */
49.
50.     val words = lines.flatMap { line => line.split(" ") }
    //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合
51.
52.     /**
53.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
54.       * 第4.2步:在单词拆分的基础上,对每个单词实例计数为1,也就是word => (word, 1)
55.       */
56.     val pairs = words.map { word => (word, 1) }
57.
58.     /**
59.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
60.       * 第4.3步:在每个单词实例计数为1基础上,统计每个单词在文件中出现的总次数
61.       */
62.     val wordCountsOdered = pairs.reduceByKey(_ + _).map(pair => (pair._2,
        pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
         //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
63.     wordCountsOdered.collect.foreach(wordNumberPair => println
        (wordNumberPair._1 + " : " + wordNumberPair._2))
64.     while (true) {
65.
66.     }
67.     sc.stop()
68.
69.   }
70. }

在IDEA中运行一个业务程序WordCount.scala,日志中显示:

 SparkEnv:Registering MapOutputTracker,其中MapOutputTracker中数据的读写都和BlockManager关联。

 SparkEnv:Registering BlockManagerMaste,其中Registering BlockManagerMaster由BlockManagerMaster进行注册。

 DiskBlockManager:Created local directory C:\Users\dell\AppData\Local\Temp\blockmgr-...其中DiskBlockManager是管理磁盘存储的,里面有我们的数据。可以访问Temp目录下以blockmgr-开头的文件的内容。

WordCount运行结果如下。

1.   Using Spark's default log4j profile: org/apache/spark/log4j-defaults.
     properties
2.  17/06/06 05:37:57 INFO SparkContext: Running Spark version 2.1.0
3.  ......
4.  17/06/06 05:38:01 INFO SparkEnv: Registering MapOutputTracker
5.  17/06/06 05:38:01 DEBUG MapOutputTrackerMasterEndpoint: init
6.  17/06/06 05:38:01 INFO SparkEnv: Registering BlockManagerMaster
7.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache
    .spark.storage.DefaultTopologyMapper for getting topology information
8.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint:
    BlockManagerMasterEndpoint up
9.  17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at
    C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828
    872c98804
10. 17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook
11. 17/06/06 05:38:01 DEBUG ShutdownHookManager: Adding shutdown hook
12. 17/06/06 05:38:01 INFO MemoryStore: MemoryStore started with capacity
    637.2 MB
13. 17/06/06 05:38:02 INFO SparkEnv: Registering OutputCommitCoordinator
14. 17/06/06 05:38:02 DEBUG OutputCommitCoordinator$OutputCommitCoordinator-
    Endpoint: init
15. ........

从Application启动的角度观察BlockManager:

(1)Application启动时会在SparkEnv中注册BlockManagerMaster以及MapOutputTracker,其中,

a)BlockManagerMaster:对整个集群的Block数据进行管理。

b)MapOutputTrackerMaster:跟踪所有的Mapper的输出。

BlockManagerMaster中有一个引用driverEndpoint,isDriver判断是否运行在Driver上。

BlockManagerMaster的源码如下。

1.   private[spark]
2.  class BlockManagerMaster(
3.      var driverEndpoint: RpcEndpointRef,
4.      conf: SparkConf,
5.      isDriver: Boolean)
6.    extends Logging {

BlockManagerMaster注册给SparkEnv,SparkEnv在SparkContext中。

SparkContext.scala的源码如下。

1.     ......
2.    private var _env: SparkEnv = _
3.  ......
4.    _env = createSparkEnv(_conf, isLocal, listenerBus)
5.      SparkEnv.set(_env)

进入createSparkEnv方法:

1.  private[spark] def createSparkEnv(
2.      conf: SparkConf,
3.      isLocal: Boolean,
4.      listenerBus: LiveListenerBus): SparkEnv = {
5.    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.
      numDriverCores(master))
6.  }

进入SparkEnv.scala的createDriverEnv方法:

1.   private[spark] def createDriverEnv(
2.  ......
3.      create(
4.        conf,
5.        SparkContext.DRIVER_IDENTIFIER,
6.        bindAddress,
7.        advertiseAddress,
8.        port,
9.        isLocal,
10.       numCores,
11.       ioEncryptionKey,
12.       listenerBus = listenerBus,
13.       mockOutputCommitCoordinator = mockOutputCommitCoordinator
14.     )
15.   }
16. ......

SparkEnv.scala的createDriverEnv中调用了create方法,判断是否是Driver。create方法的源码如下。

1.    private def create(
2.        conf: SparkConf,
3.        executorId: String,
4.        bindAddress: String,
5.        advertiseAddress: String,
6.        port: Int,
7.        isLocal: Boolean,
8.        numUsableCores: Int,
9.        ioEncryptionKey: Option[Array[Byte]],
10.       listenerBus: LiveListenerBus = null,
11.       mockOutputCommitCoordinator:         Option[OutputCommitCoordinator]  =
          None): SparkEnv = {
12.
13.     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
14.      ......
15.     if (isDriver) {
16.       conf.set("spark.driver.port", rpcEnv.address.port.toString)
17.     } else if (rpcEnv.address != null) {
18.       conf.set("spark.executor.port", rpcEnv.address.port.toString)
19.       logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port
          .toString}")
20.     }
21.  ......
22.    val mapOutputTracker = if (isDriver) {
23.       new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
24.     } else {
25.       new MapOutputTrackerWorker(conf)
26.     }
27. ......
28. SparkContext.scala
29. private[spark] val DRIVER_IDENTIFIER = "driver"
30. ......

在SparkEnv.scala的createDriverEnv中调用new()函数创建一个MapOutputTrackerMaster。MapOutputTrackerMaster的源码如下。

1.   private[spark] class MapOutputTrackerMaster(conf: SparkConf,
2.      broadcastManager: BroadcastManager, isLocal: Boolean)
3.    extends MapOutputTracker(conf) {
4.  ......

然后看一下blockManagerMaster。在SparkEnv.scala中调用new()函数创建一个blockManagerMaster。

1.  val blockManagerMaster = new BlockManagerMaster
    (registerOrLookupEndpoint(
2.   BlockManagerMaster.DRIVER_ENDPOINT_NAME,
3.   new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
4.   conf, isDriver)

BlockManagerMaster对整个集群的Block数据进行管理,Block是Spark数据管理的单位,与数据存储没有关系,数据可能存在磁盘上,也可能存储在内存中,还可能存储在offline,如Alluxio上。源码如下。

1.  private[spark]
2.  class BlockManagerMaster(
3.      var driverEndpoint: RpcEndpointRef,
4.      conf: SparkConf,
5.      isDriver: Boolean)
6.    extends Logging {
7.  .....

构建BlockManagerMaster的时候调用new()函数创建一个BlockManagerMasterEndpoint,这是循环消息体。

1.      private[spark]
2.  class BlockManagerMasterEndpoint(
3.      override val rpcEnv: RpcEnv,
4.      val isLocal: Boolean,
5.      conf: SparkConf,
6.      listenerBus: LiveListenerBus)
7.    extends ThreadSafeRpcEndpoint with Logging {

(2)BlockManagerMasterEndpoint本身是一个消息体,会负责通过远程消息通信的方式去管理所有节点的BlockManager。

查看WordCount在IDEA中的运行日志,日志中显示BlockManagerMasterEndpoint: Registering block manager,向block manager进行注册。

1.  ......
2.   17/06/06 05:38:02 INFO BlockManager: Using org.apache.spark.storage.
     RandomBlockReplicationPolicy for block replication policy
3.  17/06/06 05:38:02 INFO BlockManagerMaster: Registering BlockManager
    BlockManagerId(driver, 192.168.93.1, 63572, None)
4.  17/06/06 05:38:02 DEBUG DefaultTopologyMapper: Got a request for 192.168.
    93.1
5.  17/06/06 05:38:02 INFO BlockManagerMasterEndpoint: Registering block
    manager 192.168.93.1:63572 with 637.2 MB RAM, BlockManagerId(driver,
    192.168.93.1, 63572, None)
6.  17/06/06 05:38:02 INFO BlockManagerMaster: Registered BlockManager
    BlockManagerId(driver, 192.168.93.1, 63572, None)
7.  17/06/06     05:38:02     INFO    BlockManager:      Initialized     BlockManager:
    BlockManagerId(driver, 192.168.93.1, 63572, None)
8.  .......

(3)每启动一个ExecutorBackend,都会实例化BlockManager,并通过远程通信的方式注册给BlockManagerMaster;实质上是Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint。

(4)MemoryStore是BlockManager中专门负责内存数据存储和读写的类。

查看WordCount在IDEA中的运行日志,日志中显示MemoryStore: Block broadcast_0 stored as values in memory,数据存储在内存中。

1.   .......
2.  17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0 stored as values
    in memory (estimated size 208.5 KB, free 637.0 MB)
3.  17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as
    bytes in memory (estimated size 20.0 KB, free 637.0 MB)
4.  17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in
    memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB)
5.  .......

Spark读写数据是以block为单位的,MemoryStore将block数据存储在内存中。MemoryStore.scala的源码如下。

1.   private[spark] class MemoryStore(
2.      conf: SparkConf,
3.      blockInfoManager: BlockInfoManager,
4.      serializerManager: SerializerManager,
5.      memoryManager: MemoryManager,
6.      blockEvictionHandler: BlockEvictionHandler)
7.    extends Logging {
8.  ......

(5)DiskStore是BlockManager中专门负责基于磁盘的数据存储和读写的类。

Spark 2.1.1版本的DiskStore.scala的源码如下。

1.   private[spark] class DiskStore(conf: SparkConf, diskManager:
     DiskBlockManager) extends Logging {
2.  .......

Spark 2.2.0版本的DiskStore.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第1行新增加了securityManager安全管理的成员变量。

1.  .......
2.     securityManager: SecurityManager) extends Logging {

(6)DiskBlockManager:管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘文件的创建、读写等。

查看WordCount在IDEA中的运行日志,日志中显示INFO DiskBlockManager: Created local directory。DiskBlockManager负责磁盘文件的管理。

1.   .....
2.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint: Using org.apache.
    spark.storage.DefaultTopologyMapper for getting topology information
3.  17/06/06 05:38:01 INFO BlockManagerMasterEndpoint:
    BlockManagerMasterEndpoint up
4.  17/06/06 05:38:01 INFO DiskBlockManager: Created local directory at
    C:\Users\dell\AppData\Local\Temp\blockmgr-a58a44dd-484b-4871-a92a-828
    872c98804
5.  17/06/06 05:38:01 DEBUG DiskBlockManager: Adding shutdown hook
6.  .......

DiskBlockManager负责管理逻辑级别和物理级别的映射关系,根据BlockID映射一个文件。在目录spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件进行hash生成。通过createLocalDirs生成本地目录。DiskBlockManager的源码如下。

1.   private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop:
     Boolean) extends Logging {
2.  ......
3.  private def createLocalDirs(conf: SparkConf): Array[File] = {
4.      Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
5.        try {
6.          val localDir = Utils.createDirectory(rootDir, "blockmgr")
7.          logInfo(s"Created local directory at $localDir")
8.          Some(localDir)
9.        } catch {
10.         case e: IOException =>
11.           logError(s"Failed to create local dir in $rootDir. Ignoring this
              directory.", e)
12.           None
13.       }
14.     }
15.   }

从Job运行的角度来观察BlockManager:

查看WordCount.scala的运行日志:日志中显示INFO BlockManagerInfo: Added broadcast_0_piece0 in memory,将BlockManagerInfo的广播变量加入到内存中。

1.  ......
2.  17/06/06 05:38:04 INFO MemoryStore: Block broadcast_0_piece0 stored as
    bytes in memory (estimated size 20.0 KB, free 637.0 MB)
3.  17/06/06 05:38:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in
    memory on 192.168.93.1:63572 (size: 20.0 KB, free: 637.2 MB)
4.  ......

Driver使用BlockManagerInfo管理ExecutorBackend中BlockManager的元数据,BlockManagerInfo的成员变量包括blockManagerId、系统当前时间timeMs、最大堆内内存maxOnHeapMem、最大堆外内存maxOffHeapMem、slaveEndpoint。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源码如下。

1.  private[spark] class BlockManagerInfo(
2.     val blockManagerId: BlockManagerId,
3.     timeMs: Long,
4.     val maxMem: Long,
5.     val slaveEndpoint: RpcEndpointRef)
6.   extends Logging {

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第4行删除maxMem。

 上段代码中第4行之后新增maxOnHeapMem成员变量:最大的堆内内存大小。

 上段代码中第4行之后新增maxOffHeapMem成员变量:最大的堆外内存大小。

1.  ......
2.      val maxOnHeapMem: Long,
3.      val maxOffHeapMem: Long,
4.   ......
5.  extends Logging {

集群中每启动一个节点,就创建一个BlockManager,BlockManager是在每个节点(Driver及Executors)上运行的管理器,用于存放和检索本地和远程不同的存储块(内存、磁盘和堆外内存)。BlockManagerInfo中的BlockManagerId标明是哪个BlockManager,slaveEndpoint是消息循环体,用于消息通信。

(1)首先通过MemoryStore存储广播变量。

(2)在Driver中是通过BlockManagerInfo来管理集群中每个ExecutorBackend中的BlockManager中的元数据信息的。

(3)当改变了具体的ExecutorBackend上的Block信息后,就必须发消息给Driver中的BlockManagerMaster来更新相应的BlockManagerInfo。

(4)当执行第二个Stage的时候,第二个Stage会向Driver中的MapOutputTracker-MasterEndpoint发消息请求上一个Stage中相应的输出,此时MapOutputTrackerMaster会把上一个Stage的输出数据的元数据信息发送给当前请求的Stage。图7-14是BlockManager工作原理和运行机制简图:

图7-14 BlockManager工作原理和运行机制简图

BlockManagerMasterEndpoint.scala中BlockManagerInfo的getStatus方法如下。

1.  def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.
    get(blockId))

其中的BlockStatus是一个case class。

1.     case class BlockStatus(storageLevel: StorageLevel, memSize: Long,
       diskSize: Long) {
2.    def isCached: Boolean = memSize + diskSize > 0
3.  }

BlockTransferService.scala进行网络连接操作,获取远程数据。

1.   private[spark]
2.  abstract class BlockTransferService extends ShuffleClient with Closeable
    with Logging {

7.6.5 BlockManager解密进阶:BlockManager初始化和注册解密、BlockManagerMaster工作解密、BlockTransferService解密、本地数据读写解密、远程数据读写解密

BlockManager既可以运行在Driver上,也可以运行在Executor上。在Driver上的BlockManager管理集群中Executor的所有的BlockManager,BlockManager分成Master、Slave结构,一切的调度、一切的工作由Master触发,Executor在启动的时候一定会启动BlockManager。BlockManager主要提供了读和写数据的接口,可以从本地读写数据,也可以从远程读写数据。读写数据可以基于磁盘,也可以基于内存以及OffHeap。OffHeap就是堆外空间(如Alluxion是分布式内存管理系统,与基于内存计算的Spark系统形成天衣无缝的组合,在大数据领域中,Spark+Alluxion+Kafka是非常有用的组合)。

从整个程序运行的角度看,Driver也是Executor的一种,BlockManager可以运行在Driver上,也可以运行在Executor上。BlockManager.scala的源码如下。

1.   private[spark] class BlockManager(
2.      executorId: String,
3.      rpcEnv: RpcEnv,
4.      val master: BlockManagerMaster,
5.      val serializerManager: SerializerManager,
6.      val conf: SparkConf,
7.      memoryManager: MemoryManager,
8.      mapOutputTracker: MapOutputTracker,
9.      shuffleManager: ShuffleManager,
10.     val blockTransferService: BlockTransferService,
11.     securityManager: SecurityManager,
12.     numUsableCores: Int)
13.   extends BlockDataManager with BlockEvictionHandler with Logging {
14. ......
15. val diskBlockManager = {
16.     //如果外部服务不为shuffle 文件提供服务执行清理文件
17.     val deleteFilesOnStop =
18.       !externalShuffleServiceEnabled || executorId == SparkContext.
          DRIVER_IDENTIFIER
19.     new DiskBlockManager(conf, deleteFilesOnStop)
20.   }
21. ......
22. private val futureExecutionContext = ExecutionContext.fromExecutorService(
23.     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
24. ......
25.   private[spark] val memoryStore =
26.    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager,
       this)
27.   private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
28.   memoryManager.setMemoryStore(memoryStore)
29. ......
30.   def initialize(appId: String): Unit = {
31. .......

BlockManager中的成员变量中:BlockManagerMaster对整个集群的BlockManagerMaster进行管理;serializerManager是默认的序列化器;MemoryManager是内存管理;MapOutputTracker是Shuffle输出的时候,要记录ShuffleMapTask输出的位置,以供下一个Stage使用,因此需要进行记录。BlockTransferService是进行网络操作的,如果要连同另外一个BlockManager进行数据读写操作,就需要BlockTransferService。Block是Spark运行时数据的最小抽象单位,可能放入内存中,也可能放入磁盘中,还可能放在Alluxio上。

SecurityManager是安全管理;numUsableCores是可用的Cores。

BlockManager中DiskBlockManager管理磁盘的读写,创建并维护磁盘上逻辑块和物理块之间的逻辑映射位置。一个block被映射到根据BlockId生成的一个文件,块文件哈希列在目录spark.local.dir中(如果设置了SPARK LOCAL DIRS),或在目录(SPARK LOCAL DIRS)中。

然后在BlockManager中创建一个缓存池:block-manager-future以及memoryStore 、diskStore。

Shuffle读写数据的时候是通过BlockManager进行管理的。

Spark 2.1.1版本的BlockManager.scala的源码如下。

1.    var blockManagerId: BlockManagerId = _
2.
3.   //服务此Executor的shuffle文件的服务器的地址,这或者是外部的服务,或者只是我们
     //自己的Executor的BlockManager
4.   private[spark] var shuffleServerId: BlockManagerId = _
5.
6.   //客户端读取其他Executors的shuffle文件。这或者是一个外部服务,或者只是
     //标准BlockTransferService 直接连接到其他Executors
7.
8.   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
9.     val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle",
       numUsableCores)
10.    new ExternalShuffleClient(transConf, securityManager,
       securityManager.isAuthenticationEnabled(),
11.      securityManager.isSaslEncryptionEnabled())
12.  } else {
13.    blockTransferService
14.  }

Spark 2.2.0版本的BlockManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第11行ExternalShuffleClient类中去掉securityManager.isSaslEncryptionEnabled()成员变量。

1.   ......
2.      new ExternalShuffleClient(transConf, securityManager,
        securityManager.isAuthenticationEnabled())
3.  ......

BlockManager.scala中,BlockManager实例对象通过调用initialize方法才能正式工作,传入参数是appId,基于应用程序的ID初始化BlockManager。initialize不是在构造器的时候被使用,因为BlockManager实例化的时候还不知道应用程序的ID,应用程序ID是应用程序启动时,ExecutorBackend向Master注册时候获得的。

BlockManager.scala的initialize方法中的BlockTransferService进行网络通信。ShuffleClient是BlockManagerWorker每次启动时向BlockManagerMaster注册。BlockManager.scala的initialize方法中调用了registerBlockManager,向Master进行注册,告诉BlockManagerMaster把自己注册进去。

Spark 2.1.1版本BlockManagerMaster.scala的registerBlockManager的源码如下。

1.   def registerBlockManager(
2.       blockManagerId: BlockManagerId,
3.       maxMemSize: Long,
4.       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
5.     logInfo(s"Registering BlockManager $blockManagerId")
6.     val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
7.       RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
8.     logInfo(s"Registered BlockManager $updatedId")
9.     updatedId
10.  }

Spark 2.2.0版本的BlockManagerMaster.scala的registerBlockManager的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第3行maxMemSize删除。

 上段代码中第3行之后新增参数maxOnHeapMemSize:最大的堆内内存大小。

 上段代码中第3行之后新增参数maxOffHeapMemSize:最大的堆外内存大小。

 上段代码中第6行driverEndpoint.askWithRetry方法调整为driverEndpoint.askSync方法。

 上段代码中第7行RegisterBlockManager新增maxOnHeapMemSize、maxOffHeapMemSize两个参数。

1.   ......
2.     maxOnHeapMemSize: Long,
3.     maxOffHeapMemSize: Long,
4.     ......
5.   val updatedId = driverEndpoint.askSync[BlockManagerId](
6.     RegisterBlockManager(blockManagerId, maxOnHeapMemSize,
       maxOffHeapMemSize, slaveEndpoint))
7.  ......

registerBlockManager方法的RegisterBlockManager是一个case class。

Spark 2.1.1版本的BlockManagerMessages.scala的源码如下。

1.   case class RegisterBlockManager(
2.    blockManagerId: BlockManagerId,
3.    maxMemSize: Long,
4.    sender: RpcEndpointRef)
5.  extends ToBlockManagerMaster

Spark 2.2.0版本的BlockManagerMessages.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第3行maxMemSize删除。

 上段代码中第3行之后新增成员变量maxOnHeapMemSize:最大堆内内存大小。

 上段代码中第3行之后新增成员变量maxOffHeapMemSize:最大堆外内存大小。

1.    ......
2.      maxOnHeapMemSize: Long,
3.      maxOffHeapMemSize: Long,
4.  ......

在Executor实例化的时候,要初始化blockManager。blockManager在initialize中将应用程序ID传进去。

Executor.scala的源码如下。

1.    if (!isLocal) {
2.    env.metricsSystem.registerSource(executorSource)
3.    env.blockManager.initialize(conf.getAppId)
4.  }

Executor.scala中,Executor每隔10s向Master发送心跳消息,如收不到心跳消息,blockManager须重新注册。

Spark 2.1.1版本的Executor.scala的源码如下。

1.  .......
2.  val message = Heartbeat(executorId, accumUpdates.toArray,
    env.blockManager.blockManagerId)
3.      try {
4.        val response = heartbeatReceiverRef.askWithRetry
          [HeartbeatResponse](
5.            message, RpcTimeout(conf, "spark.executor.heartbeatInterval",
              "10s"))
6.        if (response.reregisterBlockManager) {
7.          logInfo("Told to re-register on heartbeat")
8.          env.blockManager.reregister()
9.        }
10.       heartbeatFailures = 0
11.     } catch {
12.       case NonFatal(e) =>
13.         logWarning("Issue communicating with driver in heartbeater", e)
14.         heartbeatFailures += 1
15.         if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
16.           logError(s"Exit as unable to send heartbeats to driver " +
17.             s"more than $HEARTBEAT_MAX_FAILURES times")
18.           System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
19.         }
20.     }
21.   }
22. .......

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行heartbeatReceiverRef.askWithRetry方法调整为heartbeatReceiverRef.askSync方法。

1.  .......
2.      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
3.            message, RpcTimeout(conf, "spark.executor.heartbeatInterval",
              "10s"))

回到BlockManagerMaster.scala的registerBlockManager:

registerBlockManager中RegisterBlockManager传入的slaveEndpoint是:具体的Executor启动时会启动一个BlockManagerSlaveEndpoint,会接收BlockManagerMaster发过来的指令。在initialize方法中通过master.registerBlockManager传入slaveEndpoint,而slaveEndpoint是在rpcEnv.setupEndpoint方法中调用new()函数创建的BlockManagerSlaveEndpoint。

总结一下:

(1)当Executor实例化的时候,会通过BlockManager.initialize来实例化Executor上的BlockManager,并且创建BlockManagerSlaveEndpoint这个消息循环体来接受Driver中BlockManagerMaster发过来的指令,如删除Block等。

1.  env.blockManager.initialize(conf.getAppId)

BlockManagerSlaveEndpoint.scala的源码如下。

1.  class BlockManagerSlaveEndpoint(
2.    override val rpcEnv: RpcEnv,
3.    blockManager: BlockManager,
4.    mapOutputTracker: MapOutputTracker)
5.  extends ThreadSafeRpcEndpoint with Logging {

(2)当BlockManagerSlaveEndpoint实例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注册。

BlockManagerMaster的registerBlockManager方法,其中的driverEndpoint是构建BlockManagerMaster时传进去的。

(3)BlockManagerMasterEndpoint接收到Executor上的注册信息并进行处理。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源码如下。

1.  class BlockManagerMasterEndpoint(
2.      override val rpcEnv: RpcEnv,
3.  ......
4.   override def receiveAndReply(context: RpcCallContext): PartialFunction
     [Any, Unit] = {
5.      case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
6.        context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
7.  ......

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第5行RegisterBlockManager新增成员变量:maxOnHeapMemSize、maxOffHeapMemSize。

 上段代码中第6行register新增成员变量:maxOnHeapMemSize、maxOffHeapMemSize。

1.  ......
2.  override def receiveAndReply(context: RpcCallContext): PartialFunction
    [Any, Unit] = {
3.      case RegisterBlockManager(blockManagerId, maxOnHeapMemSize,
        maxOffHeapMemSize, slaveEndpoint) =>
4.        context.reply(register(blockManagerId, maxOnHeapMemSize,
          maxOffHeapMemSize, slaveEndpoint))
5.  ......

BlockManagerMasterEndpoint的register注册方法,为每个Executor的BlockManager生成对应的BlockManagerInfo。BlockManagerInfo是一个HashMap[BlockManagerId, BlockManagerInfo]。

register注册方法源码如下。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的源码如下。

1.  private val blockManagerInfo = new mutable.HashMap[BlockManagerId,
    BlockManagerInfo]
2.  ......
3.   private def register(
4.        idWithoutTopologyInfo: BlockManagerId,
5.        maxMemSize: Long,
6.        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
7.      //dummy id不应包含拓扑信息
8.      //我们在这里得到信息和回应一个块标识符
9.      val id = BlockManagerId(
10.       idWithoutTopologyInfo.executorId,
11.       idWithoutTopologyInfo.host,
12.       idWithoutTopologyInfo.port,
13.       topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
14.
15.     val time = System.currentTimeMillis()
16.     if (!blockManagerInfo.contains(id)) {
17.       blockManagerIdByExecutor.get(id.executorId) match {
18.         case Some(oldId) =>
19.           //同一个Executor 的块管理器已经存在,所以删除它(假定已挂掉)
20.           logError("Got two different block manager registrations on same
              executor - "
21.               + s" will replace old one $oldId with new one $id")
22.           removeExecutor(id.executorId)
23.         case None =>
24.       }
25.       logInfo("Registering block manager %s with %s RAM, %s".format(
26.         id.hostPort, Utils.bytesToString(maxMemSize), id))
27.
28.       blockManagerIdByExecutor(id.executorId) = id
29.
30.       blockManagerInfo(id) = new BlockManagerInfo(
31.         id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
32.     }
33.     listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
34.     id
35.   }......

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第5行删掉maxMemSize。

 上段代码中第5行之后Register新增参数maxOnHeapMemSize:最大堆内内存大小;maxOffHeapMemSize:最大堆外内存大小。

 上段代码中第26行日志打印时新增最大堆内内存大小、最大堆外内存大小的信息。

 上段代码中第31行构建BlockManagerInfo实例时传入maxOnHeapMemSize、maxOffHeapMemSize。

 上段代码中第33行listenerBus监控系统增加对最大堆内内存大小、最大堆外内存大小信息的监控。

1.  .......
2.       maxOnHeapMemSize: Long,
3.        maxOffHeapMemSize: Long,
4.  ......
5.          id.hostPort, Utils.bytesToString(maxOnHeapMemSize +
            maxOffHeapMemSize), id))
6.  .......
7.
8.        blockManagerInfo(id) = new BlockManagerInfo(
9.          id, System.currentTimeMillis(), maxOnHeapMemSize,
            maxOffHeapMemSize, slaveEndpoint)
10. .......
11. listenerBus.post(SparkListenerBlockManagerAdded(time, id,
    maxOnHeapMemSize + maxOffHeapMemSize,
12.         Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
13. .......

BlockManagerMasterEndpoint中,BlockManagerId是一个class,标明了BlockManager在哪个Executor中,以及host主机名、port端口等信息。

BlockManagerId.scala的源码如下。

1.  class BlockManagerId private (
2.     private var executorId_ : String,
3.     private var host_ : String,
4.     private var port_ : Int,
5.     private var topologyInfo_ : Option[String])
6.   extends Externalizable {

BlockManagerMasterEndpoint中,BlockManagerInfo包含内存、slaveEndpoint等信息。

回到BlockManagerMasterEndpoint的register注册方法:如果blockManagerInfo没有包含BlockManagerId,根据BlockManagerId.executorId查询BlockManagerId,如果匹配到旧的BlockManagerId,就进行清理。

BlockManagerMasterEndpoint的removeExecutor方法如下。

1.    private def removeExecutor(execId: String) {
2.    logInfo("Trying to remove executor " + execId + " from
      BlockManagerMaster.")
3.    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
4.  }

进入removeBlockManager方法,从blockManagerIdByExecutor数据结构中清理掉block manager信息,从blockManagerInfo数据结构中清理掉所有的blocks信息。removeBlockManager源码如下。

Spark 2.1.1版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源码如下。

1.   private def removeBlockManager(blockManagerId: BlockManagerId) {
2.      val info = blockManagerInfo(blockManagerId)
3.
4.      //从blockManagerIdByExecutor删除块管理
5.      blockManagerIdByExecutor -= blockManagerId.executorId
6.
7.      //将它从blockManagerInfo 删除所有的块
8.      blockManagerInfo.remove(blockManagerId)
9.      val iterator = info.blocks.keySet.iterator
10.     while (iterator.hasNext) {
11.       val blockId = iterator.next
12.       val locations = blockLocations.get(blockId)
13.      locations -= blockManagerId
14.      if (locations.size == 0) {
15.        blockLocations.remove(blockId)
16.      }
17.    }
18.    listenerBus.post(SparkListenerBlockManagerRemoved(System.
       currentTimeMillis(), blockManagerId))
19.    logInfo(s"Removing block manager $blockManagerId")
20.  }

Spark 2.2.0版本的BlockManagerMasterEndpoint.scala的removeBlockManager的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第16~20行整体替换为以下代码:新增数据复制处理。

1.   .......
2.         //如果没有块管理器,就注销这个块。否则,如果主动复制启用,块block是一个RDD
           //或测试块 block(后者用于单元测试),我们发送一条消息随机选择Executor的位
           //置来复制给定块block。注意,我们忽略了其他块block类型(如广播broadcast/
           //shuffle blocks),因为复制在这种情况下没有多大意义
3.     ......
4.          logWarning(s"No more replicas available for $blockId !")
5.        } else if (proactivelyReplicate && (blockId.isRDD || blockId.
          isInstanceOf[TestBlockId])) {
6.
7.          //假设Executor未能找出故障前存在的副本数量
8.          val maxReplicas = locations.size + 1
9.          val i = (new Random(blockId.hashCode)).nextInt(locations.size)
10.         val blockLocations = locations.toSeq
11.         val candidateBMId = blockLocations(i)
12.         blockManagerInfo.get(candidateBMId).foreach { bm =>
13.           val remainingLocations = locations.toSeq.filter(bm => bm !=
              candidateBMId)
14.           val replicateMsg = ReplicateBlock(blockId, remainingLocations,
              maxReplicas)
15.           bm.slaveEndpoint.ask[Boolean](replicateMsg)
16.         }
17.       }
18.     }
19. .........

removeBlockManager中的一行代码blockLocations.remove的remove方法如下。

HashMap.java的源码如下。

1.  public V remove(Object key) {
2.      Node<K,V> e;
3.      return (e = removeNode(hash(key), key, null, false, true)) == null ?
4.          null : e.value;
5.  }

回到BlockManagerMasterEndpoint的register注册方法:然后在blockManagerIdByExecutor中加入BlockManagerId,将BlockManagerId加入BlockManagerInfo信息,在listenerBus中进行监听,函数返回BlockManagerId,完成注册。

回到BlockManager.scala,在initialize方法通过master.registerBlockManager注册成功以后,将返回值赋值给idFromMaster。Initialize初始化之后,看一下BlockManager.scala中其他的方法。

reportAllBlocks方法:具体的Executor须向Driver不断地汇报自己的状态。

BlockManager.scala的reportAllBlocks方法的源码如下。

1.       private def reportAllBlocks(): Unit = {
2.     logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
3.     for ((blockId, info) <- blockInfoManager.entries) {
4.       val status = getCurrentBlockStatus(blockId, info)
5.       if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
6.         logError(s"Failed to report $blockId to master; giving up.")
7.         return
8.       }
9.     }
10.  }

reportAllBlocks方法中调用了getCurrentBlockStatus,包括内存、磁盘等信息。

getCurrentBlockStatus的源码如下。

1.   private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo):
     BlockStatus = {
2.      info.synchronized {
3.        info.level match {
4.          case null =>
5.            BlockStatus.empty
6.          case level =>
7.            val inMem = level.useMemory && memoryStore.contains(blockId)
8.            val onDisk = level.useDisk && diskStore.contains(blockId)
9.            val deserialized = if (inMem) level.deserialized else false
10.           val replication = if (inMem || onDisk) level.replication else 1
11.           val storageLevel = StorageLevel(
12.             useDisk = onDisk,
13.             useMemory = inMem,
14.             useOffHeap = level.useOffHeap,
15.             deserialized = deserialized,
16.             replication = replication)
17.           val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
18.           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
19.           BlockStatus(storageLevel, memSize, diskSize)
20.       }
21.     }
22.   }

getCurrentBlockStatus方法中的BlockStatus,包含存储级别StorageLevel、内存大小、磁盘大小等信息。

BlockManagerMasterEndpoint.scala的BlockStatus的源码如下。

1.  case   class    BlockStatus(storageLevel:        StorageLevel,     memSize:  Long,
    diskSize: Long) {
2.    def isCached: Boolean = memSize + diskSize > 0
3.  }
4.  ......
5.    object BlockStatus {
6.    def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L,
      diskSize = 0L)
7.  }

回到BlockManager.scala,其中的getLocationBlockIds方法比较重要,根据BlockId获取这个BlockId所在的BlockManager。

BlockManager.scala的getLocationBlockIds的源码如下。

1.   private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq
     [BlockManagerId]] = {
2.    val startTimeMs = System.currentTimeMillis
3.    val locations = master.getLocations(blockIds).toArray
4.    logDebug("Got multiple block location in %s".format
      (Utils.getUsedTimeMs(startTimeMs)))
5.    locations
6.  }

getLocationBlockIds方法中根据BlockId通过master.getLocations向Master获取位置信息,因为master管理所有的位置信息。getLocations方法里的driverEndpoint是BlockManagerMasterEndpoint,Executor向BlockManagerMasterEndpoint发送GetLocationsMultipleBlockIds消息。

Spark 2.1.1版本的BlockManagerMaster.scala的getLocations方法的源码如下。

1.  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq
    [BlockManagerId]] = {
2.    driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
3.      GetLocationsMultipleBlockIds(blockIds))
4.  }

Spark 2.2.0版本的BlockManagerMaster.scala的getLocations方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第2行driverEndpoint.askWithRetry方法调整为driverEndpoint. askSync方法。

1.  ......
2.      driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
3.  .......

getLocations中的GetLocationsMultipleBlockIds是一个case class。

1.  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId])
   extends ToBlockManagerMaster

在BlockManagerMasterEndpoint侧接收GetLocationsMultipleBlockIds消息。

BlockManagerMasterEndpoint.scala的receiveAndReply方法如下。

1.   override def receiveAndReply(context: RpcCallContext): PartialFunction
     [Any, Unit] = {
2.  ......
3.   case GetLocationsMultipleBlockIds(blockIds) =>
4.        context.reply(getLocationsMultipleBlockIds(blockIds))

进入getLocationsMultipleBlockIds方法,进行map操作,开始查询位置信息。

1.   private def getLocationsMultipleBlockIds(
2.      blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
3.    blockIds.map(blockId => getLocations(blockId))
4.  }

进入getLocations方法,首先判断内存缓存结构blockLocations中是否包含blockId,如果已包含,就获取位置信息,否则返回空的信息。

1.  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
2.    if (blockLocations.containsKey(blockId)) blockLocations.get
      (blockId).toSeq else Seq.empty
3.  }

其中,blockLocations是一个重要的数据结构,是一个JHashMap。Key是BlockId。Value是一个HashSet[BlockManagerId],使用HashSet。因为每个BlockId在磁盘上有副本,不同机器的位置不一样,而且不同副本对应的BlockManagerId不一样,位于不同的机器上,所以使用HashSet数据结构。

BlockManagerMasterEndpoint.scala的blockLocations的源码如下。

1.  private val blockLocations = new JHashMap[BlockId, mutable.HashSet
    [BlockManagerId]]

回到BlockManager.scala,getLocalValues是一个重要的方法,从blockInfoManager中获取本地数据。

 首先根据blockId从blockInfoManager中获取BlockInfo信息。

 从BlockInfo信息获取level级别,根据level.useMemory && memoryStore.contains (blockId)判断是否在内存中,如果在内存中,就从memoryStore中获取数据。

 根据level.useDisk && diskStore.contains(blockId)判断是否在磁盘中,如果在磁盘中,就从diskStore中获取数据。

Spark 2.1.1版本的BlockManager.scala的getLocalValues方法的源码如下。

1.   def getLocalValues(blockId: BlockId): Option[BlockResult] = {
2.      logDebug(s"Getting local block $blockId")
3.      blockInfoManager.lockForReading(blockId) match {
4.        case None =>
5.          logDebug(s"Block $blockId was not found")
6.          None
7.        case Some(info) =>
8.          val level = info.level
9.          logDebug(s"Level for block $blockId is $level")
10.         if (level.useMemory && memoryStore.contains(blockId)) {
11.           val iter: Iterator[Any] = if (level.deserialized) {
12.             memoryStore.getValues(blockId).get
13.           } else {
14.             serializerManager.dataDeserializeStream(
15.              blockId, memoryStore.getBytes(blockId).get.toInputStream())
                 (info.classTag)
16.           }
17.           val ci = CompletionIterator[Any, Iterator[Any]](iter,
              releaseLock(blockId))
18.           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
19.         } else if (level.useDisk && diskStore.contains(blockId)) {
20.           val iterToReturn: Iterator[Any] = {
21.             val diskBytes = diskStore.getBytes(blockId)
22.             if (level.deserialized) {
23.               val diskValues = serializerManager.dataDeserializeStream(
24.                 blockId,
25.                 diskBytes.toInputStream(dispose = true))(info.classTag)
26.               maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
27.             } else {
28.              val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
                 diskBytes)
29.                 .map {_.toInputStream(dispose = false)}
30.                 .getOrElse { diskBytes.toInputStream(dispose = true) }
31.               serializerManager.dataDeserializeStream(blockId, stream)
                (info.classTag)
32.           }
33.         }
34.         val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn,
            releaseLock(blockId))
35.         Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
36.       } else {
37.         handleLocalReadFailure(blockId)
38.       }
39.    }
40.  }

Spark 2.2.0版本的BlockManager.scala的getLocalValues方法的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第9行之后新增taskAttemptId的创建。

 上段代码中第17行releaseLock新增一个参数taskAttemptId。

 上段代码中第21、25、28、30行diskBytes更新为diskData。

 上段代码中第21行之后新增val iterToReturn: Iterator[Any]。

 上段代码中第25行diskData.toInputStream方法删掉dispose = true参数。

 上段代码中第34行CompletionIterator的第二个参数调整为releaseLockAndDispose (blockId, diskData, taskAttemptId)。

1.   .......
2.         val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
3.       ......
4.           //在迭代器iterator完成触发时,我们需要从一个没有TaskContext上下文的
             //线程捕获taskId,参阅spark-18406讨论
5.           val ci = CompletionIterator[Any, Iterator[Any]](iter, {
6.             releaseLock(blockId, taskAttemptId)
7.        ........
8.           val diskData = diskStore.getBytes(blockId)
9.           val iterToReturn: Iterator[Any] = {
10.          ........
11.            diskData.toInputStream())(info.classTag)
12.         ........
13.             val stream = maybeCacheDiskBytesInMemory(info, blockId, level,
                diskData)
14.           ........
15.              .getOrElse { diskData.toInputStream() }
16.          ........
17.            releaseLockAndDispose(blockId, diskData, taskAttemptId)
18.    ........

回到BlockManager.scala,getRemoteValues方法从远程的BlockManager中获取block数据,在JVM中不需要去获取锁。

BlockManager.scala的getRemoteValues方法的源码如下。

1.  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option
    [BlockResult] = {
2.    val ct = implicitly[ClassTag[T]]
3.    getRemoteBytes(blockId).map { data =>
4.      val values =
5.        serializerManager.dataDeserializeStream(blockId, data.toInputStream
          (dispose = true))(ct)
6.      new BlockResult(values, DataReadMethod.Network, data.size)
7.    }
8.  }

getRemoteValues方法中调用getRemoteBytes,获取远程的数据,如果获取的失败次数超过最大的获取次数(locations.size),就提示失败,返回空值;如果获取到远程数据,就返回。

getRemoteBytes方法调用blockTransferService.fetchBlockSync方法实现远程获取数据。

BlockTransferService.scala的fetchBlockSync方法的源码如下。

Spark 2.1.1版本的BlockTransferService.scala的fetchBlockSync方法的源码如下。

1.   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
     String): ManagedBuffer = {
2.      //线程等待的监视器
3.      val result = Promise[ManagedBuffer]()
4.      fetchBlocks(host, port, execId, Array(blockId),
5.        new BlockFetchingListener {
6.          override def onBlockFetchFailure(blockId:             String,  exception:
            Throwable): Unit = {
7.            result.failure(exception)
8.          }
9.          override def onBlockFetchSuccess(blockId: String, data:
            ManagedBuffer): Unit = {
10.           val ret = ByteBuffer.allocate(data.size.toInt)
11.           ret.put(data.nioByteBuffer())
12.           ret.flip()
13.           result.success(new NioManagedBuffer(ret))
14.         }
15.       })
16.     ThreadUtils.awaitResult(result.future, Duration.Inf)
17.   }

Spark 2.2.0版本的BlockTransferService.scala的fetchBlockSync方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第15行fetchBlocks方法新增了shuffleFiles = null参数。fetchBlocks方法用于异步从远程节点获取序列块,仅在调用[init]之后可用。注意,这个API需要一个序列,可以实现批处理请求,而不是返回一个future,底层实现可以调用onBlockFetchSuccess尽快获取块的数据,而不是等待所有块被取出来。

1.    ......
2.        }, shuffleFiles = null)
3.  .......

fetchBlockSync中调用fetchBlocks方法,NettyBlockTransferService继承自BlockTransferService,是BlockTransferService实现子类。

Spark 2.1.1版本的NettyBlockTransferService的fetchBlocks的源码如下。

1.   override def fetchBlocks(
2.        host: String,
3.        port: Int,
4.        execId: String,
5.        blockIds: Array[String],
6.        listener: BlockFetchingListener): Unit = {
7.      logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
8.      try {
9.        val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
10.         override def createAndStart(blockIds: Array[String], listener:
            BlockFetchingListener) {
11.          val client = clientFactory.createClient(host, port)
12.          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray,
             listener).start()
13.        }
14.      }
15.
16.      val maxRetries = transportConf.maxIORetries()
17.      if (maxRetries > 0) {
18.        //注意,Fetcher将正确处理maxRetries等于0的情况;避免它在代码中产生Bug,
           //一旦确定了稳定性,就应该删除if语句
19.        new RetryingBlockFetcher(transportConf, blockFetchStarter,
           blockIds, listener).start()
20.      } else {
21.        blockFetchStarter.createAndStart(blockIds, listener)
22.      }
23.    } catch {
24.      case e: Exception =>
25.        logError("Exception while beginning fetchBlocks", e)
26.        blockIds.foreach(listener.onBlockFetchFailure(_, e))
27.    }
28.  }

Spark 2.2.0版本的NettyBlockTransferService的fetchBlocks的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行fetchBlocks方法新增了shuffleFiles参数。

1.   .......
2.        shuffleFiles: Array[File]): Unit = {
3.  .......

回到BlockManager.scala,无论是doPutBytes(),还是doPutIterator()方法中,都会使用doPut方法。

BlockManager.scala的doPut方法的源码如下。

1.   private def doPut[T](
2.       blockId: BlockId,
3.       level: StorageLevel,
4.       classTag: ClassTag[_],
5.       tellMaster: Boolean,
6.       keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T]
         = {
7.     require(blockId != null, "BlockId is null")
8.     require(level != null && level.isValid, "StorageLevel is null or
       invalid")
9.
10.    val putBlockInfo = {
11.      val newInfo = new BlockInfo(level, classTag, tellMaster)
12.      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
13.        newInfo
14.      } else {
15.        logWarning(s"Block $blockId already exists on this machine; not
           re-adding it")
16.        if (!keepReadLock) {
17.    //在现有的块上lockNewBlockForWriting 返回一个读锁,所以我们必须释放它
18.          releaseLock(blockId)
19.        }
20.        return None
21.      }
22.    }
23.
24.    val startTimeMs = System.currentTimeMillis
25.    var exceptionWasThrown: Boolean = true
26.    val result: Option[T] = try {
27.      val res = putBody(putBlockInfo)
28.      exceptionWasThrown = false
29.     ......
30.     result
31.  }

doPut方法中,lockNewBlockForWriting写入一个新的块前先尝试获得适当的锁,如果我们是第一个写块,获得写入锁后继续后续操作。否则,如果另一个线程已经写入块,须等待写入完成,才能获取读取锁,调用new()函数创建一个BlockInfo赋值给putBlockInfo,然后通过putBody(putBlockInfo)将数据存入。putBody是一个匿名函数,输入BlockInfo,输出的是一个泛型Option[T]。putBody函数体内容是doPutIterator方法(doPutBytes方法也类似调用doPut)调用doPut时传入的。

BlockManager.scala的doPutIterator调用doput方法,在其putBody匿名函数体中进行判断:

如果是level.useMemory,则在memoryStore中放入数据。

如果是level.useDisk,则在diskStore中放入数据。

如果level.replication大于1,则在其他节点中存入副本数据。

其中,BlockManager.scala的replicate方法的副本复制源码如下。

Spark 2.1.1版本的BlockManager.scala的replicate方法的源码如下。

1.   private def replicate(
2.        blockId: BlockId,
3.        data: ChunkedByteBuffer,
4.        level: StorageLevel,
5.        classTag: ClassTag[_]): Unit = {
6.  ......
7.  while(numFailures <= maxReplicationFailures &&
8.          !peersForReplication.isEmpty &&
9.          peersReplicatedTo.size != numPeersToReplicateTo) {
10.       val peer = peersForReplication.head
11.       try {
12.         val onePeerStartTime = System.nanoTime
13.         logTrace(s"Trying to replicate $blockId of ${data.size} bytes to
            $peer")
14.         blockTransferService.uploadBlockSync(
15.           peer.host,
16.           peer.port,
17.           peer.executorId,
18.           blockId,
19.           new NettyManagedBuffer(data.toNetty),
20.           tLevel,
21.           classTag)
22. ......

Spark 2.2.0版本的BlockManager.scala的replicate方法的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第5行replicate方法中新增了existingReplicas参数。

 上段代码中第19行uploadBlockSync方法的第5个参数由NettyManagedBuffer实例调整为BlockManagerManagedBuffer实例。

1.      .......
2.        existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
3.  ......
4.       new    BlockManagerManagedBuffer(blockInfoManager,            blockId,  data,
         false),
5.   ......

replicate方法中调用了blockTransferService.uploadBlockSync方法。

BlockTransferService.scala的uploadBlockSync的源码如下。

1.   def uploadBlockSync(
2.        hostname: String,
3.        port: Int,
4.        execId: String,
5.        blockId: BlockId,
6.        blockData: ManagedBuffer,
7.        level: StorageLevel,
8.        classTag: ClassTag[_]): Unit = {
9.      val future = uploadBlock(hostname, port, execId, blockId, blockData,
        level, classTag)
10.     ThreadUtils.awaitResult(future, Duration.Inf)
11.   }
12. }

uploadBlockSync中又调用uploadBlock方法,BlockTransferService.scala的uploadBlock方法无具体实现,NettyBlockTransferService是BlockTransferService的子类,具体实现uploadBlock方法。

NettyBlockTransferService的uploadBlock的源码如下。

1.   override def uploadBlock(
2.        hostname: String,
3.        port: Int,
4.        execId: String,
5.        blockId: BlockId,
6.        blockData: ManagedBuffer,
7.        level: StorageLevel,
8.        classTag: ClassTag[_]): Future[Unit] = {
9.      val result = Promise[Unit]()
10.     val client = clientFactory.createClient(hostname, port)
11.
12.     //使用JavaSerializer序列号器将StorageLevel和ClassTag序列化。其他一切都
        //用我们的二进制协议编码
13.     val metadata = JavaUtils.bufferToArray(serializer.newInstance().
        serialize((level, classTag)))
14.
15.     //为了序列化,转换或复制NIO缓冲到数组
16.     val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
17.
18.     client.sendRpc(new       UploadBlock(appId,      execId,     blockId.toString,
        metadata, array).toByteBuffer,
19.       new RpcResponseCallback {
20.         override def onSuccess(response: ByteBuffer): Unit = {
21.           logTrace(s"Successfully uploaded block $blockId")
22.           result.success((): Unit)
23.         }
24.        override def onFailure(e: Throwable): Unit = {
25.          logError(s"Error while uploading block $blockId", e)
26.          result.failure(e)
27.        }
28.      })
29.
30.    result.future
31.  }

回到BlockManager.scala,看一下dropFromMemory方法。如果存储级别定位为MEMORY_AND_DISK,那么数据可能放在内存和磁盘中,内存够的情况下不会放到磁盘上;如果内存不够,就放到磁盘上,这时就会调用dropFromMemory。如果存储级别不是定义为MEMORY_AND_DISK,而只是存储在内存中,内存不够时,缓存的数据此时就会丢弃。如果仍需要数据,那就要重新计算。

Spark 2.1.1版本的BlockManager.scala的dropFromMemory的源码如下。

1.   private[storage] override def dropFromMemory[T: ClassTag](
2.        blockId: BlockId,
3.        data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
4.      logInfo(s"Dropping block $blockId from memory")
5.      val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
6.      var blockIsUpdated = false
7.      val level = info.level
8.
9.      //如果存储级别要求,则保存到磁盘
10.     if (level.useDisk && !diskStore.contains(blockId)) {
11.       logInfo(s"Writing block $blockId to disk")
12.       data() match {
13.         case Left(elements) =>
14.           diskStore.put(blockId) { fileOutputStream =>
15.             serializerManager.dataSerializeStream(
16.               blockId,
17.               fileOutputStream,
18.               elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
19.           }
20.         case Right(bytes) =>
21.           diskStore.putBytes(blockId, bytes)
22.       }
23.       blockIsUpdated = true
24.     }
25.
26.     //实际由内存存储
27.     val droppedMemorySize =
28.       if   (memoryStore.contains(blockId))         memoryStore.getSize(blockId)
          else 0L
29.     val blockIsRemoved = memoryStore.remove(blockId)
30.     if (blockIsRemoved) {
31.       blockIsUpdated = true
32.     } else {
33.       logWarning(s"Block $blockId could not be dropped from memory as it
          does not exist")
34.     }
35.
36.     val status = getCurrentBlockStatus(blockId, info)
37.     if (info.tellMaster) {
38.       reportBlockStatus(blockId, status, droppedMemorySize)
39.     }
40.    if (blockIsUpdated) {
41.      addUpdatedBlockStatusToTaskMetrics(blockId, status)
42.    }
43.    status.storageLevel
44.  }

Spark 2.2.0版本的BlockManager.scala的dropFromMemory的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第14行fileOutputStream名称调整为channel。

 上段代码中第14行之后新增代码:val out = Channels.newOutputStream(channel)。

 上段代码中第17行fileOutputStream调整为out。

1.    ........
2.   diskStore.put(blockId) { channel =>
3.             val out = Channels.newOutputStream(channel)
4.      ........
5.               out,
6.  ........

总结:dropFromMemory是指在内存不够的时候,尝试释放一部分内存给要使用内存的应用,释放的这部分内存数据需考虑是丢弃,还是放到磁盘上。如果丢弃,如5000个步骤作为一个Stage,前面4000个步骤进行了Cache,Cache时可能有100万个partition分区单位,其中丢弃了100个,丢弃的100个数据就要重新计算;但是,如果设置了同时放到内存和磁盘,此时会放入磁盘中,下次如果需要,就可以从磁盘中读取数据,而不是重新计算。