8.1 Job到底在什么时候产生

典型的Job逻辑执行图如图8-1所示,经过下面四个步骤可以得到最终执行结果。

图8-1 典型的Job逻辑执行图

(1)从数据源(可以是本地file、内存数据结构、HDFS、HBase等)读取数据创建最初的RDD。

(2)对RDD进行一系列的transformation()操作,每个transformation()会产生一个或多个包含不同类型T的RDD[T]。T可以是Scala里面的基本类型或数据结构,不限于(K, V)。

(3)对最后的final RDD进行action()操作,每个partition计算后产生结果result。

(4)将result回送到driver端,进行最后的f(list[result])计算。RDD可以被Cache到内存或者checkpoint到磁盘上。RDD中的partition个数不固定,通常由用户设定。RDD和RDD间的partition的依赖关系可以不是1对1,如图8-1所示,既有1对1关系,也有多对多关系。

8.1.1 触发Job的原理和源码解析

对于Spark Job触发流程的源码,以RDD的count方法为例开始。RDD的count方法代码如下所示。

1.  /**
     * 返回RDD中元素的数量
2.   */
3.  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

从上面的代码可以看出,count方法触发SparkContext的runJob方法的调用。SparkContext的runJob方法代码如下所示。

1.    /**
       * 触发一个Job处理一个RDD的所有partitions,并且把处理结果返回到一个数组
2.     */
3.    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] ={
4.  runJob(rdd, func, 0 until rdd.partitions.length)
5.    }

进入SparkContext的runJob方法的同名重载方法,代码如下所示。

1.    /**
       *触发一个Job处理一个RDD的指定部分的partitions,并且把处理结果返回到一个数组
       *比第一个runJob方法多了一个partitions数组参数
2.     */
3.    def runJob[T, U: ClassTag](
4.  rdd: RDD[T],
5.  func: Iterator[T] => U,
6.        partitions: Seq[Int]): Array[U] = {
7.      val cleanedFunc = clean(func)
8.  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) =>cleanedFunc(it),
    partitions)
9.    }

再进入SparkContext的runJob方法的另一个同名重载方法,代码如下所示。

1.    /**
       *触发一个Job处理一个RDD的指定部分的partitions,并且把处理结果返回到一个数组
       *比第一个runJob方法多了一个partitions数组参数,并且func的类型不同
2.     */
3.    def runJob[T, U: ClassTag](
4.  rdd: RDD[T],
5.  func: (TaskContext, Iterator[T]) => U,
6.        partitions: Seq[Int]): Array[U] = {
7.      val results = new Array[U](partitions.size)
8.  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
9.      results
10.   }

最后一次进入SparkContext的runJob方法的另一个同名重载方法,代码如下所示。

1.    /**
       *触发一个Job处理一个RDD的指定部分的partitions,并把处理结果给指定的handler
       *函数,这是Spark所有Action的主入口
2.     */
3.    def runJob[T, U: ClassTag](
4.  rdd: RDD[T],
5.  func: (TaskContext, Iterator[T]) => U,
6.        partitions: Seq[Int],
7.  resultHandler: (Int, U) => Unit): Unit = {
8.      if (stopped.get()) {
9.        throw new IllegalStateException("SparkContext has been shutdown")
10.     }
11. //记录了方法调用的方法栈
12.     val callSite = getCallSite
13. //清除闭包,为了函数能够序列化
14.     val cleanedFunc = clean(func)
15. logInfo("Starting Job: " + callSite.shortForm)
16.     if (conf.getBoolean("spark.logLineage", false)) {
17. logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
18.     }
19.     //向高层调度器(DAGScheduler)提交Job,从而获得Job执行结果
20. dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
    localProperties.get)
21. progressBar.foreach(_.finishAll())
22. rdd.doCheckpoint()
23.   }

8.1.2 触发Job的算子案例

Spark Application里可以产生一个或者多个Job,例如,spark-shell默认启动时内部就没有Job,只是作为资源的分配程序,可以在spark-shell里面写代码产生若干个Job,普通程序中一般可以有不同的Action,每个Action一般也会触发一个Job。

给定Job的逻辑执行图,如何生成物理执行图,也就是给定这样一个复杂数据依赖图,如何合理划分Stage,并确定Task的类型和个数?

一个直观的想法是将前后关联的RDDs组成一个Stage,每个Stage生成一个Task。这样虽然可以解决问题,但效率不高。除了效率问题,这个想法还有一个更严重的问题:大量中间数据需要存储。对于task来说,其执行结果要么存到磁盘,要么存到内存,或者两者皆有。如果每个箭头都是Task,每个RDD里面的数据都需要存起来,占用空间可想而知。

仔细观察一下逻辑执行图会发现:在每个RDD中,每个Partition是独立的。也就是说,在RDD内部,每个Partition的数据依赖各自不会相互干扰。因此,一个大胆的想法是将整个流程图看成一个Stage,为最后一个finalRDD中的每个Partition分配一个Task。

Spark算法构造和物理执行时最基本的核心:最大化Pipeline。基于Pipeline的思想,数据被使用的时候才开始计算,从数据流动的视角来说,是数据流动到计算的位置。实质上,从逻辑的角度看,是算子在数据上流动。从算法构建的角度而言:肯定是算子作用于数据,所以是算子在数据上流动;方便算法的构建。

从物理执行的角度而言:是数据流动到计算的位置;方便系统最为高效地运行。对于Pipeline而言,数据计算的位置就是每个Stage中最后的RDD,一个震撼人心的内幕真相是:每个Stage中除了最后一个RDD算子是真实的外,前面的算子都是假的。计算的Lazy特性导致计算从后往前回溯,形成Computing Chain,导致的结果是需要首先计算出具体一个Stage内部左侧的RDD中本次计算依赖的Partition,如图8-2所示。

整个Computing Chain根据数据依赖关系自后向前建立,遇到ShuffleDependency后形成Stage。在每个Stage中,每个RDD中的compute()调用parentRDD.iter()将parent RDDs中的records一个个fetch过来。

图8-2 Stage示意图

例如,collect前面的RDD是transformation级别的,不会立即执行。从后往前推,回溯时如果是窄依赖,则在内存中迭代,否则把中间结果写出到磁盘,暂存给后面的计算使用。

依赖分为窄依赖和宽依赖。例如,现实生活中,工作依赖一个对象,是窄依赖,依赖很多对象,是宽依赖。窄依赖除了一对一,还有range级别的依赖,依赖固定的个数,随着数据的规模扩大而改变。如果是宽依赖,DAGScheduler会划分成不同的Stage,Stage内部是基于内存迭代的,也可以基于磁盘迭代,Stage内部计算的逻辑是完全一样的,只是计算的数据不同而已。具体的任务就是计算一个数据分片,一个Partition的大小是128MB。一个partition不是完全精准地等于一个block的大小,一般最后一条记录跨两个block。

Spark程序的运行有两种部署方式:Client和Cluster。

默认情况下建议使用Client模式,此模式下可以看到更多的交互性信息,及运行过程的信息。此时要专门使用一台机器来提交我们的Spark程序,配置和普通的Worker配置一样,而且要和Cluster Manager在同样的网络环境中,因为要指挥所有的Worker去工作,Worker里的线程要和Driver不断地交互。由于Driver要驱动整个集群,频繁地和所有为当前程序分配的Executor去交互,频繁地进行网络通信,所以必须在同样的网络中。

也可以指定部署方式为Cluster,这样真正的Driver会由Master决定在Worker中的某一台机器。Master为你分配的第一个Executor就是Driver级别的Executor。不推荐学习、开发的时候使用Cluster,因为Cluster无法直接看到一些日志信息,所以建议使用Client方式。