
Category: Big Data

2015-07-13 15:56:42

SparkSubmitDriverBootstrapper and SparkSubmit are both Scala objects; together they handle submitting a job to a Spark cluster.

SparkSubmitDriverBootstrapper:
This object's main job is to parse environment variables, the properties file, and any configuration passed in at launch; the actual submission is delegated to a SparkSubmit child process. The most important code is:

// Memory setting from the environment variable
val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")

// Memory setting from the properties file
val confDriverMemory = properties.get("spark.driver.memory")

// Default memory
val defaultDriverMemory = sys.env("OUR_JAVA_MEM")

// Precedence: environment variable, then properties file, then default
val newDriverMemory = submitDriverMemory
  .orElse(confDriverMemory)
  .getOrElse(defaultDriverMemory)
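
The resolution above is ordinary Option composition: the first Some wins. A minimal, self-contained sketch with made-up values:

object MemoryPrecedence extends App {
  val fromEnv: Option[String]  = None        // as if SPARK_SUBMIT_DRIVER_MEMORY were unset
  val fromConf: Option[String] = Some("2g")  // as if spark.driver.memory came from the file
  val default: String          = "512m"      // fallback value

  // The environment variable beats the properties file, which beats the default.
  val resolved = fromEnv.orElse(fromConf).getOrElse(default)
  println(resolved)  // prints "2g"
}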

// Build up the command for the driver JVM
val command: Seq[String] =
  Seq(runner) ++
  Seq("-cp", newClasspath) ++
  filteredJavaOpts ++
  Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
  Seq("org.apache.spark.deploy.SparkSubmit") ++
  submitArgs

// Start the driver JVM
val filteredCommand = command.filter(_.nonEmpty)
val builder = new ProcessBuilder(filteredCommand)

// Launch the SparkSubmit child process
val process = builder.start()
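
Forking a child JVM with ProcessBuilder is easy to reproduce in isolation. A minimal sketch in the same spirit (com.example.ChildMain is a hypothetical main class; the bootstrapper relies on an implicit Seq-to-List conversion, made explicit here):

import scala.collection.JavaConverters._

object ForkJvm extends App {
  // Reuse this JVM's java binary and classpath (an illustrative choice).
  val javaBin   = sys.props("java.home") + "/bin/java"
  val classpath = sys.props("java.class.path")

  val command = Seq(javaBin, "-cp", classpath, "-Xmx512m", "com.example.ChildMain")
  val builder = new ProcessBuilder(command.asJava)  // ProcessBuilder wants a java.util.List
  builder.inheritIO()                               // forward the child's stdout/stderr

  val exitCode = builder.start().waitFor()
  println(s"child exited with $exitCode")
}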

SparkSubmit:
This object submits, kills, and queries the status of applications.

// The actions a user can request
private[spark] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}

// Main logic: parse the arguments, then dispatch on the requested action
def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      printStream.println(appArgs)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
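
The spark-submit shell script ultimately does nothing more than invoke this main with the command-line arguments. A hedged sketch of an equivalent direct call (master URL, class name, and jar path are placeholders):

// Equivalent to:
//   spark-submit --master spark://host:7077 --class com.example.MyApp app.jar arg1
org.apache.spark.deploy.SparkSubmit.main(Array(
  "--master", "spark://host:7077",
  "--class", "com.example.MyApp",
  "/path/to/app.jar",
  "arg1"
))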

  // Kill a previously submitted job
  private def kill(args: SparkSubmitArguments): Unit = {
    new StandaloneRestClient()
      .killSubmission(args.master, args.submissionToKill)
  }


  // Query the status of a previously submitted job
  private def requestStatus(args: SparkSubmitArguments): Unit = {
    new StandaloneRestClient()
      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
  }
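
From the command line, these two actions correspond to the --kill <submissionId> and --status <submissionId> flags of spark-submit, which in standalone cluster mode talk to the master's REST endpoint (port 6066 by default).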

// The submission code follows. It has two parts: first prepare the environment (arguments, classpath, system properties, main class); then invoke the resolved main class's main method reflectively in the current JVM
  private[spark] def submit(args: SparkSubmitArguments): Unit = {
    // Resolve the child's arguments, classpath, system properties, and main class
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
           ...
        }
      } else {
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }

    if (args.isStandaloneCluster && args.useRest) {
      try {
        printStream.println("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
          ...
          args.useRest = false
          submit(args)
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }
  }

// The body of runMain
  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    ...
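    // Choose the classloader for user code: child-first when
    // spark.driver.userClassPathFirst is set, otherwise the usual
    // parent-first delegation.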
    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)
    ...
    var mainClass: Class[_] = null
    try {
      mainClass = Class.forName(childMainClass, true, loader)
    } catch {
        ...
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }
    ...
    try {
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
       ...
    }
  }
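
The elided mainMethod lookup is plain Java reflection. A self-contained sketch of the same pattern (com.example.MyApp is a hypothetical class that must be on the classpath):

import java.lang.reflect.Modifier

object ReflectiveMain extends App {
  // Resolve the class through the current context classloader, as runMain does.
  val loader = Thread.currentThread.getContextClassLoader
  val mainClass = Class.forName("com.example.MyApp", true, loader)

  // Look up the static main(String[]) entry point.
  val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
  require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")

  // null receiver because main is static; the String[] is the single argument.
  mainMethod.invoke(null, Array("arg1", "arg2"))
}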

childMainClass is resolved by the prepareSubmitEnvironment(args) method; the key code is:

  private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
      : (Seq[String], Seq[String], Map[String, String], String) = {
      ...
      if (deployMode == CLIENT) {
        childMainClass = args.mainClass
        ...
      }

    // Standalone cluster mode
    if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = "org.apache.spark.deploy.Client"
        ...
      }
      ...
    }

    // YARN cluster mode
    if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      if (args.isPython) {
        val mainPyFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-py-file", mainPyFile)
         ...
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else {
        if (args.primaryResource != SPARK_INTERNAL) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
  }
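
To summarize prepareSubmitEnvironment: in client mode childMainClass is simply the user's own main class, run in the current JVM; in standalone cluster mode it is org.apache.spark.deploy.rest.StandaloneRestClient (REST) or org.apache.spark.deploy.Client (legacy); in YARN cluster mode it is org.apache.spark.deploy.yarn.Client, with the user's class (or org.apache.spark.deploy.PythonRunner for Python applications) passed along via --class.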

