5.1 Master启动原理和源码详解

本节讲解Master启动的原理和源码;Master HA双机切换;Master的注册机制和状态管理解密等内容。

5.1.1 Master启动的原理详解

Spark应用程序作为独立的集群进程运行,由主程序中的SparkContext对象(称为驱动程序)协调。Spark集群部署组件图5-1所示。

图5-1 Spark集群部署组件图

其中各个术语及相关术语的描述如下。

(1)Driver Program:运行Application的main函数并新建SparkContext实例的程序,称为驱动程序(Driver Program)。通常可以使用SparkContext代表驱动程序。

(2)Cluster Manager:集群管理器(Cluster Manager)是集群资源管理的外部服务。Spark上现在主要有Standalone、YARN、Mesos 3种集群资源管理器。Spark自带的Standalone模式能够满足绝大部分纯粹的Spark计算环境中对集群资源管理的需求,基本上只有在集群中运行多套计算框架的时候才建议考虑YARN和Mesos。

(3)Worker Node:集群中可以运行Application代码的工作节点(Worker Node),相当于Hadoop的Slave节点。

(4)Executor:在Worker Node上为Application启动的一个工作进程,在进程中负责任务(Task)的运行,并且负责将数据存放在内存或磁盘上,在Executor内部通过多线程的方式(即线程池)并发处理应用程序的具体任务。

每个Application都有各自独立的Executors,因此应用程序之间是相互隔离的。

(5)Task:任务(Task)是指被Driver送到Executor上的工作单元。通常,一个任务会处理一个Partition的数据,每个Partition一般是一个HDFS的Block块的大小。

(6)Application:是创建了SparkContext实例对象的Spark用户程序,包含了一个Driver program和集群中多个Worker上的Executor。

(7)Job:和Spark的action对应,每个action,如count、savaAsTextFile等都会对应一个Job实例,每个Job会拆分成多个Stages,一个Stage中包含一个任务集(TaskSet),任务集中的各个任务通过一定的调度机制发送到工作单位(Executor)上并行执行。

Spark Standalone集群的部署采用典型的Master/Slave架构。其中,Master节点负责整个集群的资源管理与调度,Worker节点(也可以称Slave节点)在Master节点的调度下启动Executor,负责执行具体工作(包括应用程序以及应用程序提交的任务)。

5.1.2 Master启动的源码详解

Spark中各个组件是通过脚本来启动部署的。下面以脚本为入口点开始分析Master的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本。停止脚本比较简单,在此仅分析启动脚本。

1.Master部署的启动脚本解析

首先看一下Master的启动脚本./sbin/start-master.sh,内容如下。

1.  # 在脚本的执行节点启动Master组件
2.
3.  #如果没有设置环境变量SPARK_HOME,会根据脚本所在位置自动设置
4.  if [ -z "${SPARK_HOME}" ]; then
5.    export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
6.  fi
7.
8.  #注:提取的类名必须和SparkSubmit的类相匹配。任何变化都需在类中进行反映
9.
10. # Master 组件对应的类
11. CLASS="org.apache.spark.deploy.master.Master"
12.
13. #脚本的帮助信息
14.
15. if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
16.   echo "Usage: ./sbin/start-master.sh [options]"
17.   pattern="Usage:"
18.   pattern+="\|Using Spark's default log4j profile:"
19.   pattern+="\|Registered signal handlers for"
20.
21. # 通过脚本spark-class执行指定的Master类,参数为--help
22.   "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern"
      1>&2
23.   exit 1
24. fi
25.
26. ORIGINAL_ARGS="$@"
27.
28. #控制启动Master时,是否同时启动 Tachyon的Master组件
29. START_TACHYON=false
30.
31. while (( "$#" )); do
32. case $1 in
33.     --with-tachyon)
34.       if [ ! -e "${SPARK_HOME}"/tachyon/bin/tachyon ]; then
35.         echo "Error: --with-tachyon specified, but tachyon not found."
36.         exit -1
37.       fi
38.       START_TACHYON=true
39.       ;;
40. esac
41. shift
42. done
43.
44. . "${SPARK_HOME}/sbin/spark-config.sh"
45.
46. . "${SPARK_HOME}/bin/load-spark-env.sh"
47.
48. #下面的一些参数对应的默认配置属性
49. if [ "$SPARK_MASTER_PORT" = "" ]; then
50.   SPARK_MASTER_PORT=7077
51. fi
52.
53. //用于MasterURL,所以当没有设置时,默认使用hostname,而不是IP地址
54. //该MasterURL在Worker注册或应用程序提交时使用
55. if [ "$SPARK_MASTER_IP" = "" ]; then
56.   SPARK_MASTER_IP=`hostname`
57. fi
58.
59. if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
60.   SPARK_MASTER_WEBUI_PORT=8080
61. fi
62.
63. #通过启动后台进程的脚本spark-daemon.sh来启动Master组件
64. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
65.   --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_
      MASTER_WEBUI_PORT \
66.   $ORIGINAL_ARGS
67.
68. #需要时同时启动Tachyon,此时Tachyon是编译在Spark内的
69. if [ "$START_TACHYON" == "true" ]; then
70.   "${SPARK_HOME}"/tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
71.   "${SPARK_HOME}"/tachyon/bin/tachyon format -s
72.   "${SPARK_HOME}"/tachyon/bin/tachyon-start.sh master
73. fi

通过脚本的简单分析,可以看出Master组件是以后台守护进程的方式启动的,对应的后台守护进程的启动脚本spark-daemon.sh,在后台守护进程的启动脚本spark-daemon.sh内部,通过脚本spark-class启动一个指定主类的JVM进程,相关代码如下所示。

1.  case "$mode" in
2.  #这里对应的是启动一个Spark类
3.      (class)
4.  nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command
    "$@" >> "$log" 2>&1 < /dev/null &
5.  newpid="$!"
6.        ;;
7.  #这里对应提交一个Spark 应用程序
8.      (submit)
9.  nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class
    $command "$@" >> "$log" 2>&1 < /dev/null &
10. newpid="$!"
11.       ;;
12.
13.     (*)
14.       echo "unknown mode: $mode"
15.       exit 1
16.       ;;

通过脚本的分析,可以知道最终执行的是Master类(对应的代码为前面的CLASS="org.apache.spark.deploy.master.Master"),对应的入口点是Master伴生对象中的main方法。下面以该方法作为入口点进一步解析Master部署框架。

部署Master组件时,最简单的方式是直接启动脚本,不带任何选项参数,命令如下所示。

1.  ./sbin/start-master.sh

如需设置选项参数,可以查看帮助信息,根据自己的需要进行设置。

2.Master的源码解析

首先查看Master伴生对象中的main方法。

Master.scala的源码如下。

1.  def main(argStrings: Array[String]) {
2.    Utils.initDaemon(log)
3.    val conf = new SparkConf
4.  //构建参数解析的实例
5.    val args = new MasterArguments(argStrings, conf)
6.   //启动RPC通信环境以及Master的RPC通信终端
7.    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port,
      args.webUiPort, conf)
8.    rpcEnv.awaitTermination()
9.  }

和其他类(如SparkSubmit)一样,Master类的入口点处也包含了对应的参数类MasterArguments。MasterArguments类包括Spark属性配置相关的一些解析。

MasterArguments.scala的源码如下。

1.  private[master] class MasterArguments(args: Array[String], conf:
    SparkConf) extends Logging {
2.   var host = Utils.localHostName()
3.   var port = 7077
4.   var webUiPort = 8080
5.   var propertiesFile: String = null
6.
7.   //读取启动脚本中设置的环境变量
8.    if (System.getenv("SPARK_MASTER_IP") != null) {
9.      logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_
        HOST")
10.     host = System.getenv("SPARK_MASTER_IP")
11.   }
12.
13.   if (System.getenv("SPARK_MASTER_HOST") != null) {
14.     host = System.getenv("SPARK_MASTER_HOST")
15.   }
16.   if (System.getenv("SPARK_MASTER_PORT") != null) {
17.     port = System.getenv("SPARK_MASTER_PORT").toInt
18.   }
19.   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
20.     webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
21.   }
22.  //命令行选项参数的解析
23.   parse(args.toList)
24.
25.   //加载SparkConf文件,所有的访问必须经过此行
26.   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
27.
28.   if (conf.contains("spark.master.ui.port")) {
29.     webUiPort = conf.get("spark.master.ui.port").toInt
30.   }
31. ......

MasterArguments中的printUsageAndExit方法对应的就是命令行中的帮助信息。

MasterArguments.scala的源码如下。

1.   private def printUsageAndExit(exitCode: Int) {
2.     //scalastyle:off println
3.     System.err.println(
4.       "Usage: Master [options]\n" +
5.       "\n" +
6.       "Options:\n" +
7.       "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please
         use --host or -h) \n" +
8.       "  -h HOST, --host HOST   Hostname to listen on\n" +
9.       "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
10.      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
11.      "  --properties-file FILE Path to a custom Spark properties file.\n" +
12.      "                         Default is conf/spark-defaults.conf.")
13.    //scalastyle:on println
14.    System.exit(exitCode)
15.  }

解析完Master的参数后,调用startRpcEnvAndEndpoin方法启动RPC通信环境以及Master的RPC通信终端。

Spark 2.1.1版本的Master.scala的startRpcEnvAndEndpoint的源码如下。

1.
2.  /**
3.    * 启动Master并返回一个三元组:
4.    *   (1) The Master RpcEnv
5.    *   (2) The web UI bound port
6.     *   (3) The REST server bound port, if any
7.     */
8.
9.  def startRpcEnvAndEndpoint(
10.       host: String,
11.       port: Int,
12.       webUiPort: Int,
13.       conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
14.     val securityMgr = new SecurityManager(conf)
15.    //构建RPC通信环境
16.     val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
17.    //构建RPC通信终端,实例化Master
18.     val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
19.       new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
20.
21.  //向Master的通信终端发送请求,获取绑定的端口号
22.  //包含Master的Web UI监听端口号和REST的监听端口号
23.
24.     val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse]
        (BoundPortsRequest)
25.     (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
26.   }
27. }

Spark 2.2.0版本的Master.scala的startRpcEnvAndEndpoint的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第24行MasterEndpoint的askWithRetry方法调整为askSync方法。

1.      ......
2.     val portsResponse = masterEndpoint.askSync[BoundPortsResponse]
       (BoundPortsRequest)
3.  ......

startRpcEnvAndEndpoint方法中定义的ENDPOINT_NAME如下。

Master.scala的源码如下。

1.    private[deploy] object Master extends Logging {
2.    val SYSTEM_NAME = "sparkMaster"
3.    val ENDPOINT_NAME = "Master"
4.  .......

startRpcEnvAndEndpoint方法中通过masterEndpoint.askWithRetry[BoundPortsResponse] (BoundPortsRequest)给Master自己发送一个消息BoundPortsRequest,是一个case object。发送消息BoundPortsRequest给自己,确保masterEndpoint正常启动起来。返回消息的类型是BoundPortsResponse,是一个case class。

MasterMessages.scala的源码如下。

1.   private[master] object MasterMessages {
2.  ......
3.    case object BoundPortsRequest
4.    case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int,
      restPort: Option[Int])
5.  }

Master收到消息BoundPortsRequest,发送返回消息BoundPortsResponse。

Master.scala的源码如下。

1.   override def receiveAndReply(context: RpcCallContext): PartialFunction
     [Any, Unit] = {
2.  ......
3.  case BoundPortsRequest =>
4.  context.reply(BoundPortsResponse(address.port, webUi.boundPort,
    restServerBoundPort))

在BoundPortsResponse传入的参数restServerBoundPort是在Master的onStart方法中定义的。

Master.scala的源码如下。

1.   ......
2.    private var restServerBoundPort: Option[Int] = None
3.
4.    override def onStart(): Unit = {
5.  ......
6.     restServerBoundPort = restServer.map(_.start())
7.  .......

而restServerBoundPort是通过restServer进行map操作启动赋值。下面看一下restServer。

Master.scala的源码如下。

1.    private var restServer: Option[StandaloneRestServer] = None
2.  ......
3.     if (restServerEnabled) {
4.        val port = conf.getInt("spark.master.rest.port", 6066)
5.       restServer = Some(new StandaloneRestServer(address.host, port, conf,
         self, masterUrl))
6.      }
7.  ......

其中调用new()函数创建一个StandaloneRestServer。StandaloneRestServer服务器响应请求提交的[RestSubmissionClient],将被嵌入到standalone Master中,仅用于集群模式。服务器根据不同的情况使用不同的HTTP代码进行响应。

 200 OK-请求已成功处理。

 400错误请求:请求格式错误,未成功验证,或意外类型。

 468未知协议版本:请求指定了此服务器不支持的协议。

 500内部服务器错误:服务器在处理请求时引发内部异常。

服务器在HTTP主体中总包含一个JSON表示的[SubmitRestProtocolResponse]。如果发生错误,服务器将包括一个[ErrorResponse]。如果构造了这个错误响应内部失败时,响应将由一个空体组成。响应体指示内部服务器错误。

StandaloneRestServer.scala的源码如下。

1.  private[deploy] class StandaloneRestServer(
2.     host: String,
3.     requestedPort: Int,
4.     masterConf: SparkConf,
5.     masterEndpoint: RpcEndpointRef,
6.     masterUrl: String)
7.   extends RestSubmissionServer(host, requestedPort, masterConf) {

下面看一下RestSubmissionClient客户端。客户端提交申请[RestSubmissionServer]。在协议版本V1中,REST URL以表单形式出现http://[host:port]/v1/submissions/[action],[action]可以是create、kill或状态中的其中一种。每种请求类型都表示为发送到以下前缀的HTTP消息:

(1)submit - POST to /submissions/create

(2)kill - POST /submissions/kill/[submissionId]

(3)status - GET /submissions/status/[submissionId]

在(1)情况下,参数以JSON字段的形式发布到HTTP主体中。否则,URL指定按客户端的预期操作。由于该协议预计将在Spark版本中保持稳定,因此现有字段不能添加或删除,但可以添加新的可选字段。如在少见的事件中向前或向后兼容性被破坏,Spark须引入一个新的协议版本(如V2)。客户机和服务器必须使用协议的同一版本进行通信。如果不匹配,服务器将用它支持的最高协议版本进行响应。此客户机的实现可以用指定的版本使用该信息重试。

RestSubmissionClient.scala的源码如下。

1.   private[spark] class RestSubmissionClient(master: String) extends
     Logging {
2.    import RestSubmissionClient._
3.    private val supportedMasterPrefixes = Seq("spark://", "mesos://")
4.  ......

Restful把一切都看成是资源。利用Restful API可以对Spark进行监控。程序运行的每一个步骤、Task的计算步骤都可以可视化,对Spark的运行进行详细监控。

回到startRpcEnvAndEndpoint方法中,新创建了一个Master实例。Master实例化时会对所有的成员进行初始化,如默认的Cores个数等。

Master.scala的源码如下。

1.   private[deploy] class Master(
2.      override val rpcEnv: RpcEnv,
3.      address: RpcAddress,
4.      webUiPort: Int,
5.      val securityMgr: SecurityManager,
6.      val conf: SparkConf)
7.    extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
8.  .......
9.    //缺省maxCores时默认应用程序没有指定(通过Int.MaxValue)
10.   private val defaultCores = conf.getInt("spark.deploy.defaultCores",
      Int.MaxValue)
11.   val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
12.   if (defaultCores < 1) {
13.     throw new SparkException("spark.deploy.defaultCores must be positive")
14.   }
15. ......
16.

Master继承了ThreadSafeRpcEndpoint和LeaderElectable,其中继承LeaderElectable涉及Master的高可用性(High Availability,HA)机制。这里先关注ThreadSafeRpcEndpoint,继承该类后,Master作为一个RpcEndpoint,实例化后首先会调用onStart方法。

Master.scala的源码如下。

1.  override def onStart(): Unit = {
2.    logInfo("Starting Spark master at " + masterUrl)
3.     logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
4.     //构建一个Master的Web UI,查看向Master提交的应用程序等信息
5.     webUi = new MasterWebUI(this, webUiPort)
6.     webUi.bind()
7.     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
8.     if (reverseProxy) {
9.       masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
10.      logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers
         and " +
11.       s"Applications UIs are available at $masterWebUiUrl")
12.    }
13.  //在一个守护线程中,启动调度机制,周期性检查Worker是否超时,当Worker节点超时后,
     //会修改其状态或从Master中移除其相关的操作
14.
15.    checkForWorkerTimeOutTask      =   forwardMessageThread.scheduleAtFixedRate
       (new Runnable {
16.      override def run(): Unit = Utils.tryLogNonFatalError {
17.        self.send(CheckForWorkerTimeOut)
18.      }
19.    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
20.
21.  //默认情况下会启动Rest服务,可以通过该服务向Master提交各种请求
22.    if (restServerEnabled) {
23.      val port = conf.getInt("spark.master.rest.port", 6066)
24.     restServer = Some(new StandaloneRestServer(address.host, port, conf,
        self, masterUrl))
25.    }
26.    restServerBoundPort = restServer.map(_.start())
27.  //度量(Metroics)相关的操作,用于监控
28.    masterMetricsSystem.registerSource(masterSource)
29.    masterMetricsSystem.start()
30.    applicationMetricsSystem.start()
31.   //度量系统启动后,将主程序和应用程序度量handler处理程序附加到Web UI中
32.    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
33.    applicationMetricsSystem.getServletHandlers.foreach(webUi.
       attachHandler)
34.   //Master HA相关的操作
35.    val serializer = new JavaSerializer(conf)
36.    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
37.      case "ZOOKEEPER" =>
38.        logInfo("Persisting recovery state to ZooKeeper")
39.        val zkFactory =
40.          new ZooKeeperRecoveryModeFactory(conf, serializer)
41.        (zkFactory.createPersistenceEngine(), zkFactory.
           createLeaderElectionAgent(this))
42.      case "FILESYSTEM" =>
43.        val fsFactory =
44.          new FileSystemRecoveryModeFactory(conf, serializer)
45.        (fsFactory.createPersistenceEngine(), fsFactory.createLeader-
           ElectionAgent(this))
46.      case "CUSTOM" =>
47.        val clazz = Utils.classForName(conf.get("spark.deploy.
           recoveryMode.factory"))
48.        val factory = clazz.getConstructor(classOf[SparkConf], classOf
           [Serializer])
49.          .newInstance(conf, serializer)
50.          .asInstanceOf[StandaloneRecoveryModeFactory]
51.        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent
           (this))
52.      case _ =>
53.        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
54.    }
55.    persistenceEngine = persistenceEngine_
56.    leaderElectionAgent = leaderElectionAgent_
57.  }

其中在Master的onStart方法中用new()函数创建MasterWebUI,启动一个webServer。

Master.scala的源码如下。

1.    override def onStart(): Unit = {
2.  ......
3.   webUi = new MasterWebUI(this, webUiPort)
4.      webUi.bind()
5.      masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.
        boundPort

如MasterWebUI的spark.ui.killEnabled设置为True,可以通过WebUI页面把Spark的进程Kill掉。

MasterWebUI.scala的源码如下。

1.     private[master]
2.  class MasterWebUI(
3.      val master: Master,
4.      requestedPort: Int)
5.    extends    WebUI(master.securityMgr,        master.securityMgr.getSSLOptions
      ("standalone"),
6.      requestedPort, master.conf, name = "MasterUI") with Logging {
7.    val masterEndpointRef = master.self
8.    val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
9.  .......
10. initialize()
11.
12.   /**初始化所有的服务器组件 */
13.   def initialize() {
14.     val masterPage = new MasterPage(this)
15.     attachPage(new ApplicationPage(this))
16.     attachPage(masterPage)
17.     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR,
        "/static"))
18.     attachHandler(createRedirectHandler(
19.       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods =
          Set("POST")))
20.     attachHandler(createRedirectHandler(
21.       "/driver/kill", "/", masterPage.handleDriverKillRequest,
          httpMethods = Set("POST")))
22.   }

MasterWebUI中在初始化时用new()函数创建MasterPage,在MasterPage中通过代码去写Web页面。

MasterPage.scala的源码如下。

1.  private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
2.  ......
3.   override def renderJson(request: HttpServletRequest): JValue = {
4.     JsonProtocol.writeMasterState(getMasterState)
5.    }
6.  .....
7.    val content =
8.          <div class="row-fluid">
9.            <div class="span12">
10.             <ul class="unstyled">
11.               <li><strong>URL:</strong> {state.uri}</li>
12.               {
13.                 state.restUri.map { uri =>
14.                   <li>
15.                     <strong>REST URL:</strong> {uri}
16.                     <span class="rest-uri"> (cluster mode)</span>
17.                   </li>
18.                 }.getOrElse { Seq.empty }
19.               }
20.               <li><strong>Alive Workers:</strong> {aliveWorkers.length}</li>
21.               <li><strong>Cores in use:</strong> {aliveWorkers.map
                 (_.cores).sum} Total,
22.                 {aliveWorkers.map(_.coresUsed).sum} Used</li>
23.               <li><strong>Memory in use:</strong>
24.                 {Utils.megabytesToString(aliveWorkers.map(_.memory).sum)}
                   Total,
25.                 {Utils.megabytesToString(aliveWorkers.map(_.memoryUsed)
                    .sum)} Used</li>
26.               <li><strong>Applications:</strong>
27.                 {state.activeApps.length} <a href="#running-app">Running</a>,
28.                 {state.completedApps.length}         <a   href="#completed-app">
                    Completed</a> </li>
29.               <li><strong>Drivers:</strong>
30.                 {state.activeDrivers.length} Running,
31.                 {state.completedDrivers.length} Completed </li>
32.               <li><strong>Status:</strong> {state.status}</li>
33.             </ul>
34.           </div>
35.         </div>
36. ........

回到MasterWebUI.scala的initialize()方法,其中调用了attachPage方法,在WebUI中增加Web页面。

WebUI.scala的源码如下。

1.     def attachPage(page: WebUIPage) {
2.     val pagePath = "/" + page.prefix
3.     val renderHandler = createServletHandler(pagePath,
4.       (request: HttpServletRequest) => page.render(request),
         securityManager, conf, basePath)
5.     val renderJsonHandler = createServletHandler(pagePath.stripSuffix
       ("/") + "/json",
6.       (request: HttpServletRequest) => page.renderJson(request),
         securityManager, conf, basePath)
7.     attachHandler(renderHandler)
8.     attachHandler(renderJsonHandler)
9.     val   handlers    =   pageToHandlers.getOrElseUpdate(page,         ArrayBuffer
       [ServletContextHandler]())
10.    handlers += renderHandler
11.  }

在WebUI的bind方法中启用了JettyServer。 WebUI.scala的bind的源码如下。

1.   def bind() {
2.     assert(!serverInfo.isDefined, s"Attempted to bind $className more than
       once!")
3.     try {
4.       val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse
         ("0.0.0.0")
5.      serverInfo = Some(startJettyServer(host, port, sslOptions, handlers,
        conf, name))
6.       logInfo(s"Bound $className to $host, and started at $webUrl")
7.     } catch {
8.       case e: Exception =>
9.         logError(s"Failed to bind $className", e)
10.        System.exit(1)
11.    }
12.  }

JettyUtils.scala的startJettyServer尝试将Jetty服务器绑定到所提供的主机名、端口。

startJettyServer的源码如下。

1.  def startJettyServer(
2.        hostName: String,
3.        port: Int,
4.        sslOptions: SSLOptions,
5.        handlers: Seq[ServletContextHandler],
6.        conf: SparkConf,
7.        serverName: String = ""): ServerInfo = {

5.1.3 Master HA双机切换

Spark生产环境下一般采用ZooKeeper作HA,且建议为3台Master,ZooKeeper会自动化管理Masters的切换。

采用ZooKeeper作HA时,ZooKeeper会保存整个Spark集群运行时的元数据,包括Workers、Drivers、Applications、Executors。

ZooKeeper遇到当前Active级别的Master出现故障时会从Standby Masters中选取出一台作为Active Master,但是要注意,被选举后到成为真正的Active Master之前需要从ZooKeeper中获取集群当前运行状态的元数据信息并进行恢复。

在Master切换的过程中,所有已经在运行的程序皆正常运行。因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时,Job本身的调度和处理和Master是没有任何关系的。

在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有Active Master才能接收新的程序提交请求;另一方面,已经运行的程序中也不能因为Action操作触发新的Job提交请求。

ZooKeeper下Master HA的基本流程如图5-2所示。

ZooKeeper下Master HA的基本流程如下。

(1)使用ZooKeeperPersistenceEngine读取集群的状态数据,包括Drivers、Applications、Workers、Executors等信息。

图5-2 ZooKeeper下Master HA的基本流程

(2)判断元数据信息是否有空的内容。

(3)把通过ZooKeeper持久化引擎获得了Drivers、Applications、Workers、Executors等信息,重新注册到Master的内存中缓存起来。

(4)验证获得的信息和当前正在运行的集群状态的一致性。

(5)将Application和Workers的状态标识为Unknown,然后向Application中的Driver以及Workers发送现在是Leader的Standby模式的Master的地址信息。

(6)当Driver和Workers收到新的Master的地址信息后会响应该信息。

(7)Master接收到来自Drivers和Workers响应的信息后会使用一个关键的方法:completeRecovery()来对没有响应的Applications (Drivers)、Workers (Executors)进行处理。处理完毕后,Master的State会变成RecoveryState.ALIVE,从而开始对外提供服务。

(8)此时Master使用自己的Schedule方法对正在等待的Application和Drivers进行资源调度。

Master HA的4大方式分别是ZOOKEEPER、FILESYSTEM、CUSTOM、NONE。

需要说明的是:

(a)ZOOKEEPER是自动管理Master。

(b)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接收应用程序提交的请求、接收新的Job运行的请求)。

(c)CUSTOM的方式允许用户自定义Master HA的实现,这对高级用户特别有用。

(d)NONE,这是默认情况,Spark集群中就采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群。

Master.scala的HA的源码如下。

1.    override def onStart(): Unit = {
2.  ......
3.  val serializer = new JavaSerializer(conf)
4.      val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
5.        case "ZOOKEEPER" =>
6.          logInfo("Persisting recovery state to ZooKeeper")
7.          val zkFactory =
8.            new ZooKeeperRecoveryModeFactory(conf, serializer)
9.          (zkFactory.createPersistenceEngine(), zkFactory.createLeader-
             ElectionAgent(this))
10.       case "FILESYSTEM" =>
11.         val fsFactory =
12.           new FileSystemRecoveryModeFactory(conf, serializer)
13.         (fsFactory.createPersistenceEngine(), fsFactory.
            createLeaderElectionAgent(this))
14.       case "CUSTOM" =>
15.         val clazz = Utils.classForName(conf.get("spark.deploy.
            recoveryMode.factory"))
16.         val factory = clazz.getConstructor(classOf[SparkConf], classOf
            [Serializer])
17.           .newInstance(conf, serializer)
18.           .asInstanceOf[StandaloneRecoveryModeFactory]
19.         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent
            (this))
20.       case _ =>
21.         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
22.     }
23.     persistenceEngine = persistenceEngine_
24.     leaderElectionAgent = leaderElectionAgent_
25.   }
26. ......

Spark默认的HA方式是NONE。

1.  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

如使用ZOOKEEPER的HA方式,ZooKeeperRecoveryModeFactory.scala的源码如下。

1.    private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf,
      serializer: Serializer)
2.    extends StandaloneRecoveryModeFactory(conf, serializer) {
3.
4.    def createPersistenceEngine(): PersistenceEngine = {
5.      new ZooKeeperPersistenceEngine(conf, serializer)
6.    }
7.
8.    def createLeaderElectionAgent(master: LeaderElectable):
      LeaderElectionAgent = {
9.      new ZooKeeperLeaderElectionAgent(master, conf)
10.   }
11. }

通过调用zkFactory.createPersistenceEngine()用new()函数创建一个ZooKeeper-PersistenceEngine。

ZooKeeperPersistenceEngine.scala的源码如下。

1.   private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val
     serializer: Serializer)
2.  extends PersistenceEngine
3.  with Logging {
4.
5.  private    val   WORKING_DIR     =   conf.get("spark.deploy.zookeeper.dir",
    "/spark") + "/master_status"
6.    private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
7.
8.    SparkCuratorUtil.mkdir(zk, WORKING_DIR)
9.
10.
11.   override def persist(name: String, obj: Object): Unit = {
12.     serializeIntoFile(WORKING_DIR + "/" + name, obj)
13.   }
14.
15.   override def unpersist(name: String): Unit = {
16.     zk.delete().forPath(WORKING_DIR + "/" + name)
17.   }
18.
19.   override def read[T: ClassTag](prefix: String): Seq[T] = {
20.     zk.getChildren.forPath(WORKING_DIR).asScala
21.       .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
22.   }
23.
24.   override def close() {
25.     zk.close()
26.   }
27.
28.   private def serializeIntoFile(path: String, value: AnyRef) {
29.     val serialized = serializer.newInstance().serialize(value)
30.     val bytes = new Array[Byte](serialized.remaining())
31.     serialized.get(bytes)
32.     zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
33.   }
34.
35.   private    def   deserializeFromFile[T](filename:          String)(implicit  m:
      ClassTag[T]): Option[T] = {
36.     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
37.     try {
38.       Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap
          (fileData)))
39.     } catch {
40.       case e: Exception =>
41.         logWarning("Exception while reading persisted file, deleting", e)
42.         zk.delete().forPath(WORKING_DIR + "/" + filename)
43.         None
44.     }
45.   }
46. }

PersistenceEngine中有至关重要的方法persist来实现数据持久化,readPersistedData来恢复集群中的元数据。

PersistenceEngine.scala的源码如下。

1.   def persist(name: String, obj: Object): Unit
2.  ......
3.    final def readPersistedData(
4.        rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq
          [WorkerInfo]) = {
5.      rpcEnv.deserialize { () =>
6.        (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read
          [WorkerInfo]("worker_"))
7.      }
8.    }

下面来看createdLeaderElectionAgent方法。在createdLeaderElectionAgent方法中调用new()函数创建ZooKeeperLeaderElectionAgent实例。

StandaloneRecoveryModeFactory.scala的源码如下。

1.    def createLeaderElectionAgent(master: LeaderElectable):
      LeaderElectionAgent = {
2.      new ZooKeeperLeaderElectionAgent(master, conf)
3.    }
4.  }

ZooKeeperLeaderElectionAgent的源码如下。

1.   private[master] class ZooKeeperLeaderElectionAgent(val masterInstance:
     LeaderElectable,
2.     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent
       with Logging  {
3.
4.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") +
     "/leader_election"
5.
6.   private var zk: CuratorFramework = _
7.   private var leaderLatch: LeaderLatch = _
8.   private var status = LeadershipStatus.NOT_LEADER
9.
10.  start()
11.
12.  private def start() {
13.    logInfo("Starting ZooKeeper LeaderElection agent")
14.    zk = SparkCuratorUtil.newClient(conf)
15.    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
16.    leaderLatch.addListener(this)
17.    leaderLatch.start()
18.  }
19.
20.  override def stop() {
21.    leaderLatch.close()
22.    zk.close()
23.  }
24.
25.  override def isLeader() {
26.    synchronized {
27.      //可以取得领导权
28.      if (!leaderLatch.hasLeadership) {
29.        return
30.      }
31.
32.      logInfo("We have gained leadership")
33.      updateLeadershipStatus(true)
34.    }
35.  }
36.
37.  override def notLeader() {
38.    synchronized {
39.      //可以取得领导权
40.      if (leaderLatch.hasLeadership) {
41.        return
42.      }
43.
44.       logInfo("We have lost leadership")
45.       updateLeadershipStatus(false)
46.     }
47.   }
48.
49.   private def updateLeadershipStatus(isLeader: Boolean) {
50.     if (isLeader && status == LeadershipStatus.NOT_LEADER) {
51.       status = LeadershipStatus.LEADER
52.       masterInstance.electedLeader()
53.     } else if (!isLeader && status == LeadershipStatus.LEADER) {
54.       status = LeadershipStatus.NOT_LEADER
55.       masterInstance.revokedLeadership()
56.     }
57.   }
58.
59.   private object LeadershipStatus extends Enumeration {
60.     type LeadershipStatus = Value
61.     val LEADER, NOT_LEADER = Value
62.   }
63. }

FILESYSTEM和NONE的方式采用MonarchyLeaderAgent的方式来完成Leader的选举,其实现是直接把传入的Master作为Leader。

LeaderElectionAgent.scala的源码如下。

1.   private[spark] class MonarchyLeaderAgent(val masterInstance:
     LeaderElectable)
2.    extends LeaderElectionAgent {
3.    masterInstance.electedLeader()
4.  }

FileSystemRecoveryModeFactory.scala的源码如下。

1.  private[master] class FileSystemRecoveryModeFactory(conf: SparkConf,
    serializer: Serializer)
2.    extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
3.
4.    val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
5.
6.    def createPersistenceEngine(): PersistenceEngine = {
7.      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
8.      new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
9.    }
10.
11.   def createLeaderElectionAgent(master: LeaderElectable):
      LeaderElectionAgent = {
12.     new MonarchyLeaderAgent(master)
13.   }
14. }

如果WorkerState状态为UNKNOWN(Worker不响应),就把它删除,如果以集群方式运行,driver失败后可以重新启动,最后把状态变回ALIVE,注意,这里要加入--supervise这个参数。

Master.scala的源码如下。

1.  private def completeRecovery() {
2.     //使用短同步周期确保“only-once”恢复一次语义
3.   if (state != RecoveryState.RECOVERING) { return }
4.   state = RecoveryState.COMPLETING_RECOVERY
5.
6.   //杀掉不响应消息的workers 和apps
7.   workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
8.   apps.filter(_.state == ApplicationState.UNKNOWN).foreach
     (finishApplication)
9.
10.  //重新调度drivers,其未被任何workers声明
11.  drivers.filter(_.worker.isEmpty).foreach { d =>
12.    logWarning(s"Driver ${d.id} was not found after master recovery")
13.    if (d.desc.supervise) {
14.      logWarning(s"Re-launching ${d.id}")
15.      relaunchDriver(d)
16.    } else {
17.      removeDriver(d.id, DriverState.ERROR, None)
18.      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
19.    }
20.  }

5.1.4 Master的注册机制和状态管理解密

1.Master对其他组件注册的处理

Master接收注册的对象主要是Driver、Application、Worker; Executor不会注册给Master,Executor是注册给Driver中的SchedulerBackend的。

Worker是在启动后主动向Master注册的,所以如果在生产环境下加入新的Worker到正在运行的Spark集群上,此时不需要重新启动Spark集群就能够使用新加入的Worker,以提升处理能力。假如在生产环境中的集群中有500台机器,可能又新加入100台机器,这时不需要重新启动整个集群,就可以将100台新机器加入到集群。

Worker的源码如下。

1.   private[deploy] class Worker(
2.      override val rpcEnv: RpcEnv,
3.      webUiPort: Int,
4.      cores: Int,
5.      memory: Int,
6.      masterRpcAddresses: Array[RpcAddress],
7.      endpointName: String,
8.      workDirPath: String = null,
9.      val conf: SparkConf,
10.     val securityMgr: SecurityManager)
11.   extends ThreadSafeRpcEndpoint with Logging {

Worker是一个消息循环体,继承自ThreadSafeRpcEndpoint,可以收消息,也可以发消息。Worker的onStart方法如下。

1.    override def onStart() {
2.  ......
3.    workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
4.    registerWithMaster()
5.    ......
6.  }

Worker的onStart方法中调用了registerWithMaster()。

1.  private def registerWithMaster() {
2.  .......
3.  registrationRetryTimer match {
4.    case None =>
5.      registered = false
6.      registerMasterFutures = tryRegisterAllMasters()
7.   ......

registerWithMaster方法中调用了tryRegisterAllMasters,向所有的Master进行注册。

Spark 2.1.1版本的Worker.scala的源码如下。

1.          private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.     masterRpcAddresses.map { masterAddress =>
3.       registerMasterThreadPool.submit(new Runnable {
4.         override def run(): Unit = {
5.           try {
6.             logInfo("Connecting to master " + masterAddress + "...")
7.             val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress,
               Master.ENDPOINT_NAME)
8.             registerWithMaster(masterEndpoint)
9.           } catch {
10.            case ie: InterruptedException => //Cancelled
11.            case NonFatal(e) => logWarning(s"Failed to connect to master
               $masterAddress", e)
12.          }
13.        }
14.      })
15.    }
16.  }

Spark 2.2.0版本的Worker.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行registerWithMaster(masterEndpoint)方法调整为sendRegisterMessageToMaster(masterEndpoint)。

1.  ......
2.          sendRegisterMessageToMaster(masterEndpoint)
3.  ......

tryRegisterAllMasters方法中,由于实际运行时有很多Master,因此使用线程池的线程进行提交,然后获取masterEndpoint。masterEndpoint是一个RpcEndpointRef,通过registerWithMaster (masterEndpoint)进行注册。

Spark 2.1.1版本的Worker.scala的registerWithMaster的源码如下。

1.   private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
2.     masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
3.       workerId, host, port, self, cores, memory, workerWebUiUrl))
4.       .onComplete {
5.         //这是一个非常快的动作,所以可以用"ThreadUtils.sameThread"
6.         case Success(msg) =>
7.           Utils.tryLogNonFatalError {
8.             handleRegisterResponse(msg)
9.           }
10.        case Failure(e) =>
11.          logError(s"Cannot register with master: ${masterEndpoint
             .address}", e)
12.          System.exit(1)
13.     }(ThreadUtils.sameThread)
14.  }

Spark 2.1.1版本的registerWithMaster(masterEndpoint)方法调整为Spark 2.2.0版本的sendRegisterMessageToMaster(masterEndpoint)。sendRegisterMessageToMaster方法仅将RegisterWorker消息发送给Master消息循环体。sendRegisterMessageToMaster方法内部不作其他处理。

1.   private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef):
     Unit = {
2.      masterEndpoint.send(RegisterWorker(
3.        workerId,
4.        host,
5.        port,
6.        self,
7.        cores,
8.        memory,
9.        workerWebUiUrl,
10.       masterEndpoint.address))
11.   }

registerWithMaster方法中的masterEndpoint.ask[RegisterWorkerResponse]传进去的是RegisterWorker。RegisterWorker是一个case class,包括id、host、port、worker、cores、memory等信息,这里Worker是自己的引用RpcEndpointRef,Master通过Ref通worker通信。

Spark 2.1.1版本的RegisterWorker.scala的源码如下。

1.   case class RegisterWorker(
2.       id: String,
3.       host: String,
4.       port: Int,
5.       worker: RpcEndpointRef,
6.       cores: Int,
7.       memory: Int,
8.       workerWebUiUrl: String)
9.     extends DeployMessage {
10.    Utils.checkHost(host, "Required hostname")
11.    assert (port > 0)
12.  }

Spark 2.2.0版本的RegisterWorker.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行之后新增加masterAddress的成员变量:Master的地址,用于Worker节点连接Master节点。

1.  ......
2.     masterAddress: RpcAddress)
3.    ......

Worker通过registerWithMaster向Master发送了RegisterWorker消息,Master收到RegisterWorker请求后,进行相应的处理。

Spark 2.1.1版本的Master.scala的receiveAndReply的源码如下。

1.  override def receiveAndReply(context: RpcCallContext):
    PartialFunction[Any, Unit] = {
2.  case RegisterWorker(
3.      id, workerHost, workerPort, workerRef, cores, memory,
       workerWebUiUrl) =>
4.   logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
5.     workerHost, workerPort, cores, Utils.megabytesToString(memory)))
6.   if (state == RecoveryState.STANDBY) {
7.     context.reply(MasterInStandby)
8.   } else if (idToWorker.contains(id)) {
9.     context.reply(RegisterWorkerFailed("Duplicate worker ID"))
10.  } else {
11.    val worker = new WorkerInfo(id, workerHost, workerPort, cores,
       memory,
12.      workerRef, workerWebUiUrl)
13.    if (registerWorker(worker)) {
14.      persistenceEngine.addWorker(worker)
15.      context.reply(RegisteredWorker(self, masterWebUiUrl))
16.      schedule()
17.    } else {
18.      val workerAddress = worker.endpoint.address
19.      logWarning("Worker registration failed. Attempted to re-register
         worker at same " +
20.        "address: " + workerAddress)
21.      context.reply(RegisterWorkerFailed("Attempted to              re-register
         worker at same address: "
22.        + workerAddress))
23.    }
24.  }

Spark 2.2.0版本的Master.scala的receiveAndReply的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第1行,RegisterWorker消息的模式匹配从receiveAndReply方法中调整到receive方法。因为Worker向Master提交RegisterWorker消息,无须同步等待Master的答复。

 上段代码中第3行RegisterWorker的传入参数新增加了masterAddress。

 上段代码中第9行context.reply方法调整为workerRef.send方法。

 上段代码中第15行context.reply方法调整为workerRef.send方法,以及RegisteredWorker中新增了一个参数masterAddress。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.   case RegisterWorker(
4.       id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
         masterAddress)
5.  ......
6.        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
7.  ......
8.        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
9.   ......

RegisterWorker中,Master接收到Worker的注册请求后,首先判断当前的Master是否是Standby的模式,如果是,就不处理;Master的idToWorker包含了所有已经注册的Worker的信息,然后会判断当前Master的内存数据结构idToWorker中是否已经有该Worker的注册,如果有,此时不会重复注册;其中idToWorker是一个HashMap,Key是String代表Worker的字符描述,Value是WorkerInfo。

1.  private val idToWorker = new HashMap[String, WorkerInfo]

WorkerInfo包括id、host、port 、cores、memory、endpoint等内容。

1.    private[spark] class WorkerInfo(
2.    val id: String,
3.    val host: String,
4.    val port: Int,
5.    val cores: Int,
6.    val memory: Int,
7.    val endpoint: RpcEndpointRef,
8.    val webUiAddress: String)
9.  extends Serializable {

Master如果决定接收注册的Worker,首先会创建WorkerInfo对象来保存注册的Worker的信息,然后调用registerWorker执行具体的注册的过程,如果Worker的状态是DEAD的状态,则直接过滤掉。对于UNKNOWN的内容,调用removeWorker进行清理(包括清理该Worker下的Executors和Drivers)。其中,UNKNOWN的情况:Master进行切换时,先对Worker发UNKNOWN消息,只有当Master收到Worker正确的回复消息,才将状态标识为正常。

registerWorker的源码如下。

1.      private def registerWorker(worker: WorkerInfo): Boolean = {
2.   //在同一节点上可能有一个或多个指向挂掉的workers节点的引用(不同ID),须删除它们
3.     workers.filter { w =>
4.       (w.host == worker.host && w.port == worker.port) && (w.state ==
         WorkerState.DEAD)
5.     }.foreach { w =>
6.       workers -= w
7.     }
8.
9.     val workerAddress = worker.endpoint.address
10.    if (addressToWorker.contains(workerAddress)) {
11.      val oldWorker = addressToWorker(workerAddress)
12.      if (oldWorker.state == WorkerState.UNKNOWN) {
13.        //未知状态的worker 意味着在恢复过程中worker 被重新启动。旧的worker节点
           //挂掉,须删掉旧节点,接收新worker节点
14.        removeWorker(oldWorker)
15.      } else {
16.        logInfo("Attempted to re-register worker at same address: " +
           workerAddress)
17.        return false
18.      }
19.    }
20.
21.    workers += worker
22.    idToWorker(worker.id) = worker
23.    addressToWorker(workerAddress) = worker
24.    if (reverseProxy) {
25.       webUi.addProxyTargets(worker.id, worker.webUiAddress)
26.    }
27.    true
28.  }

在registerWorker方法中,Worker注册完成后,把注册的Worker加入到Master的内存数据结构中。

1.  val workers = new HashSet[WorkerInfo]
2.  private val idToWorker = new HashMap[String, WorkerInfo]
3.    private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
4.  ......
5.
6.    workers += worker
7.      idToWorker(worker.id) = worker
8.      addressToWorker(workerAddress) = worker

回到Master.scala的receiveAndReply方法,Worker注册完成后,调用persistenceEngine.addWorker (worker),PersistenceEngine是持久化引擎,在Zookeeper下就是Zookeeper的持久化引擎,把注册的数据进行持久化。

PersistenceEngine.scala的addWorker方法如下。

1.      final def addWorker(worker: WorkerInfo): Unit = {
2.    persist("worker_" + worker.id, worker)
3.  }

ZooKeeperPersistenceEngine是PersistenceEngine的一个具体实现子类,其persist方法如下。

1.     private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val
       serializer: Serializer)
2.    extends PersistenceEngine
3.  ......
4.    override def persist(name: String, obj: Object): Unit = {
5.      serializeIntoFile(WORKING_DIR + "/" + name, obj)
6.    }
7.  ......
8.  private def serializeIntoFile(path: String, value: AnyRef) {
9.      val serialized = serializer.newInstance().serialize(value)
10.     val bytes = new Array[Byte](serialized.remaining())
11.     serialized.get(bytes)
12.     zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
13.   }

回到Master.scala的receiveAndReply方法,注册的Worker数据持久化后,进行schedule()。至此,Worker的注册完成。

同样,Driver的注册过程:Driver提交给Master进行注册,Master会将Driver的信息放入内存缓存中,加入等待调度的队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。

Application的注册过程:Application提交给Master进行注册,Driver启动后会执行SparkContext的初始化,进而导致StandaloneSchedulerBackend的产生,其内部有StandaloneAppClient。StandaloneAppClient内部有ClientEndpoint。ClientEndpoint来发送RegisterApplication信息给Master。Master会将Application的信息放入内存缓存中,把Application加入等待调度的Application队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。

2.Master对Driver和Executor状态变化的处理

1)对Driver状态变化的处理

如果Driver的各个状态是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED,就将其清理掉。其他情况则报异常。

1.     override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.   case DriverStateChanged(driverId, state, exception) =>
4.        state match {
5.          case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED
            | DriverState.FAILED =>
6.            removeDriver(driverId, state, exception)
7.          case _ =>
8.            throw new Exception(s"Received unexpected state update for driver
              $driverId: $state")
9.        }

removeDriver清理掉Driver后,再次调用schedule方法,removeDriver的源码如下。

1.       private def removeDriver(
2.        driverId: String,
3.        finalState: DriverState,
4.        exception: Option[Exception]) {
5.      drivers.find(d => d.id == driverId) match {
6.        case Some(driver) =>
7.          logInfo(s"Removing driver: $driverId")
8.          drivers -= driver
9.          if (completedDrivers.size >= RETAINED_DRIVERS) {
10.           val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
11.           completedDrivers.trimStart(toRemove)
12.         }
13.         completedDrivers += driver
14.         persistenceEngine.removeDriver(driver)
15.         driver.state = finalState
16.         driver.exception = exception
17.         driver.worker.foreach(w => w.removeDriver(driver))
18.         schedule()
19.       case None =>
20.         logWarning(s"Asked to remove unknown driver: $driverId")
21.     }
22.   }
23. }

2)对Executor状态变化的处理

ExecutorStateChanged的源码如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.  ......
3.  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
4.        val execOption = idToApp.get(appId).flatMap(app => app.executors.
          get(execId))
5.        execOption match {
6.          case Some(exec) =>
7.            val appInfo = idToApp(appId)
8.            val oldState = exec.state
9.            exec.state = state
10.
11.           if (state == ExecutorState.RUNNING) {
12.             assert(oldState == ExecutorState.LAUNCHING,
13.               s"executor $execId state transfer from $oldState to RUNNING
                  is illegal")
14.             appInfo.resetRetryCount()
15.      }
16.
17.      exec.application.driver.send(ExecutorUpdated(execId,                state,
         message, exitStatus, false))
18.
19.      if (ExecutorState.isFinished(state)) {
20.        //从worker 和app中删掉executor
21.        logInfo(s"Removing executor ${exec.fullId} because it is
           $state")
22.        //如果应用程序已经完成,保存其状态及在UI上正确显示其信息
23.        if (!appInfo.isFinished) {
24.          appInfo.removeExecutor(exec)
25.        }
26.        exec.worker.removeExecutor(exec)
27.
28.        val normalExit = exitStatus == Some(0)
29.        //只重试一定次数,这样就不会进入无限循环。重要提示:这个代码路径不是通过
           //测试执行的,所以改变if条件时必须小心
30.        if (!normalExit
31.            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
32.            && MAX_EXECUTOR_RETRIES >= 0) { //< 0 disables this
               application-killing path
33.          val execs = appInfo.executors.values
34.          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
35.            logError(s"Application ${appInfo.desc.name} with ID
               ${appInfo.id} failed " +
36.              s"${appInfo.retryCount} times; removing it")
37.            removeApplication(appInfo, ApplicationState.FAILED)
38.          }
39.        }
40.      }
41.      schedule()
42.    case None =>
43.      logWarning(s"Got status update for unknown executor $appId/
         $execId")
44.  }

Executor挂掉时系统会尝试一定次数的重启(最多重启10次)。

1.  private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.
    maxExecutorRetries", 10)