9.1 Spark中Cache原理和源码详解

本节对Spark中Cache原理及Spark中Cache源码进行详解。

9.1.1 Spark中Cache原理详解

Spark中Cache机制原理:首先,RDD是通过iterator进行计算的。

(1)CacheManager会通过BlockManager从Local或者Remote获取数据直接通过RDD的compute进行计算,有可能需要考虑checkpoint。

(2)通过BlockManager首先从本地获取数据,如果得不到数据,就会从远程获取数据。

(3)首先查看当前的RDD是否进行了checkpoint,如果进行了的话,就直接读取checkpoint的数据,否则必须进行计算;因为此时RDD需要缓存,所以计算如果需要,则通过BlockManager再次进行持久化。

(4)如果持久化的时候只是缓存到磁盘中,就直接使用BlockManager的doPut方法写入磁盘(需要考虑Replication)。

(5)如果指定内存作缓存,优先保存到内存中,此时会使用MemoryStore.unrollSafely方法来尝试安全地将数据保存在内存中,如果内存不够,会使用一个方法来整理一部分内存空间,然后基于整理出来的内存空间放入我们想缓存的最新数据。

(6)直接通过RDD的compute进行计算,有可能需要考虑checkpoint。

Spark中,Cache原理示意图如图9-1所示。

9.1.2 Spark中Cache源码详解

CacheManager管理是缓存,而缓存可以是基于内存的缓存,也可以是基于磁盘的缓存。CacheManager需要通过BlockManager来操作数据。

Task发生计算时要调用RDD的compute进行计算。下面看一下MapPartitionsRDD的 compute方法。

图9-1 Cache原理示意图

MapPartitionsRDD的源码如下。

1.   private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
2.      var prev: RDD[T],
3.      f: (TaskContext, Int, Iterator[T]) => Iterator[U],  //(TaskContext,
        partition index, iterator)
4.      preservesPartitioning: Boolean = false)
5.    extends RDD[U](prev) {
6.
7.    override val partitioner = if (preservesPartitioning) firstParent[T].
      partitioner else None
8.
9.    override def getPartitions: Array[Partition] = firstParent[T].
      partitions
10.
11.   override def compute(split: Partition, context: TaskContext):
      Iterator[U] =
12.     f(context, split.index, firstParent[T].iterator(split, context))
13.
14.   override def clearDependencies() {
15.     super.clearDependencies()
16.     prev = null
17.   }
18. }

compute真正计算的时候通过iterator计算,MapPartitionsRDD的iterator依赖父RDD计算。iterator是RDD内部的方法,如有缓存,将从缓存中读取数据,否则进行计算。这不是被用户直接调用,但可用于实现自定义子RDD。

RDD.scala的iterator方法如下。

1.  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
2.    if (storageLevel != StorageLevel.NONE) {
3.      getOrCompute(split, context)
4.    } else {
5.      computeOrReadCheckpoint(split, context)
6.    }
7.  }

RDD.scala的iterator方法中判断storageLevel != StorageLevel.NONE,说明数据可能存放在内存、磁盘中,调用getOrCompute(split, context)方法。如果之前计算过一次,再次计算可以找CacheManager要数据。

RDD.scala的getOrCompute的源码如下。

1.   private[spark] def getOrCompute(partition: Partition, context:
     TaskContext): Iterator[T] = {
2.      val blockId = RDDBlockId(id, partition.index)
3.      var readCachedBlock = true
4.      //这种方法被Executors调用,所以我们需要调用SparkEnv.get代替sc.env
5.      SparkEnv.get.blockManager.getOrElseUpdate(blockId,               storageLevel,
        elementClassTag, () => {
6.        readCachedBlock = false
7.        computeOrReadCheckpoint(partition, context)
8.      }) match {
9.        case Left(blockResult) =>
10.         if (readCachedBlock) {
11.           val existingMetrics = context.taskMetrics().inputMetrics
12.           existingMetrics.incBytesRead(blockResult.bytes)
13.           new InterruptibleIterator[T](context, blockResult.data.
              asInstanceOf[Iterator[T]]) {
14.             override def next(): T = {
15.               existingMetrics.incRecordsRead(1)
16.               delegate.next()
17.             }
18.           }
19.         } else {
20.           new InterruptibleIterator(context, blockResult.data.asInstanceOf
              [Iterator[T]])
21.         }
22.       case Right(iter) =>
23.         new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
24.     }
25.   }

在有缓存的情况下,缓存可能基于内存,也可能基于磁盘,getOrCompute获取缓存;如没有缓存,则需重新计算RDD。为何需要重新计算?如果数据放在内存中,假设缓存了100万个数据分片,下一个步骤计算的时候需要内存,因为需要进行计算的内存空间占用比之前缓存的数据占用内存空间重要,假设须腾出10000个数据分片所在的空间,因此从BlockManager中将内存中的缓存数据drop到磁盘上,如果不是内存和磁盘的存储级别,那10000个数据分片的缓存数据就可能丢失,99万个数据分片可以复用,而这10000个数据分片须重新进行计算。

Cache在工作的时候会最大化地保留数据,但是数据不一定绝对完整,因为当前的计算如果需要内存空间,那么Cache在内存中的数据必须让出空间,此时如何在RDD持久化的时候同时指定可以把数据放在Disk上,那么部分Cache的数据就可以从内存转入磁盘,否则数据就会丢失。

getOrCompute方法返回的是Iterator。进行Cache以后,BlockManager对其进行管理,通过blockId可以获得曾经缓存的数据。具体CacheManager在获得缓存数据的时候会通过BlockManager来抓到数据。

getOrElseUpdate方法中,如果block存在,检索给定的块block;如果不存在,则调用提供makeIterator方法计算块block,对块block进行持久化,并返回block的值。

BlockManager.scala的getOrElseUpdate的源码如下。

1.     def getOrElseUpdate[T](
2.       blockId: BlockId,
3.       level: StorageLevel,
4.       classTag: ClassTag[T],
5.       makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
6.     //尝试从本地或远程存储读取块。如果它存在,那么我们就不需要通过本地get或put路
       //径获取
7.     get[T](blockId)(classTag) match {
8.       case Some(block) =>
9.         return Left(block)
10.      case _ =>
11.        //需要计算块
12.    }
13.    //需要计算blockInitially,在块上我们没有锁
14.    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock =
       true) match {
15.      case None =>
16.        //doput()方法没有返回,所以块已存在或者已成功存储。因此,我们现在在块上持有
           //读取锁
17.        val blockResult = getLocalValues(blockId).getOrElse {
18.     //在doPut()和get()方法调用的时候,我们持有读取锁,块不应被驱逐,这样,get()
        //方法没返回块,表示发生一些内部错误
19.          releaseLock(blockId)
20.          throw new SparkException(s"get() failed for block $blockId even
             though we held a lock")
21.        }
22.   //我们已经持有调用doPut()方法在块上的读取锁,getLocalValues()再一次获取锁,
      //所以我们需要调用releaseLock(),这样获取锁的数量是1(因为调用者只release()一次)
23.        releaseLock(blockId)
24.        Left(blockResult)
25.      case Some(iter) =>
26.   //输入失败,可能是因为数据太大而不能存储在内存中,不能溢出到磁盘上。因此,我们需
      //要将输入迭代器传递给调用者,他们可以决定如何处理这些值(例如,不缓存它们)
27.       Right(iter)
28.    }
29.  }

BlockManager.scala的getOrElseUpdate中根据blockId调用了get[T](blockId)方法,get方法从block块管理器(本地或远程)获取一个块block。如果块在本地存储且没获取锁,则先获取块block的读取锁。如果该块是从远程块管理器获取的,当data迭代器被完全消费以后,那么读取锁将自动释放。get的时候,如果本地有数据,从本地获取数据返回;如果没有数据,则从远程节点获取数据。

BlockManager.scala的get方法的源码如下:

1.  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
2.    val local = getLocalValues(blockId)
3.    if (local.isDefined) {
4.      logInfo(s"Found block $blockId locally")
5.      return local
6.     }
7.     val remote = getRemoteValues[T](blockId)
8.     if (remote.isDefined) {
9.       logInfo(s"Found block $blockId remotely")
10.      return remote
11.    }
12.    None
13.  }

BlockManager的get方法从Local的角度讲,如果数据在本地,get方法调用getLocalValues获取数据。如果数据在内存中(level.useMemory且memoryStore包含了blockId),则从memoryStore中获取数据;如果数据在磁盘中(level.useDisk且diskStore包含了blockId),则从diskStore中获取数据。这说明数据在本地缓存,可以在内存中,也可以在磁盘上。

BlockManager的get方法从remote的角度讲,get方法中将调用getRemoteValues方法。

BlockManager.Scala的getRemoteValues的源码如下。

1.    private def getRemoteValues[T: ClassTag](blockId: BlockId): Option
      [BlockResult] = {
2.    val ct = implicitly[ClassTag[T]]
3.    getRemoteBytes(blockId).map { data =>
4.      val values =
5.        serializerManager.dataDeserializeStream(blockId, data.toInputStream
          (dispose = true))(ct)
6.      new BlockResult(values, DataReadMethod.Network, data.size)
7.    }
8.  }

getRemoteValues方法中调用getRemoteBytes方法,通过blockTransferService.fetchBlockSync从远程节点获取数据。

BlockManager.Scala的getRemoteBytes的源码如下。

1.   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
2.     logDebug(s"Getting remote block $blockId")
3.     require(blockId != null, "BlockId is null")
4.     var runningFailureCount = 0
5.     var totalFailureCount = 0
6.     val locations = getLocations(blockId)
7.     val maxFetchFailures = locations.size
8.     var locationIterator = locations.iterator
9.     while (locationIterator.hasNext) {
10.      val loc = locationIterator.next()
11.      logDebug(s"Getting remote block $blockId from $loc")
12.      val data = try {
13.        blockTransferService.fetchBlockSync(
14.          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
15.      } catch {
16.        case NonFatal(e) =>
17.          runningFailureCount += 1
18.          totalFailureCount += 1
19.
20.          if (totalFailureCount >= maxFetchFailures) {
21.            //放弃尝试的位置。要么我们已经尝试了所有的原始位置,或者我们已经从master
               //节点刷新了位置列表,并且仍然在刷新列表中尝试位置后命中失败logWarning
               //(s"Failed to fetch block after $totalFailureCount fetch failures."+
               //s"Most recent failure cause:", e)
22.
23.            return None
24.          }
25.
26.          logWarning(s"Failed to fetch remote block $blockId " +
27.            s"from $loc (failed attempt $runningFailureCount)", e)
28.
29.    //如果有大量的Executors,那么位置列表可以包含一个旧的条目造成大量重试,可能花
       //费大量的时间。在一定数量的获取失败之后,为去掉这些旧的条目,我们刷新块位置
30.          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
31.            locationIterator = getLocations(blockId).iterator
32.            logDebug(s"Refreshed locations from the driver " +
33.              s"after ${runningFailureCount} fetch failures.")
34.            runningFailureCount = 0
35.          }
36.
37.          //此位置失败,所以我们尝试从不同的位置获取,这里返回一个null
38.
39.      }
40.
41.      if (data != null) {
42.        return Some(new ChunkedByteBuffer(data))
43.      }
44.      logDebug(s"The value of block $blockId is null")
45.    }
46.    logDebug(s"Block $blockId not found")
47.    None
48.  }

BlockManager的get方法,如果本地有数据,则从本地获取数据返回;如果远程有数据,则从远程获取数据返回;如果都没有数据,就返回None。get方法的返回类型是Option[BlockResult],Option的结果分为两种情况:①如果有内容,则返回Some[BlockResult;②如果没有内容,则返回None。这是Option的基础语法。

Option.scala的源码如下。

1.    sealed abstract class Option[+A] extends Product with Serializable {
2.    self =>
3.  .....
4.  final case class Some[+A](x: A) extends Option[A] {
5.    def isEmpty = false
6.    def get = x
7.  }
8.
9.  .......
10. case object None extends Option[Nothing] {
11.   def isEmpty = true
12.   def get = throw new NoSuchElementException("None.get")
13. }

回到BlockManager的getOrElseUpdate方法,从get方法返回的结果进行模式匹配,如果有数据,则对Some(block)返回Left(block),这是获取到block的情况;如果没数据,则是None,须计算block。

回到RDD.scala的getOrCompute方法,在getOrCompute方法中调用SparkEnv.get. blockManager.getOrElseUpdate方法时,传入blockId、storageLevel、elementClassTag,其中第四个参数是一个匿名函数,在匿名函数中调用了computeOrReadCheckpoint(partition, context)。然后在getOrElseUpdate方法中,根据blockId获取数据,如果获取到缓存数据,就返回;如果没有数据,就调用doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true)进行计算,doPutIterator其中第二个参数makeIterator就是getOrElseUpdate方法中传入的匿名函数,在匿名函数中获取到Iterator数据。

RDD.getOrCompute中computeOrReadCheckpoint方法,如果RDD进行了checkpoint,则从父RDD的iterator中直接获取数据;或者没有Checkpoint物化,则重新计算RDD的数据。

RDD.scala的computeOrReadCheckpoint的源码如下。

1.   private[spark] def computeOrReadCheckpoint(split: Partition, context:
     TaskContext): Iterator[T] =
2.  {
3.    if (isCheckpointedAndMaterialized) {
4.      firstParent[T].iterator(split, context)
5.    } else {
6.      compute(split, context)
7.    }
8.  }

BlockManager.scala的getOrElseUpdate方法中如果根据blockID没有获取到本地数据,则调用doPutIterator将通过BlockManager再次进行持久化。

BlockManager.scala的getOrElseUpdate方法的源码如下。

1.   def getOrElseUpdate[T](
2.        blockId: BlockId,
3.        level: StorageLevel,
4.        classTag: ClassTag[T],
5.        makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
6.      //尝试从本地或远程存储读取块。如果它存在,那么我们就不需要通过本地GET或PUT路
        //径获取
7.      get[T](blockId)(classTag) match {
8.        case Some(block) =>
9.          return Left(block)
10.       case _ =>
11.         //Need to compute the block.
12.     }
13.     //起初我们不锁这个块
14.     doPutIterator(blockId, makeIterator, level, classTag, keepReadLock =
        true) match {
15. .......

BlockManager.scala的getOrElseUpdate方法中调用了doPutIterator。doPutIterator将makeIterator从父RDD的checkpoint读取的数据或者重新计算的数据存放到内存中,如果内存不够,就溢出到磁盘中持久化。

Spark 2.1.1版本的BlockManager.scala的doPutIterator方法的源码如下。

1.  private def doPutIterator[T](
2.    blockId: BlockId,
3.    iterator: () => Iterator[T],
4.    level: StorageLevel,
5.    classTag: ClassTag[T],
6.    tellMaster: Boolean = true,
7.    keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]]={
8.  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock
    = keepReadLock) { info =>
9.   val startTimeMs = System.currentTimeMillis
10.  var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator
     [T]] = None
11.  //块的大小为字节
12.  var size = 0L
13.  if (level.useMemory) {
14.    //首先把它放在内存中,即使useDisk设置为true;如果内存存储不能保存,我们
       //稍后会把它放在磁盘上
15.    if (level.deserialized) {
16.      memoryStore.putIteratorAsValues(blockId, iterator(), classTag)
         match {
17.        case Right(s) =>
18.          size = s
19.        case Left(iter) =>
20.          //没有足够的空间来展开块;如果适用,可以溢出到磁盘
21.          if (level.useDisk) {
22.            logWarning(s"Persisting block $blockId to disk instead.")
23.            diskStore.put(blockId) { fileOutputStream =>
24.              serializerManager.dataSerializeStream(blockId,
                 fileOutputStream, iter)(classTag)
25.            }
26.            size = diskStore.getSize(blockId)
27.          } else {
28.            iteratorFromFailedMemoryStorePut = Some(iter)
29.          }
30.      }
31.    } else { //!level.deserialized
32.      memoryStore.putIteratorAsBytes(blockId, iterator(), classTag,
         level.memoryMode) match {
33.        case Right(s) =>
34.          size = s
35.        case Left(partiallySerializedValues) =>
36.          //没有足够的空间来展开块;如果适用,可以溢出到磁盘
37.          if (level.useDisk) {
38.            logWarning(s"Persisting block $blockId to disk instead.")
39.            diskStore.put(blockId) { fileOutputStream =>
40.              partiallySerializedValues.finishWritingToStream
                 (fileOutputStream)
41.            }
42.            size = diskStore.getSize(blockId)
43.          } else {
44.            iteratorFromFailedMemoryStorePut = Some
               (partiallySerializedValues. valuesIterator)
45.          }
46.      }
47.    }
48.
49.  } else if (level.useDisk) {
50.    diskStore.put(blockId) { fileOutputStream =>
51.      serializerManager.dataSerializeStream(blockId,          fileOutputStream,
         iterator())(classTag)
52.    }
53.    size = diskStore.getSize(blockId)
54.  }
55.
56.  val putBlockStatus = getCurrentBlockStatus(blockId, info)
57.  val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
58.  if (blockWasSuccessfullyStored) {
59.    //现在块位于内存或磁盘存储中,通知master
60.        info.size = size
61.        if (tellMaster && info.tellMaster) {
62.          reportBlockStatus(blockId, putBlockStatus)
63.        }
64.        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
65.        logDebug("Put block %s locally took %s".format(blockId, Utils.
           getUsedTimeMs(startTimeMs)))
66.        if (level.replication > 1) {
67.          val remoteStartTime = System.currentTimeMillis
68.          val bytesToReplicate = doGetLocalBytes(blockId, info)
69.          //[SPARK-16550] 使用默认的序列化时擦除      classTag  类型,当反序列化类时
             //NettyBlockRpcServer崩溃。待办事项(EKL)删除远程节点类装载器的问题
             //已经修复val remoteClassTag = if (!serializerManager.canUseKryo
             //(classTag)) {
70.            scala.reflect.classTag[Any]
71.          } else {
72.            classTag
73.          }
74.          try {
75.            replicate(blockId, bytesToReplicate, level, remoteClassTag)
76.          } finally {
77.            bytesToReplicate.unmap()
78.          }
79.          logDebug("Put block %s remotely took %s"
80.            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
81.        }
82.      }
83.     assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.
        isEmpty)
84.      iteratorFromFailedMemoryStorePut
85.    }
86.  }

Spark 2.2.0版本的BlockManager.scala的doPutIterator方法的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第23、39、50行fileOutputStream的名称更新为channel。

 上段代码中第23、39、50行之后新增加一行代码val out = Channels.newOutputStream (channel)。

 上段代码中第24、51行serializerManager.dataSerializeStream的第2个参数调整为out。

 上段代码中第40行fileOutputStream参数调整为out。

 上段代码中第77行bytesToReplicate.unmap()方法调整为bytesToReplicate.dispose()。

1.   ......
2.     diskStore.put(blockId) { channel =>
3.                    val out = Channels.newOutputStream(channel)
4.                   serializerManager.dataSerializeStream(blockId, out, iter)
                     (classTag)
5.                  }
6.  .......
7.  diskStore.put(blockId) { channel =>
8.                    val out = Channels.newOutputStream(channel)
9.                    partiallySerializedValues.finishWritingToStream(out)
10. .......
11.         diskStore.put(blockId) { channel =>
12.           val out = Channels.newOutputStream(channel)
13.          serializerManager.dataSerializeStream(blockId, out, iterator())
             (classTag)
14.         }
15. ......
16.             bytesToReplicate.dispose()
17. ........