4.2 DAGScheduler解析

DAGScheduler是面向Stage的高层调度器。本节讲解DAG的定义、DAG的实例化、DAGScheduler划分Stage的原理、DAGScheduler划分Stage的具体算法、Stage内部Task获取最佳位置的算法等内容。

4.2.1 DAG的定义

DAGScheduler是面向Stage的高层级的调度器,DAGScheduler把DAG拆分成很多的Tasks,每组的Tasks都是一个Stage,解析时是以Shuffle为边界反向解析构建Stage,每当遇到Shuffle,就会产生新的Stage,然后以一个个TaskSet(每个Stage封装一个TaskSet)的形式提交给底层调度器TaskScheduler。DAGScheduler需要记录哪些RDD被存入磁盘等物化动作,同时要寻求Task的最优化调度,如在Stage内部数据的本地性等。DAGScheduler还需要监视因为Shuffle跨节点输出可能导致的失败,如果发现这个Stage失败,可能就要重新提交该Stage。

为了更好地理解Spark高层调度器DAGScheduler,须综合理解RDD、Application、Driver Program、Job内容,还需要了解以下概念。

(1)Stage:一个Job需要拆分成多组任务来完成,每组任务由Stage封装。与一个Job所有涉及的PartitionRDD类似,Stage之间也有依赖关系。

(2)TaskSet:一组任务就是一个TaskSet,对应一个Stage。其中,一个TaskSet的所有Task之间没有Shuffle依赖,因此互相之间可以并行运行。

(3)Task:一个独立的工作单元,由Driver Program发送到Executor上去执行。通常情况下,一个Task处理RDD的一个Partition的数据。根据Task返回类型的不同,Task又分为ShuffleMapTask和ResultTask。

4.2.2 DAG的实例化

在Spark源码中,DAGScheduler是整个Spark Application的入口,即在SparkContext中声明并实例化。在实例化DAGScheduler之前,已经实例化了SchedulerBackend和底层调度器TaskScheduler,而SchedulerBackend和TaskScheduler是通过SparkContext的方法createTaskScheduler实例化的。DAGScheduler在提交TaskSet给底层调度器的时候是面向TaskScheduler接口的,这符合面向对象中依赖抽象,而不依赖具体实现的原则,带来底层资源调度器的可插拔性,以至于Spark可以运行在众多的部署模式上,如Standalone、Yarn、Mesos、Local及其他自定义的部署模式。

SparkContext.scala的源码中相关的代码如下。

1.   class SparkContext(config: SparkConf) extends Logging {
2.  .......
3.  @volatile private var _dagScheduler: DAGScheduler = _
4.  ......
5.    private[spark] def dagScheduler: DAGScheduler = _dagScheduler
6.    private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
7.      _dagScheduler = ds
8.    }
9.  ......
10.    val (sched, ts) = SparkContext.createTaskScheduler(this, master,
       deployMode)
11.     _schedulerBackend = sched
12.     _taskScheduler = ts
13.      //实例化DAGScheduler时传入当前的SparkContext实例化对象
14.     _dagScheduler = new DAGScheduler(this)
15.     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
16. ......
17.  _taskScheduler.start()

DAGScheduler.scala的源码中相关的代码如下。

1.    private[spark]
2.  class DAGScheduler(
3.      private[scheduler] val sc: SparkContext,
4.      private[scheduler] val taskScheduler: TaskScheduler,
5.      listenerBus: LiveListenerBus,
6.      mapOutputTracker: MapOutputTrackerMaster,
7.      blockManagerMaster: BlockManagerMaster,
8.      env: SparkEnv,
9.      clock: Clock = new SystemClock())
10.   extends Logging {
11. ......
12.   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
13.     this(
14.       sc,
15.      taskScheduler,
16.      sc.listenerBus,
17.      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
18.      sc.env.blockManager.master,
19.      sc.env)
20.  }
21.
22.  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

4.2.3 DAGScheduler划分Stage的原理

Spark将数据在分布式环境下分区,然后将作业转化为DAG,并分阶段进行DAG的调度和任务的分布式并行处理。DAG将调度提交给DAGScheduler,DAGScheduler调度时会根据是否需要经过Shuffle过程将Job划分为多个Stage。

DAG划分Stage及Stage并行计算示意图如图4-3所示。

图4-3 DAG划分Stage及Stage并行计算示意图

其中,实线圆角方框标识的是RDD,方框中的矩形块为RDD的分区。

在图4-3中,RDD A到RDD B之间,以及RDD F到RDD G之间的数据需要经过Shuffle过程,因此RDD A和RDD F分别是Stage 1跟Stage 3和Stage 2跟Stage 3的划分点。而RDD B到RDD G之间,以及RDD C到RDD D到RDD F和RDD E到RDD F之间的数据不需要经过Shuffle过程,因此,RDD G和RDD B的依赖是窄依赖,RDD B和RDD G划分到同一个Stage 3,RDD F和RDD D和RDD E的依赖以及RDD D和RDD C的依赖是窄依赖,RDD C、RDD D、RDD E和RDD F划分到同一个Stage 2。Stage 1和Stage 2是相互独立的,可以并发执行。而由于Stage 3依赖Stage 1和Stage 2的计算结果,所以Stage 3最后执行计算。

根据以上RDD依赖关系的描述,图4-3中的操作算子中,map和union是窄依赖的操作,因为子RDD(如D)的分区只依赖父RDD(如C)的一个分区,其他常见的窄依赖的操作如filter、flatMap和join(每个分区和已知的分区join)等。groupByKey和join是宽依赖的操作,其他常见的宽依赖的操作如reduceByKey等。

由此可见,在DAGScheduler的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时才会将作业(Job)划分成多个Stage。

4.2.4 DAGScheduler划分Stage的具体算法

Spark作业调度的时候,在Job提交过程中进行Stage划分以及确定Task的最佳位置。Stage的划分是DAGScheduler工作的核心,涉及作业在集群中怎么运行,Task最佳位置数据本地性的内容。Spark算子的构建是链式的,涉及怎么进行计算,首先是划分Stage,Stage划分以后才是计算的本身;分布式大数据系统追求最大化的数据本地性。数据本地性是指数据进行计算的时候,数据就在内存中,甚至不用计算就直接获得结果。

Spark Application中可以因为不同的Action触发众多的Job。也就是说,一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage。也就是说,只有前面依赖的Stage计算完毕后,后面的Stage才会运行。

Stage划分的根据是宽依赖。什么时候产生宽依赖呢?例如,reducByKey、groupByKey等。

我们从RDD的collect()方法开始,collect算子是一个Action,会触发Job的运行。

RDD.scala的collect方法的源码调用了runJob方法。

1.  def collect(): Array[T] = withScope {
2.    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
3.    Array.concat(results: _*)
4.  }

进入SparkContext.scala的runJob方法如下。

1.  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
    = {
2.    runJob(rdd, func, 0 until rdd.partitions.length)
3.  }

继续重载runJob方法:

1.  def runJob[T, U: ClassTag](
2.      rdd: RDD[T],
3.      func: Iterator[T] => U,
4.      partitions: Seq[Int]): Array[U] = {
5.    val cleanedFunc = clean(func)
6.    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it),
      partitions)
7.  }

继续重载runJob方法:

SparkContext.scala的源码如下。

1.     def runJob[T, U: ClassTag](
2.      rdd: RDD[T],
3.      processPartition: Iterator[T] => U,
4.      resultHandler: (Int, U) => Unit)
5.  {
6.    val processFunc = (context: TaskContext, iter: Iterator[T]) =>
      processPartition(iter)
7.    runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length,
      resultHandler)
8.  }

继续重载runJob方法:

1.       def runJob[T, U: ClassTag](
2.       rdd: RDD[T],
3.       func: (TaskContext, Iterator[T]) => U,
4.       partitions: Seq[Int],
5.       resultHandler: (Int, U) => Unit): Unit = {
6.     if (stopped.get()) {
7.       throw new IllegalStateException("SparkContext has been shutdown")
8.     }
9.     val callSite = getCallSite
10.    val cleanedFunc = clean(func)
11.    logInfo("Starting job: " + callSite.shortForm)
12.    if (conf.getBoolean("spark.logLineage", false)) {
13.      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
14.    }
15.    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite,
       resultHandler, localProperties.get)
16.    progressBar.foreach(_.finishAll())
17.    rdd.doCheckpoint()
18.  }

进入DAGScheduler.scala的runJob方法:

Spark 2.1.1版本的DAGScheduler.scala的源码如下。

1.   def runJob[T, U](
2.        rdd: RDD[T],
3.        func: (TaskContext, Iterator[T]) => U,
4.        partitions: Seq[Int],
5.        callSite: CallSite,
6.        resultHandler: (Int, U) => Unit,
7.        properties: Properties): Unit = {
8.      val start = System.nanoTime
9.      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler,
        properties)
10.     //注意:不要调用 Await.ready(future),因为它调用scala.concurrent.blocking,
        //如果使用fork-join pool连接池,则并发SQL执行失败。注意,由于Scala的特质,
        //awaitPermission实际上没有在任何地方使用,所以这里安全地传递null。更多的细
        //节,可参阅SPARK-13747
11.     val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
12.     waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
13.     waiter.completionFuture.value.get match {
14.       case scala.util.Success(_) =>
15.         logInfo("Job %d finished: %s, took %f s".format
16.         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
17.       case scala.util.Failure(exception) =>
18.         logInfo("Job %d failed: %s, took %f s".format
19.         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
20.         //SPARK-8644:包括来自用户DAGScheduler异常堆栈跟踪
21.         val callerStackTrace = Thread.currentThread().getStackTrace.tail
22.         exception.setStackTrace(exception.getStackTrace ++
            callerStackTrace)
23.         throw exception
24.     }
25.   }

Spark 2.2.0版本DAGScheduler.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第10~12行代码删除。

 上段代码中第13行代码之前新增一行代码:

ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)

Spark 2.2.0版本删掉了null.asInstanceOf[scala.concurrent.CanAwait]的使用,调整为使用ThreadUtils.awaitReady (waiter.completionFuture, Duration.Inf)方法。

1.   ......
2.    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
3.  ......

DAGScheduler runJob的时候就交给了submitJob,waiter等待作业调度的结果,作业成功或者失败,打印相关的日志信息。进入DAGScheduler的submitJob方法如下。

1.     def submitJob[T, U](
2.       rdd: RDD[T],
3.       func: (TaskContext, Iterator[T]) => U,
4.       partitions: Seq[Int],
5.       callSite: CallSite,
6.       resultHandler: (Int, U) => Unit,
7.       properties: Properties): JobWaiter[U] = {
8.     //检查,以确保我们不在不存在的分区上启动任务
9.     val maxPartitions = rdd.partitions.length
10.    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
11.      throw new IllegalArgumentException(
12.        "Attempting to access a non-existent partition: " + p + ". " +
13.          "Total number of partitions: " + maxPartitions)
14.    }
15.
16.    val jobId = nextJobId.getAndIncrement()
17.    if (partitions.size == 0) {
18.      //如果作业正在运行0个任务,则立即返回
19.      return new JobWaiter[U](this, jobId, 0, resultHandler)
20.    }
21.
22.    assert(partitions.size > 0)
23.    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
24.    val waiter = new JobWaiter(this, jobId, partitions.size,
       resultHandler)
25.    eventProcessLoop.post(JobSubmitted(
26.      jobId, rdd, func2, partitions.toArray, callSite, waiter,
27.      SerializationUtils.clone(properties)))
28.    waiter
29.  }

submitJob方法中,submitJob首先获取rdd.partitions.length,校验运行的时候partitions是否存在。submitJob方法关键的代码是eventProcessLoop.post(JobSubmitted的JobSubmitted,JobSubmitted是一个case class,而不是一个case object,因为application中有很多的Job,不同的Job的JobSubmitted实例不一样,如果使用case object,case object展示的内容是一样的,就像全局唯一变量,而现在我们需要不同的实例,因此使用case class。JobSubmitted的成员finalRDD是最后一个RDD。

由Action(如collect)导致SparkContext.runJob的执行,最终导致DAGScheduler中的submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop。其中,JobSubmitted的源码如下。

1.   private[scheduler] case class JobSubmitted(
2.    jobId: Int,
3.    finalRDD: RDD[_],
4.    func: (TaskContext, Iterator[_]) => _,
5.    partitions: Array[Int],
6.    callSite: CallSite,
7.    listener: JobListener,
8.    properties: Properties = null)
9.  extends DAGSchedulerEvent

JobSubmitted是private[scheduler]级别的,用户不可直接调用它。JobSubmitted封装了jobId,封装了最后一个finalRDD,封装了具体对RDD操作的函数func,封装了有哪些partitions要进行计算,也封装了作业监听器listener、状态等内容。

DAGScheduler的submitJob方法关键代码eventProcessLoop.post(JobSubmitted中,将JobSubmitted放入到eventProcessLoop。post就是Java中的post,往一个线程中发一个消息。eventProcessLoop的源码如下。

1.  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop
    (this)

DAGSchedulerEventProcessLoop继承自EventLoop。

1.   private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler:
     DAGScheduler)
2.  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with
    Logging {

EventLoop中开启了一个线程eventThread,线程设置成Daemon后台运行的方式;run方法里面调用了onReceive(event)方法。post方法就是往eventQueue.put事件队列中放入一个元素。EventLoop的源码如下。

1.   private[spark] abstract class EventLoop[E](name: String) extends Logging {
2.
3.    private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
4.
5.    private val stopped = new AtomicBoolean(false)
6.
7.    private val eventThread = new Thread(name) {
8.      setDaemon(true)
9.
10.     override def run(): Unit = {
11.       try {
12.         while (!stopped.get) {
13.           val event = eventQueue.take()
14.           try {
15.             onReceive(event)
16.           } catch {
17.             case NonFatal(e) =>
18.               try {
19.                 onError(e)
20.               } catch {
21.                 case NonFatal(e) => logError("Unexpected error in " + name, e)
22.               }
23.           }
24.         }
25.       } catch {
26.         case ie: InterruptedException => //即使eventQueue不为空,退出
27.         case NonFatal(e) => logError("Unexpected error in " + name, e)
28.       }
29.     }
30.
31.   }
32.
33.   def start(): Unit = {
34.     if (stopped.get) {
35.       throw new IllegalStateException(name + " has already been stopped")
36.     }
37.     //调用OnStart启动事件线程,确保其发生在onReceive方法前
38.     onStart()
39.     eventThread.start()
40.   }
41. ......
42. def post(event: E): Unit = {
43.     eventQueue.put(event)
44.   }

eventProcessLoop是DAGSchedulerEventProcessLoo实例,DAGSchedulerEventProcessLoop继承自EventLoop,具体实现onReceive方法,onReceive方法又调用doOnReceive方法。

doOnReceive收到消息后开始处理。

Spark 2.1.1版本的DAGScheduler.scala的源码如下。

1.   private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
       properties) =>
3.       dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
         callSite, listener, properties)
4.
5.     case MapStageSubmitted(jobId, dependency, callSite, listener,
       properties) =>
6.       dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite,
         listener, properties)
7.
8.     case StageCancelled(stageId) =>
9.       dagScheduler.handleStageCancellation(stageId)
10.
11.    case JobCancelled(jobId) =>
12.      dagScheduler.handleJobCancellation(jobId)
13.
14.    case JobGroupCancelled(groupId) =>
15.      dagScheduler.handleJobGroupCancelled(groupId)
16.
17.    case AllJobsCancelled =>
18.      dagScheduler.doCancelAllJobs()
19.
20.    case ExecutorAdded(execId, host) =>
21.      dagScheduler.handleExecutorAdded(execId, host)
22.
23.    case ExecutorLost(execId, reason) =>
24.      val filesLost = reason match {
25.        case SlaveLost(_, true) => true
26.        case _ => false
27.      }
28.      dagScheduler.handleExecutorLost(execId, filesLost)
29.
30.    case BeginEvent(task, taskInfo) =>
31.      dagScheduler.handleBeginEvent(task, taskInfo)
32.
33.    case GettingResultEvent(taskInfo) =>
34.      dagScheduler.handleGetTaskResult(taskInfo)
35.
36.    case completion: CompletionEvent =>
37.      dagScheduler.handleTaskCompletion(completion)
38.
39.    case TaskSetFailed(taskSet, reason, exception) =>
40.      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
41.
42.    case ResubmitFailedStages =>
43.      dagScheduler.resubmitFailedStages()
44.  }

Spark 2.2.0版本的DAGScheduler.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第8~9行StageCancelled增加一个成员变量reason,说明Stage取消的原因。

 上段代码中第11~12行JobCancelled增加一个成员变量reason,说明Job取消的原因。

1.   ....
2.      case StageCancelled(stageId, reason) =>
3.        dagScheduler.handleStageCancellation(stageId, reason)
4.
5.      case JobCancelled(jobId, reason) =>
6.        dagScheduler.handleJobCancellation(jobId, reason)
7.  .......

总结:EventLoop里面开启一个线程,线程里面不断循环一个队列,post的时候就是将消息放到队列中,由于消息放到队列中,在不断循环,所以可以拿到这个消息,转过来回调方法onReceive(event),在onReceive处理的时候就调用了doOnReceive方法。

关于线程的异步通信:为什么要新开辟一条线程?例如,在DAGScheduler发送消息为何不直接调用doOnReceive,而需要一个消息循环器。DAGScheduler这里自己给自己发消息,不管是自己发消息,还是别人发消息,都采用一条线程去处理,两者处理的逻辑是一致的,扩展性就非常好。使用消息循环器,就能统一处理所有的消息,保证处理的业务逻辑都是一致的。

eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGScheduler-EventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive。

在doOnReceive中通过模式匹配的方式把执行路由到case JobSubmitted,调用dagScheduler.handleJobSubmitted方法。

1.  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
       properties) =>
3.       dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
         callSite, listener, properties)

DAGScheduler的handleJobSubmitted的源码如下。

1.    private[scheduler] def handleJobSubmitted(jobId: Int,
2.       finalRDD: RDD[_],
3.       func: (TaskContext, Iterator[_]) => _,
4.       partitions: Array[Int],
5.       callSite: CallSite,
6.       listener: JobListener,
7.       properties: Properties) {
8.     var finalStage: ResultStage = null
9.     try {
10.      //如果作业运行在HadoopRDD上,而底层HDFS的文件已被删除,那么在创建新的Stage
         //时将会跑出一个异常
11.      finalStage = createResultStage(finalRDD, func, partitions, jobId,
         callSite)
12.    } catch {
13.      case e: Exception =>
14.        logWarning("Creating new stage failed due to exception - job: " +
           jobId, e)
15.        listener.jobFailed(e)
16.        return
17.    }
18.
19.    val job = new ActiveJob(jobId, finalStage, callSite, listener,
       properties)
20.    clearCacheLocs()
21.    logInfo("Got job %s (%s) with %d output partitions".format(
22.      job.jobId, callSite.shortForm, partitions.length))
23.    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
24.    logInfo("Parents of final stage: " + finalStage.parents)
25.    logInfo("Missing parents: " + getMissingParentStages(finalStage))
26.
27.    val jobSubmissionTime = clock.getTimeMillis()
28.    jobIdToActiveJob(jobId) = job
29.    activeJobs += job
30.    finalStage.setActiveJob(job)
31.    val stageIds = jobIdToStageIds(jobId).toArray
32.    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).
       map(_.latestInfo))
33.    listenerBus.post(
34.      SparkListenerJobStart(job.jobId,          jobSubmissionTime,      stageInfos,
         properties))
35.    submitStage(finalStage)
36.  }

Stage开始:每次调用一个runJob就产生一个Job;finalStage是一个ResultStage,最后一个Stage是ResultStage,前面的Stage是ShuffleMapStage。

在handleJobSubmitted中首先创建finalStage,创建finalStage时会建立父Stage的依赖链条。

通过createResultStage创建finalStage,传入的参数包括最后一个finalRDD,操作的函数func,分区partitions、jobId、callSite等内容。创建过程中可能捕获异常。例如,在Hadoop上,底层的hdfs文件被删除了或者被修改了,就出现异常。

createResultStage的源码如下。

1.   private def createResultStage(
2.        rdd: RDD[_],
3.        func: (TaskContext, Iterator[_]) => _,
4.        partitions: Array[Int],
5.        jobId: Int,
6.        callSite: CallSite): ResultStage = {
7.      val parents = getOrCreateParentStages(rdd, jobId)
8.      val id = nextStageId.getAndIncrement()
9.      val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
        callSite)
10.     stageIdToStage(id) = stage
11.     updateJobIdStageIdMaps(jobId, stage)
12.     stage
13.   }

createResultStage中,基于作业ID,作业ID(jobId)是作为第三个参数传进来的,创建了ResultStage。

createResultStage的getOrCreateParentStages获取或创建一个给定RDD的父Stages列表,新的Stages将提供firstJobId创建。

getOrCreateParentStages的源码如下。

1.     private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int):
       List[Stage] = {
2.    getShuffleDependencies(rdd).map { shuffleDep =>
3.      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
4.    }.toList
5.  }

getOrCreateParentStages调用了getShuffleDependencies(rdd),getShuffleDependencies返回给定RDD的父节点中直接的shuffle依赖。这个函数不会返回更远祖先节点的依赖。例如,如果C shuffle依赖于B,B shuffle依赖于A:A <-- B <-- C。在RDD C中调用getShuffleDependencies函数,将只返回B <-- C的依赖。此功能可用作单元测试。

下面根据DAG划分Stage示意图,如图4-4所示。

图4-4 DAG划分Stage示意图

RDD G在getOrCreateParentStages的getShuffleDependencies的时候同时依赖于RDD B,RDD F;看依赖关系,RDD G和RDD B在同一个Stage里,RDD G和RDD F不在同一个Stage里,根据Shuffle依赖产生了一个新的Stage。如果不是Shuffle级别的依赖,就将其加入waitingForVisit.push(dependency.rdd),waitingForVisit是一个栈Stack,把当前依赖的RDD push进去。然后进行while循环,当waitingForVisit不是空的情况下,将waitingForVisit.pop()的内容弹出来放入到toVisit,如果已经访问过的数据结构visited中没有访问记录,那么toVisit.dependencies再次循环遍历:如果是Shuffle依赖,就加入到parents数据结构;如果是窄依赖,就加入到waitingForVisit。

例如,首先将RDD G放入到waitingForVisit,然后看RDD G的依赖关系,依赖RDD B、RDD F;RDD G和RDD F构成的是宽依赖,所以就加入父Stage里,是一个新的Stage。但如果是窄依赖,就把RDD B放入到栈waitingForVisit中,RDD G和RDD B在同一个Stage中。栈waitingForVisit现在又有新的元素RDD B,然后再次进行循环,获取到宽依赖RDD A,将构成一个新的Stage。RDD G的getShuffleDependencies最终返回HashSet (ShuffleDependency(RDD F),ShuffleDependency(RDD A))。然后getShuffleDependencies(rdd). map遍历调用getOrCreateShuffleMapStage直接创建父Stage。

getShuffleDependencies的源码如下。

1.       private[scheduler] def getShuffleDependencies(
2.       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
3.     val parents = new HashSet[ShuffleDependency[_, _, _]]
4.     val visited = new HashSet[RDD[_]]
5.     val waitingForVisit = new Stack[RDD[_]]
6.     waitingForVisit.push(rdd)
7.     while (waitingForVisit.nonEmpty) {
8.       val toVisit = waitingForVisit.pop()
9.       if (!visited(toVisit)) {
10.        visited += toVisit
11.        toVisit.dependencies.foreach {
12.          case shuffleDep: ShuffleDependency[_, _, _] =>
13.            parents += shuffleDep
14.          case dependency =>
15.            waitingForVisit.push(dependency.rdd)
16.        }
17.      }
18.    }
19.    parents
20.  }

getOrCreateParentStages方法中通过getShuffleDependencies(rdd).map进行map转换时用了getOrCreateShuffleMapStage方法。如果在shuffleIdToMapStage数据结构中shuffleId已经存在,那就获取一个shuffle map stage,否则,如果shuffle map stage不存在,除了即将进行计算的更远祖先节点的shuffle map stage,还将创建一个自己的shuffle map stage。

getOrCreateShuffleMapStage的源码如下。

1.   private def getOrCreateShuffleMapStage(
2.       shuffleDep: ShuffleDependency[_, _, _],
3.       firstJobId: Int): ShuffleMapStage = {
4.     shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
5.       case Some(stage) =>
6.         stage
7.
8.       case None =>
9.         //创建所有即将计算的祖先shuffle依赖的阶段
10.        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach
           { dep =>
11.    //尽管getMissingAncestorShuffleDependencies 只返回shuffle 的依赖,其已
       //不在shuffleIdToMapStage中。我们在foreach循环中得到一个特定的依赖是可能
       //的,将被增加到shuffleIdToMapStage依赖中,其是通过早期的依赖关系创建的阶段,
       //参考SPARK-13902
12.          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
13.            createShuffleMapStage(dep, firstJobId)
14.          }
15.        }
16.        //最后,为给定的shuffle 依赖创建一个阶段
17.        createShuffleMapStage(shuffleDep, firstJobId)
18.    }
19.  }

getOrCreateShuffleMapStage方法中:

 如果根据shuffleId模式匹配获取到Stage,就返回Stage。首先从shuffleIdToMapStage中根据shuffleId获取Stage。shuffleIdToMapStage是一个HashMap数据结构,将Shuffle dependency ID对应到ShuffleMapStage的映射关系,shuffleIdToMapStage只包含当前运行作业的映射数据,当Shuffle Stage作业完成时,Shuffle映射数据将被删除,Shuffle的数据将记录在MapOutputTracker中。

 如果根据shuffleId模式匹配没有获取到Stage,调用getMissingAncestorShuffle-Dependencies方法,createShuffleMapStage创建所有即将进行计算的祖先shuffle依赖的Stages。

getMissingAncestorShuffleDependencies查找shuffle依赖中还没有进行shuffleToMapStage注册的祖先节点。

getMissingAncestorShuffleDependencies的源码如下。

1.      private def getMissingAncestorShuffleDependencies(
2.       rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
3.     val ancestors = new Stack[ShuffleDependency[_, _, _]]
4.     val visited = new HashSet[RDD[_]]
5.     //手动维护堆栈来防止通过递归访问造成的堆栈溢出异常
6.     val waitingForVisit = new Stack[RDD[_]]
7.     waitingForVisit.push(rdd)
8.     while (waitingForVisit.nonEmpty) {
9.       val toVisit = waitingForVisit.pop()
10.      if (!visited(toVisit)) {
11.        visited += toVisit
12.        getShuffleDependencies(toVisit).foreach { shuffleDep =>
13.          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
14.            ancestors.push(shuffleDep)
15.            waitingForVisit.push(shuffleDep.rdd)
16.          } //依赖关系及其已经注册的祖先
17.        }
18.      }
19.    }
20.    ancestors
21.  }

createShuffleMapStage根据Shuffle依赖的分区创建一个ShuffleMapStage,如果前一个Stage已生成相同的Shuffle数据,那Shuffle数据仍是可用的,createShuffleMapStage方法将复制Shuffle数据的位置信息去获取数据,无须再重新生成一次数据。createShuffleMapStage的源码如下。

1.   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId:
     Int): ShuffleMapStage = {
2.      val rdd = shuffleDep.rdd
3.      val numTasks = rdd.partitions.length
4.      val parents = getOrCreateParentStages(rdd, jobId)
5.      val id = nextStageId.getAndIncrement()
6.      val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId,
        rdd.creationSite, shuffleDep)
7.      stageIdToStage(id) = stage
8.      shuffleIdToMapStage(shuffleDep.shuffleId) = stage
9.      updateJobIdStageIdMaps(jobId, stage)
10.     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
11.       //以前运行的阶段为这个shuffle生成的分区,对于每个输出仍然可用,将输出位置的
          //信息复制到新阶段(所以没必要重新计算数据)
12.       val    serLocs    =   mapOutputTracker.getSerializedMapOutputStatuses
          (shuffleDep.shuffleId)
13.       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
14.       (0 until locs.length).foreach { i =>
15.         if (locs(i) ne null) {
16.           //locs(i) will be null if missing
17.           stage.addOutputLoc(i, locs(i))
18.         }
19.       }
20.     } else {
21.       //这里需要注册RDDS与缓存和map输出跟踪器,不能在RDD构造函数实现,因为分区
          //是未知的
22.       logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
23.       mapOutputTracker.registerShuffle(shuffleDep.shuffleId,
          rdd.partitions.length)
24.     }
25.     stage
26.   }

回到handleJobSubmitted,创建finalStage以后将提交finalStage。

1.     private[scheduler] def handleJobSubmitted(jobId: Int,
2.  ......
3.    finalStage = createResultStage(finalRDD, func, partitions, jobId,
      callSite)
4.  ......
5.    submitStage(finalStage)
6.    }

submitStage提交Stage,首先递归提交即将计算的父Stage。

submitStage的源码如下。

1.  private def submitStage(stage: Stage) {
2.    val jobId = activeJobForStage(stage)
3.    if (jobId.isDefined) {
4.      logDebug("submitStage(" + stage + ")")
5.      if (!waitingStages(stage) && !runningStages(stage) && !failedStages
        (stage)) {
6.        val missing = getMissingParentStages(stage).sortBy(_.id)
7.        logDebug("missing: " + missing)
8.        if (missing.isEmpty) {
9.          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has
            no missing parents")
10.          submitMissingTasks(stage, jobId.get)
11.        } else {
12.          for (parent <- missing) {
13.            submitStage(parent)
14.          }
15.          waitingStages += stage
16.        }
17.      }
18.    } else {
19.      abortStage(stage, "No active job for stage " + stage.id, None)
20.    }
21.  }

其中调用了getMissingParentStages,源码如下。

1.     private def getMissingParentStages(stage: Stage): List[Stage] = {
2.     val missing = new HashSet[Stage]
3.     val visited = new HashSet[RDD[_]]
4.     //人工维护堆栈来防止通过递归访问造成的堆栈溢出异常
5.     val waitingForVisit = new Stack[RDD[_]]
6.     def visit(rdd: RDD[_]) {
7.       if (!visited(rdd)) {
8.         visited += rdd
9.         val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
10.        if (rddHasUncachedPartitions) {
11.          for (dep <- rdd.dependencies) {
12.            dep match {
13.              case shufDep: ShuffleDependency[_, _, _] =>
14.                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.
                   firstJobId)
15.                if (!mapStage.isAvailable) {
16.                  missing += mapStage
17.                }
18.              case narrowDep: NarrowDependency[_] =>
19.                waitingForVisit.push(narrowDep.rdd)
20.            }
21.          }
22.        }
23.      }
24.    }
25.    waitingForVisit.push(stage.rdd)
26.    while (waitingForVisit.nonEmpty) {
27.      visit(waitingForVisit.pop())
28.    }
29.    missing.toList
30.  }

接下来,我们结合Spark DAG划分Stage示意(图4-5)进行详细阐述。

RDD A到RDD B,以及RDD F到RDD G之间的数据需要经过Shuffle过程,因此,RDD A和RDD F分别是Stage 1跟Stage 3、Stage 2跟Stage 3的划分点。而RDD B到RDD G没有Shuffle,因此,RDD G和RDD B的依赖是窄依赖,RDD B和RDD G划分到同一个Stage 3;RDD C到RDD D、RDD F,RDD E到RDD F之间的数据不需要经过Shuffle,RDD F和RDD D加RDD E的依赖、RDD D和RDD C的依赖是窄依赖,因此,RDD C、RDD D、RDD E和RDD F划分到同一个Stage 2。Stage 1和Stage 2是相互独立的,可以并发执行。而由于Stage 3依赖Stage 1和Stage 2的计算结果,所以Stage 3最后执行计算。

 createResultStage:基于作业ID(jobId)创建ResultStage。调用getOrCreateParentStages创建所有父Stage,返回parents: List[Stage]作为父Stage,将parents传入ResultStage,实例化生成ResultStage。

在DAG划分Stage示意图中,对RDD G调用createResultStage,通过getOrCreate-ParentStages获取所有父List[Stage]:Stage 1、Stage 2,然后创建自己的Stage 3。

 getOrCreateParentStages:获取或创建给定RDD的父Stage列表。将根据提供的firstJobId创建新的Stages。

图4-5 DAG划分Stage示意图

在DAG划分Stage示意图中,RDD G的getOrCreateParentStages会调用getShuffleDependencies获得RDD G所有直接宽依赖集合HashSet(ShuffleDependency(RDD F), ShuffleDependency(RDD A)),这里是RDD F和RDD A的宽依赖集合,然后遍历集合,对(ShuffleDependency(RDD F), ShuffleDependency(RDD A))分别调用getOrCreateShuffleMapStage。

 对ShuffleDependency(RDD A)调用getOrCreateShuffleMapStage,getOrCreateShuffle-MapStage中根据shuffleDep.shuffleId模式匹配调用getMissingAncestorShuffle-Dependencies,返回为空;对ShuffleDependency(RDD A)调用createShuffleMapStage,RDD A已无父Stage,因此创建Stage 1。

 对ShuffleDependency(RDD F)调用getOrCreateShuffleMapStage,getOrCreateShuffle-MapStage中根据shuffleDep.shuffleId模式匹配调用getMissingAncestorShuffle-Dependencies,返回为空;对ShuffleDependency(RDD F)调用createShuffleMapStage,RDD F之前的RDD C到RDD D、RDD F;RDD E到RDD F之间都没有Shuffle,没有宽依赖就不会产生Stage。因此,RDD F已无父Stage,创建Stage 2。

 最后,把List(Stage 1,Stage 2)作为Stage 3的父Stage,创建Stage 3。Stage 3是ResultStage。

回到DAGScheduler.scala的handleJobSubmitted方法,首先通过createResultStage构建finalStage。

handleJobSubmitted的源码如下。

1.  private[scheduler] def handleJobSubmitted(jobId: Int,
2.    .......
3.      finalStage = createResultStage(finalRDD, func, partitions, jobId,
         callSite)
4.    .......
5.     val job = new ActiveJob(jobId, finalStage, callSite, listener,
       properties)
6.    .......
7.     logInfo("Missing parents: " + getMissingParentStages(finalStage))
8.     ......
9.     submitStage(finalStage)
10.  }

handleJobSubmitted方法中的ActiveJob是一个普通的数据结构,保存了当前Job的一些信息:

1.  private[spark] class ActiveJob(
2.     val jobId: Int,
3.     val finalStage: Stage,
4.     val callSite: CallSite,
5.     val listener: JobListener,
6.     val properties: Properties) {

handleJobSubmitted方法日志打印信息:getMissingParentStages(finalStage)),getMissing-ParentStages根据finalStage找父Stage,如果有父Stage,就直接返回;如果没有父Stage,就进行创建。

handleJobSubmitted方法中的submitStage比较重要。submitStage的源码如下。

1.   private def submitStage(stage: Stage) {
2.      val jobId = activeJobForStage(stage)
3.      if (jobId.isDefined) {
4.        logDebug("submitStage(" + stage + ")")
5.        if (!waitingStages(stage) && !runningStages(stage) && !failedStages
          (stage)) {
6.          val missing = getMissingParentStages(stage).sortBy(_.id)
7.          logDebug("missing: " + missing)
8.          if (missing.isEmpty) {
9.            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has
              no missing parents")
10.           submitMissingTasks(stage, jobId.get)
11.         } else {
12.           for (parent <- missing) {
13.             submitStage(parent)
14.           }
15.           waitingStages += stage
16.         }
17.       }
18.     } else {
19.       abortStage(stage, "No active job for stage " + stage.id, None)
20.     }
21.   }

submitStage首先从activeJobForStage中获得JobID;如果JobID已经定义isDefined,那就获得即将计算的Stage(getMissingParentStages),然后进行升序排列。如果父Stage为空,那么提交submitMissingTasks,DAGScheduler把处理的过程交给具体的TaskScheduler去处理。如果父Stage不为空,将循环递归调用submitStage(parent),从后往前回溯。后面的Stage依赖于前面的Stage。也就是说,只有前面依赖的Stage计算完毕后,后面的Stage才会运行。submitStage一直循环调用,导致的结果是父Stage的父Stage……一直回溯到最左侧的父Stage开始计算。

4.2.5 Stage内部Task获取最佳位置的算法

Task任务本地性算法实现:

DAGScheudler的submitMissingTasks方法中体现了如何利用RDD的本地性得到Task的本地性,从而获取Stage内部Task的最佳位置。接下来看一下submitMissingTasks的源码,关注Stage本身的算法以及任务本地性。runningStages是一个HashSet[Stage]数据结构,表示正在运行的Stages,将当前运行的Stage增加到runningStages中,根据Stage进行判断,如果是ShuffleMapStage,则从getPreferredLocs(stage.rdd, id)获取任务本地性信息;如果是ResultStage,则从getPreferredLocs(stage.rdd, p)获取任务本地性信息。

DAGScheduler.scala的源码如下。

1.   private def submitMissingTasks(stage: Stage, jobId: Int) {
2.    ......
3.     runningStages += stage
4.    ......
5.     stage match {
6.       case s: ShuffleMapStage =>
7.         outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId =
           s.numPartitions - 1)
8.       case s: ResultStage =>
9.         outputCommitCoordinator.stageStart(
10.          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
11.    }
12.

在submitMissingTasks中会通过调用以下代码来获得任务的本地性。

1.   val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
2.     stage match {
3.       case s: ShuffleMapStage =>
4.         partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd,
           id))}.toMap
5.       case s: ResultStage =>
6.         partitionsToCompute.map { id =>
7.           val p = s.partitions(id)
8.           (id, getPreferredLocs(stage.rdd, p))
9.         }.toMap
10.    }

partitionsToCompute获得要计算的Partitions的id。

1.  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

如果stage是ShuffleMapStage,在代码partitionsToCompute.map { id => (id, getPreferredLocs (stage.rdd, id))}.toMap中,id是partitions的id,使用匿名函数生成一个Tuple,第一个元素值是数据分片的id,第二个元素是把rdd和id传进去,获取位置getPreferredLocs。然后通过toMap转换,返回Map[Int, Seq[TaskLocation]]。第一个值是partitions的id,第二个值是TaskLocation。

具体一个Partition中的数据本地性的算法实现在下述getPreferredLocs代码中。

1.  private[spark]
2.  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
3.    getPreferredLocsInternal(rdd, partition, new HashSet)
4.  }

getPreferredLocsInternal是getPreferredLocs的递归实现:这个方法是线程安全的,它只能被DAGScheduler通过线程安全方法getCacheLocs()使用。

getPreferredLocsInternal的源码如下。

1.   private def getPreferredLocsInternal(
2.       rdd: RDD[_],
3.       partition: Int,
4.       visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
5.     //如果分区已被访问,则无须重新访问。这避免了路径探索。SPARK-695
6.     if (!visited.add((rdd, partition))) {
7.       //已访问的分区返回零
8.       return Nil
9.     }
10.    //如果分区已经缓存,返回缓存的位置
11.    val cached = getCacheLocs(rdd)(partition)
12.    if (cached.nonEmpty) {
13.      return cached
14.    }
15.    //如果RDD位置优先(输入RDDs的情况),就获取它
16.    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition))
       .toList
17.    if (rddPrefs.nonEmpty) {
18.      return rddPrefs.map(TaskLocation(_))
19.    }
20.
21.    //如果RDD是窄依赖,将选择第一个窄依赖的第一分区作为位置首选项。理想情况下,我们
       //将基于传输大小选择
22.    rdd.dependencies.foreach {
23.      case n: NarrowDependency[_] =>
24.        for (inPart <- n.getParents(partition)) {
25.          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
26.          if (locs != Nil) {
27.            return locs
28.          }
29.        }
30.
31.      case _ =>
32.    }
33.
34.    Nil
35.  }

getPreferredLocsInternal代码中:

在visited中把当前的RDD和partition加进去是否能成功,visited是一个HashSet,如果已经有就出错。

如果partition被缓存(partition被缓存是指数据已经在DAGScheduler中),则在getCacheLocs(rdd)(partition)传入rdd和partition,获取缓存的位置信息。如果获取到缓存位置信息,就返回。

getCacheLocs的源码如下。

1.  private[scheduler]
2.   def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] =
     cacheLocs.synchronized {
3.     //注意:这个不用getOrElse(),因为方法被调用0(任务数)次
4.     if (!cacheLocs.contains(rdd.id)) {
5.       //注:如果存储级别为NONE,我们不需要从块管理器获取位置信息
6.       val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel
         == StorageLevel.NONE) {
7.         IndexedSeq.fill(rdd.partitions.length)(Nil)
8.       } else {
9.         val blockIds =
10.          rdd.partitions.indices.map(index => RDDBlockId(rdd.id,
             index)).toArray[BlockId]
11.        blockManagerMaster.getLocations(blockIds).map { bms =>
12.          bms.map(bm => TaskLocation(bm.host, bm.executorId))
13.        }
14.      }
15.      cacheLocs(rdd.id) = locs
16.    }
17.    cacheLocs(rdd.id)
18.  }

getCacheLocs中的cacheLocs是一个HashMap,包含每个RDD的分区上的缓存位置信息。map的key值是RDD的ID,Value是由分区编号索引的数组。每个数组值是RDD分区缓存位置的集合。

1.  private val cacheLocs = new HashMap [Int, IndexedSeq[Seq[TaskLocation]]]

getPreferredLocsInternal方法在具体算法实现的时候首先查询DAGScheduler的内存数据结构中是否存在当前Partition的数据本地性的信息,如果有,则直接返回;如果没有,首先会调用rdd.getPreferedLocations。

如果自定义RDD,那一定要写getPreferedLocations,这是RDD的五大特征之一。例如,想让Spark运行在HBase上或者运行在一种现在还没有直接支持的数据库上面,此时开发者需要自定义RDD。为了保证Task计算的数据本地性,最关键的方式是必须实现RDD的getPreferedLocations。数据不动代码动,以HBase为例,Spark要操作HBase的数据,要求Spark运行在HBase所在的集群中,HBase是高速数据检索的引擎,数据在哪里,Spark也需要运行在哪里。Spark能支持各种来源的数据,核心就在于getPreferedLocations。如果不实现getPreferedLocations,就要从数据库或HBase中将数据抓过来,速度会很慢。

RDD.scala的getPreferedLocations的源码如下。

1.   final def preferredLocations(split: Partition): Seq[String] = {
2.    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
3.      getPreferredLocations(split)
4.    }
5.  }

这是RDD的getPreferredLocations。

1.  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

这样,数据本地性在运行前就已经完成,因为RDD构建的时候已经有元数据的信息。说明:本节代码基于Spark 2.2的源码版本。

DAGScheduler计算数据本地性的时候巧妙地借助了RDD自身的getPreferedLocations中的数据,最大化地优化效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大地简化了Task数据本地性算法的实现和效率的优化。