4.4 SchedulerBackend解析

本节讲解SchedulerBackend原理剖析、SchedulerBackend源码解析、Spark程序的注册机制、Spark程序对计算资源Executor的管理等内容。

4.4.1 SchedulerBackend原理剖析

以Spark Standalone部署方式为例,StandaloneSchedulerBackend在启动的时候构造了StandaloneAppClient实例,并在该实例start的时候启动了ClientEndpoint消息循环体,ClientEndpoint在启动的时候会向Master注册当前程序。而StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndPoint(这就是程序运行时的经典的对象Driver)的消息循环体,StandaloneSchedulerBackend专门负责收集Worker上的资源信息,当ExecutorBackend启动的时候,会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task的。

4.4.2 SchedulerBackend源码解析

StandaloneSchedulerBackend收集和分配资源给调度的Task使用。

StandaloneSchedulerBackend.scala的源码如下。

1.  private[spark] class StandaloneSchedulerBackend(
2.  ......
3.   override def start() {
4.  ......
5.   val command = Command("org.apache.spark.executor.
     CoarseGrainedExecutorBackend",
6.        args, sc.executorEnvs, classPathEntries ++ testingClassPath,
          libraryPathEntries, javaOpts)
7.  ......
8.    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this,
      conf)
9.      client.start()
10. ......

在StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint,将在ClientEndpoint中向Master注册。

StandaloneAppClient.scala的源码如下。

1.  ......
2.    def start() {
3.      //启动 rpcEndpoint; 将直接回调到listener
4.      endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
        (rpcEnv)))
5.    }

4.4.3 Spark程序的注册机制

在上面的源码分析中,StandaloneAppClient在启动的时候创建了StandaloneAppClient内部类ClientEndpoint的实例对象作为消息循环体,以便向Master注册当前的Application。既然ClientEndpoint是RpcEndpoint的子类,那么就会有这样的生命周期:constructor -> onStart -> receive ->onStop。根据这个原理,我们来看ClientEndpoint的onStart方法代码。

StandaloneAppClient.scala的源码如下。

1.  override def onStart(): Unit = {
2.    try {
3.      registerWithMaster(1)
4.    } catch {
5.   .......

ClientEndpoint在启动时就立即调用registerWithMaster来注册Application,继续查看registerWithMaster方法代码。

StandaloneAppClient.scala的源码如下。

1.    private def registerWithMaster(nthRetry: Int) {
2.  //向所有Master异步地尝试注册Application
3.        registerMasterFutures.set(tryRegisterAllMasters())
4.      ......
5.      }

ClientEndpoint在tryRegiesterAllMasters方法中会向所有的Master尝试注册Application。向Master发送RegisterApplication消息。

StandaloneAppClient.scala的源码如下。

1.    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.  ......
3.     val   masterRef     =  rpcEnv.setupEndpointRef(masterAddress,  Master.
       ENDPOINT_NAME)
4.            masterRef.send(RegisterApplication(appDescription, self))
5.  ......

Master也是RpcEndpoint的子类,所以可以通过receive方法接收DeployMessage类型的消息RegisterApplication。

Master.scala的源码如下。

1.       override def receive: PartialFunction[Any, Unit] = {
2.    ......
3.  case RegisterApplication(description, driver) =>
4.     .......
5.         registerApplication(app)
6.       .......
7.         driver.send(RegisteredApplication(app.id, self))
8.    .......

ClientEndpoint最后在receive方法中得到来自Master注册好Application的确认消息RegisteredApplication。

StandaloneAppClient.scala的源码如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.     case RegisteredApplication(appId_, masterRef) =>
3.  ......
4.       appId.set(appId_)
5.       registered.set(true)
6.   ......

至此,Application向Master注册完毕。在上面的RegisterApplication中,调用了schedule方法,这个方法将完成Application的调度,并在Worker节点上启动分配好的Executor给Application使用。

4.4.4 Spark程序对计算资源Executor的管理

从TaskSchedulerImpl的submitTasks的方法中我们知道,Spark Standalone部署模式调用StandaloneSchedulerBackend的reviveOffers方法进行TaskSet所需要资源的分配,得到足够的资源后,将TaskSet中的Task逐个发送到Executor去执行。下面来看这里的资源,即Executor是如何得到和分配的。

StandaloneSchedulerBackend的reviveOffers方法很简单,就是发送一个ReviveOffers消息给内部类DriverEndpoint,代码如下所示。

CoarseGrainedSchedulerBackend.scala的源码如下。

1.  override def reviveOffers() {
2.    driverEndpoint.send(ReviveOffers)
3.  }

DriverEndpoint的receive方法处理ReviveOffers消息也很简单,就是调用makeOffers方法。receive方法部分关键代码如下所示。

CoarseGrainedSchedulerBackend.scala的源码如下。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.  case ReviveOffers =>
4.          makeOffers()
5.  ......

DriverEndpoint的makeOffers方法首先过滤出Alive状态的Executor放到activeExecutorsHahMap变量中,然后使用id、ExecutorData.ExecutorHost、ExecutorData.freeCores构建代表Executor可用资源的WorkerOffer。然后是最重要的两个方法调用。先是调用TaskSchedulerImpl的resourceOffers得到TaskDescription的二维数组,包含Task ID、Executor ID、Task Index等Task执行需要的信息。然后回调DriverEndpoint的launchTask给每个Task对应的Executor发执行Task的LaunchTask消息(其实是由CourseGrainedExecutorBackend转发LauchTask消息)。

TaskSchedulerImpl的resourceOffers方法返回二维数组TaskDescription后作为DriverEndpoint的launchTasks方法的参数,DriverEndpoint的launchTasks方法中首先对传入的tasks进行扁平化操作(例如,将多维数组降维成一维数组),得到所有的Task,然后遍历所有的Task。在遍历过程中,调用serialize()方法对task进行序列化,得到serializedTask。判断如果serializedTask大于等于Akka帧减去Akka预留空间大小,则调用TaskSetManager的abort方法终止该任务的执行,否则将LaunchTask(new SerializableBuffer(serializedTask))消息发送到CoarseGrainedExecutorBackend。

CoarseGrainedExecutorBackend匹配到LaunchTask(data)消息后,首先调用deserialized方法,反序列化出task,然后调用Executor的lauchTask方法执行Task的处理。