5.5 Executor执行结果的处理方式

本节讲解Executor工作原理、ExecutorBackend注册源码解密、Executor实例化内幕、Executor具体工作内幕。

Master让Worker启动,启动了一个Executor所在的进程。在Standalone模式中,Executor所在的进程是CoarseGrainedExecutorBackend。

 Master侧:Master发指令给Worker,启动Executor。

 Worker侧:Worker接收到Master发过来的指令,通过ExecutorRunner启动另外一个进程来运行Executor。这里是指启动另外一个进程来启动Executor,而不是直接启动Executor。Master向Worker发送指令,Worker为什么启动另外一个进程?在另外一个进程中注册给Driver,然后启动Executor。因为Worker是管理机器上的资源的,所以机器上的资源变动时要汇报给Master。Worker不是用来计算的,不能在Worker中进行计算;Spark集群中有很多应用程序,需要很多Executor,如果不是给每个Executor启动一个对应的进程,而是所有的应用程序进程都在同一个Executor里面,那么一个程序崩溃将导致其他程序也崩溃。

 启动CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend是Executor所在的进程。CoarseGrainedExecutorBackend启动时,须向Driver注册。通过发送RegisterExecutor向Driver注册,注册的内容是RegisterExecutor。

CoarseGrainedExecutorBackend.scala的onStart方法的源码如下。

1.   override def onStart() {
2.      logInfo("Connecting to driver: " + driverUrl)
3.      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
4.        //这是一个非常快的Action,所以可以用ThreadUtils.sameThread
5.        driver = Some(ref)
6.        ref.ask[Boolean](RegisterExecutor(executorId,              self,  hostname,
          cores, extractLogUrls))
7.      }(ThreadUtils.sameThread).onComplete {
8.        //这是一个非常快的Action,所以可以用ThreadUtils.sameThread
9.        case Success(msg) =>
10.         //经常收到true,可忽略
11.       case Failure(e) =>
12.         exitExecutor(1, s"Cannot register with driver: $driverUrl", e,
            notifyDriver = false)
13.     }(ThreadUtils.sameThread)
14.   }

其中,RegisterExecutor是一个case class,源码如下。

1.   case class RegisterExecutor(
2.    executorId: String,
3.    executorRef: RpcEndpointRef,
4.    hostname: String,
5.    cores: Int,
6.    logUrls: Map[String, String])
7.  extends CoarseGrainedClusterMessage

CoarseGrainedExecutorBackend启动时,向Driver发送RegisterExecutor消息进行注册;Driver收到RegisterExecutor消息,在Executor注册成功后会返回消息RegisteredExecutor给CoarseGrainedExecutorBackend。这里注册的Executor和真正工作的Executor没有任何关系,其实注册的是RegisterExecutorBackend。可以将RegisteredExecutor理解为RegisterExecutorBackend。

需要特别注意的是,在CoarseGrainedExecutorBackend启动时向Driver注册Executor,其实质是注册ExecutorBackend实例,和Executor实例之间没有直接关系。

 CoarseGrainedExecutorBackend是Executor运行所在的进程名称,CoarseGrained-ExecutorBackend本身不会完成任务的计算。

 Executor才是正在处理任务的对象。Executor内部是通过线程池的方式来完成Task的计算的。Executor对象运行于CoarseGrainedExecutorBackend进程。

 CoarseGrainedExecutorBackend和Executor是一一对应的。

 CoarseGrainedExecutorBackend是一个消息通信体(其具体实现了ThreadSafeRPCEndpoint),可以发送信息给Driver,并可以接受Driver中发过来的指令,如启动Task等。

CoarseGrainedExecutorBackend继承自ThreadSafeRpcEndpoint,CoarseGrainedExecutor-Backend是一个消息通信体,可以收消息,也可以发消息。源码如下。

1.    private[spark] class CoarseGrainedExecutorBackend(
2.    override val rpcEnv: RpcEnv,
3.    driverUrl: String,
4.    executorId: String,
5.    hostname: String,
6.    cores: Int,
7.    userClassPath: Seq[URL],
8.    env: SparkEnv)
9.  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

CoarseGrainedExecutorBackend发消息给Driver。Driver在StandaloneSchedulerBackend里面(Spark 2.0中已将SparkDeploySchedulerBackend更名为StandaloneSchedulerBackend)。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend,start启动时启动StandaloneAppClient。StandaloneAppClient(Spark 2.0中已将AppClient更名为StandaloneApp-Client)代表应用程序本身。

StandaloneAppClient的源码如下。

1.      private[spark] class StandaloneAppClient(
2.      rpcEnv: RpcEnv,
3.      masterUrls: Array[String],
4.      appDescription: ApplicationDescription,
5.      listener: StandaloneAppClientListener,
6.      conf: SparkConf)
7.    extends Logging {
8.  ......
9.  private    class   ClientEndpoint(override     val  rpcEnv:  RpcEnv) extends
    ThreadSafeRpcEndpoint
10.     with Logging {
11. ......

在Driver进程中有两个至关重要的Endpoint。

 ClientEndpoint:主要负责向Master注册当前的程序,是AppClient的内部成员。

 DriverEndpoint:这是整个程序运行时的驱动器,是CoarseGrainedExecutorBackend的内部成员。

CoarseGrainedSchedulerBackend的DriverEndpoint的源码如下。

1.  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties:
    Seq[(String, String)])
2.  extends ThreadSafeRpcEndpoint with Logging {

DriverEndpoint会接收到RegisterExecutor消息,并完成在Driver上的注册。

RegisterExecutor中有一个数据结构executorDataMap,是Key-Value的方式。

1.  private val executorDataMap = new HashMap[String, ExecutorData]

ExecutorData中的executorEndpoint是RpcEndpointRef。ExecutorData的源码如下。

1.   private[cluster] class ExecutorData(
2.     val executorEndpoint: RpcEndpointRef,
3.     val executorAddress: RpcAddress,
4.     override val executorHost: String,
5.     var freeCores: Int,
6.     override val totalCores: Int,
7.     override val logUrlMap: Map[String, String]
8.  ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

CoarseGrainedExecutorBackend.scala的RegisteredExecutor的源码如下。

1.  override def receive: PartialFunction[Any, Unit] = {
2.     case RegisteredExecutor =>
3.       logInfo("Successfully registered with driver")
4.       try {
5.         executor = new Executor(executorId, hostname, env, userClassPath,
           isLocal = false)
6.       } catch {
7.         case NonFatal(e) =>
8.           exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
9.

CoarseGrainedExecutorBackend收到RegisteredExecutor消息以后,用new()函数创建一个Executor,而Executor就是一个普通的类。

Spark 2.1.1版本的Executor.scala的源码如下。

1.   private[spark] class Executor(
2.    executorId: String,
3.    executorHostname: String,
4.    env: SparkEnv,
5.    userClassPath: Seq[URL] = Nil,
6.    isLocal: Boolean = false)
7.  extends Logging {

Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行之后Executor新增了UncaughtExceptionHandler成员变量,用于未捕获的异常。

1. ......
2.     uncaughtExceptionHandler: UncaughtExceptionHandler =
       SparkUncaughtExceptionHandler)
3. ......

回到ExecutorData.scala,其中的RpcEndpointRef是代理句柄,代理CoarseGrainedExecutorBackend。在Driver中,通过ExecutorData封装并注册ExecutorBackend的信息到Driver的内存数据结构executorMapData中。

1.   private[cluster] class ExecutorData(
2.     val executorEndpoint: RpcEndpointRef,
3.     val executorAddress: RpcAddress,
4.     override val executorHost: String,
5.     var freeCores: Int,
6.     override val totalCores: Int,
7.     override val logUrlMap: Map[String, String]
8.  ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

Executor注册消息提交给DriverEndpoint,通过DriverEndpoint写数据给CoarseGrainedSchedulerBackend里面的数据结构executorMapData。executorMapData是 CoarseGrainedSchedulerBackend的成员,因此最终注册给CoarseGrainedSchedulerBackend。CoarseGrainedSchedulerBackend获得Executor(其实是ExecutorBackend)的注册信息。

实际在执行的时候,DriverEndpoint会把信息写入CoarseGrainedSchedulerBackend的内存数据结构executorMapData中,所以最终是注册给了CoarseGrainedSchedulerBackend。也就是说,CoarseGrainedSchedulerBackend掌握了为当前程序分配的所有的ExecutorBackend进程,而在每个ExecutorBackend进行实例中,会通过Executor对象负责具体任务的运行。在运行的时候使用synchronized关键字来保证executorMapData安全地并发写操作。

CoarseGrainedSchedulerBackend.scala的receiveAndReply方法中RegisterExecutor注册的过程,源码如下。

Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下。

1.      override def receiveAndReply(context: RpcCallContext):
        PartialFunction[Any, Unit] = {
2.
3.        case RegisterExecutor(executorId, executorRef, hostname, cores,
          logUrls) =>
4.            //检查executorDataMap中是否包含该executorId,如果包含,就返回
              //RegisterExecutorFailed消息
5.          if (executorDataMap.contains(executorId)) {
6.           executorRef.send(RegisterExecutorFailed("Duplicate executor ID:
             " + executorId))
7.            context.reply(true)
8.          } else {
9.          //若executorRef.address地址不为null,则取出executorRef的地址作为
            //executorAddress,否则使用sender的Address作为executorAddress
10.           val executorAddress = if (executorRef.address != null) {
11.               executorRef.address
12.             } else {
13.               context.senderAddress
14.             }
15.           logInfo(s"Registered executor $executorRef ($executorAddress)
              with ID $executorId")
16.  //在addressToExecutorId这个哈希表中加入executorAddress和executorId的对
     //应关系
17.           addressToExecutorId(executorAddress) = executorId
18.  //totalCore增加cores个
19.           totalCoreCount.addAndGet(cores)
20.           totalRegisteredExecutors.addAndGet(1)
21.  //创建ExecutorData对象
22.           val data = new ExecutorData(executorRef, executorRef.address,
              hostname,
23.             cores, cores, logUrls)
24.   //同步代码块
25.           CoarseGrainedSchedulerBackend.this.synchronized {
26. //在executorDataMap中加入executorId和ExecutorData的对应关系
27.             executorDataMap.put(executorId, data)
28.             if (currentExecutorIdCounter < executorId.toInt) {
29.               currentExecutorIdCounter = executorId.toInt
30.             }
31.      //如果挂起的Executors的数量大于0
32.             if (numPendingExecutors > 0) {
33.               numPendingExecutors -= 1
34.              logDebug(s"Decremented number of pending executors
                 ($numPendingExecutors left)")
35.            }
36.          }
37.          executorRef.send(RegisteredExecutor)
38.          //注:有些测试期望将executor放在map中进行reply
39.   //向CoarseGrainedExecutorBackend回复true
40.          context.reply(true)
41.          listenerBus.post(
42.            SparkListenerExecutorAdded(System.currentTimeMillis(),
               executorId, data))
43.  //调用makeOffers,给Executor发送执行任务
44.          makeOffers()
45.        }

Spark 2.2.0版本CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行之前,新增If语句对黑名单节点进行判断。

1.   ......
2.      } else if (scheduler.nodeBlacklist != null &&
3.            scheduler.nodeBlacklist.contains(hostname)) {
4.            //如果集群管理器分配给我们一个Executor,而这个Executor在黑名单节点列
              //表中(因为通知它是黑名单节点之前,集群已经开始分配这些资源了),如果集群
              //忽略了我们的黑名单,那么我们立即拒绝Executor
5.            logInfo(s"Rejecting $executorId as it has been blacklisted.")
6.           executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted:
             $executorId"))
7.            context.reply(true)
8.  .......

CoarseGrainedSchedulerBackend.scala中的RegisterExecutor:

 先判断executorDataMap是否已经包含executorId,如果已经包含,就会发送注册失败的消息RegisterExecutorFailed,因为已经有重复的executor ID的Executor在运行。

 然后进行Executor的注册,获取到executorAddress,在executorRef.address为空的情况下就获取到senderAddress。

 定义了3个数据结构:addressToExecutorId、totalCoreCount、totalRegisteredExecutors,其中,addressToExecutorId是DriverEndpoint的数据结构,而totalCoreCount、totalRegisteredExecutors是CoarseGrainedSchedulerBackend的数据结构。addressToExecutorId、totalCoreCount、totalRegisteredExecutors包含Executors注册的信息分别为:RPC地址主机名和端口与ExecutorId的对应关系、集群中的总核数Cores、当前注册的Executors总数等。

1.  protected val addressToExecutorId = new HashMap[RpcAddress, String]
2.  protected val totalCoreCount = new AtomicInteger(0)
3.  protected val totalRegisteredExecutors = new AtomicInteger(0)

 然后调用new()函数创建一个ExecutorData,提取出executorRef、executorRef.address、hostname、cores、cores、logUrls等信息。

 同步代码块CoarseGrainedSchedulerBackend.this.synchronized :集群中很多Executor向Driver注册,为防止写冲突,因此设计一个同步代码块。在运行时使用synchronized关键字,来保证executorMapData安全地并发写操作。

 executorRef.send(RegisteredExecutor)发消息RegisteredExecutor给我们的sender,sender是CoarseGrainedExecutorBackend。而CoarseGrainedExecutorBackend收到消息RegisteredExecutor以后,就调用new()函数创建了Executor。

CoarseGrainedExecutorBackend收到DriverEndpoint发送过来的RegisteredExecutor消息后会启动Executor实例对象,而Executor实例对象事实上是负责真正Task计算的。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  case RegisteredExecutor =>
3.    logInfo("Successfully registered with driver")
4.    try {
5.      executor = new Executor(executorId, hostname, env, userClassPath,
        isLocal = false)
6.    } catch {
7.      case NonFatal(e) =>
8.        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
9.    }

下面来看一下Executor.scala,其中的threadPool是一个线程池。

Executor是真正负责Task计算的;其在实例化的时候会实例化一个线程池threadPool来准备Task的计算。threadPool是一个newDaemonCachedThreadPool。newDaemonCached-ThreadPool创建线程池,线程工厂按照需要的格式调用new()函数创建线程。语法实现如下。

1.   def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
2.    val threadFactory = namedThreadFactory(prefix)
3.    Executors.newCachedThreadPool(threadFactory).asInstanceOf
      [ThreadPoolExecutor]
4.  }

namedThreadFactory的源码如下。

1.   def namedThreadFactory(prefix: String): ThreadFactory = {
2.    new   ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix  +
      "-%d").build()
3.  }

newCachedThreadPool创建一个线程池,根据需要创建新线程,线程池中的线程可以复用,使用提供的ThreadFactory创建新线程。newCachedThreadPool的源码如下。

1.     public static ExecutorService newCachedThreadPool(ThreadFactory
       threadFactory) {
2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
3.  }

创建的threadPool中以多线程并发执行和线程复用的方式来高效地执行Spark发过来的Task。线程池创建好后,接下来是等待Driver发送任务给CoarseGrainedExecutorBackend,不是直接发送给Executor,因为Executor不是一个消息循环体。

Executor具体是如何工作的?

当Driver发送过来Task的时候,其实是发送给了CoarseGrainedExecutorBackend这个RpcEndpoint,而不是直接发送给了Executor(Executor由于不是消息循环体,所以永远也无法直接接收远程发过来的信息)。

Driver向CoarseGrainedExecutorBackend发送LaunchTask,转过来交给线程池中的线程去执行。先判断Executor是否为空,Executor为空,则提示错误,进程就直接退出。如果Executor不为空,则反序列化任务调用Executor的launchTask,其中,attemptNumber是任务可以重试的次数。

ExecutorBackend收到Driver发送的消息,调用launchTask方法,提交给Executor执行。

Executor.scala的launchTask接收到Task执行的命令后,首先将Task封装在TaskRunner里面,然后放到runningTasks。runningTasks是一个简单的数据结构。

1.  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

launchTask最后交给threadPool.execute(tr),交给线程池中的线程执行任务。 TaskRunner继承自Runnable,是Java的一个对象。

TaskRunner其实是Java中Runnable接口的具体实现,在真正工作时会交给线程池中的线程去运行,此时会调用run方法来执行Task。

Executor.scala中的Run方法最终调用task.run方法。

Spark 2.1.1版本的Executor.scala的oun方法的源码如下。

1.     override def run(): Unit = {
2.   ......
3.       var threwException = true
4.          val value = try {
5.            val res = task.run(
6.              taskAttemptId = taskId,
7.              attemptNumber = attemptNumber,
8.              metricsSystem = env.metricsSystem)
9.            threwException = false
10.           res
11.         } finally {
12.           val   releasedLocks     =   env.blockManager.releaseAllLocksForTask
              (taskId)
13.
14. ......

Spark 2.2.0版本的Executor.scala的run方法的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第7行task.run方法的第二个参数由attemptNumber调整为taskDescription. attemptNumber。

1.    ......
2.           attemptNumber = taskDescription.attemptNumber,
3.  ......

跟进Task.scala中的run方法,在里面调用runTask。

1.   final def run(
2.      taskAttemptId: Long,
3.      attemptNumber: Int,
4.      metricsSystem: MetricsSystem): T = {
5.   ......
6.    try {
7.      runTask(context)
8.    } catch {
9.  ......

TaskRunner在调用run方法时会调用Task的run方法,而Task的run方法会调用runTask,实际上,Task有ShuffleMapTask和ResultTask。