3.4 解析Spark中的DAG逻辑视图

本节讲解DAG生成的机制,通过DAG,Spark可以对计算的流程进行优化;通过WordCounts的示例对DAG逻辑视图进行解析。

3.4.1 DAG生成的机制

在图论中,如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。而在Spark中,由于计算过程很多时候会有先后顺序,受制于某些任务必须比另一些任务较早执行的限制,我们必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是DAG图,每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系)。

通过DAG,Spark可以对计算的流程进行优化,对于数据处理,可以将在单一节点上进行的计算操作进行合并,并且计算中间数据通过内存进行高效读写,对于数据处理,需要涉及Shuffle操作的步骤划分Stage,从而使计算资源的利用更加高效和合理,减少计算资源的等待过程,减少计算中间数据读写产生的时间浪费(基于内存的高效读写)。

Spark中DAG生成过程的重点是对Stage的划分,其划分的依据是RDD的依赖关系,对于不同的依赖关系,高层调度器会进行不同的处理。对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage;对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分。

在Spark中,DAG生成的流程关键在于回溯,在程序提交后,高层调度器将所有的RDD看成是一个Stage,然后对此Stage进行从后往前的回溯,遇到Shuffle就断开,遇到窄依赖,则归并到同一个Stage。等到所有的步骤回溯完成,便生成一个DAG图。

DAG生成的相关源码位于Spark的DAGScheduler.scala。getOrCreateParentStages获取或创建一个给定RDD的父Stages列表,getOrCreateParentStages调用了getShuffleDependencies (rdd),getShuffleDependencies返回给定RDD的父节点中直接的Shuffle依赖。

DAGScheduler.scala的getOrCreateParentStages的源码如下。

1.   private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int):
     List[Stage] = {
2.      getShuffleDependencies(rdd).map { shuffleDep =>
3.        getOrCreateShuffleMapStage(shuffleDep, firstJobId)
4.      }.toList
5.    }
6.
7.  ......
8.  private[scheduler] def getShuffleDependencies(
9.        rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
10.     val parents = new HashSet[ShuffleDependency[_, _, _]]
11.     val visited = new HashSet[RDD[_]]
12.     val waitingForVisit = new Stack[RDD[_]]
13.     waitingForVisit.push(rdd)
14.     while (waitingForVisit.nonEmpty) {
15.       val toVisit  = waitingForVisit.pop()
16.       if (!visited(toVisit)) {
17.         visited += toVisit
18.         toVisit.dependencies.foreach {
19.           case shuffleDep: ShuffleDependency[_, _, _] =>
20.             parents += shuffleDep
21.           case dependency =>
22.             waitingForVisit.push(dependency.rdd)
23.         }
24.       }
25.     }
26.     parents
27.   }

3.4.2 DAG逻辑视图解析

本节通过一个简单计数案例讲解DAG具体的生成流程和关系。示例代码如下。

1.  val conf = new SparkConf()//创建SparkConf
2.  conf.setAppName("Wow,My First Spark App")//设置应用名称
3.  conf.setMaster("local")//在本地运行
4.  val sc =new SparkContext(conf)
5.  val lines = sc.textFile ("C://Users//feng//IdeaProjects//WordCount//src
    //SparkText.txt",1)
6.  //操作1,flatMap由lines通过flatMap操作形成新的MapPartitionRDD
7.  val words = lines.flatMap{ lines => lines.split(" ") }
8.  //操作2,map 由word通过Map操作形成新的MapPartitionRDD
9.  val pairs =words.map { word => (word,1) }
10. //操作3,reduceByKey(包含2步reduce)
11. //此步骤生成MapPartitionRDD和ShuffleRDD
12. val WordCounts =pairs.reduceByKey(_+_)
13. WordCounts.collect.foreach(println)
14. sc.stop()

在程序正式运行前,Spark的DAG调度器会将整个流程设定为一个Stage,此Stage包含3个操作,5个RDD,分别为MapPartitionRDD(读取文件数据时)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)、ShuffleRDD(reduceByKeyshuffle操作)。

(1)回溯整个流程,在shuffleRDD与MapPartitionRDD(reduceByKey的local段的操作)中存在shuffle操作,整个RDD先在此切开,形成两个Stage。

(2)继续向前回溯,MapPartitionRDD(reduceByKey的local段的操作)与MapPartitionRDD (map操作)中间不存在Shuffle(即两个RDD的依赖关系为窄依赖),归为同一个Stage。

(3)继续回溯,发现往前的所有的RDD之间都不存在Shuffle,应归为同一个Stage。

(4)回溯完成,形成DAG,由两个Stage构成:

 第一个Stage由MapPartitionRDD(读取文件数据时)、MapPartitionRDD(flatMap操作)、MapPartitionRDD(map操作)、MapPartitionRDD(reduceByKey的local段的操作)构成,如图3-4所示。

图3-4 Stage 0的构成

 第二个Stage由ShuffleRDD(reduceByKey Shuffle操作)构成,如图3-5所示。

图3-5 Stage 1的构成