SparkSubmitDriverBootstrapper 和 SparkSubmit 均为 Scala object，用于将作业提交到 Spark 集群。
SparkSubmitDriverBootstrapper:
该 object 的主要作用是解析环境变量、配置文件以及运行时传入的配置参数；具体的作业提交仍是通过启动 SparkSubmit 子进程来完成的。比较重要的代码如下：
// Resolve the driver JVM heap size, in order of precedence:
//   1. the SPARK_SUBMIT_DRIVER_MEMORY environment variable,
//   2. the "spark.driver.memory" entry of the loaded properties (config file),
//   3. the OUR_JAVA_MEM environment variable as the default.
// Memory setting from the environment variable.
val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
// Memory setting from the properties/config file.
val confDriverMemory = properties.get("spark.driver.memory")
// Default memory. NOTE(review): sys.env(...) throws NoSuchElementException when OUR_JAVA_MEM
// is unset, and it is evaluated even if a higher-precedence value exists; presumably the
// launcher script always exports it - confirm.
val defaultDriverMemory = sys.env("OUR_JAVA_MEM")
val newDriverMemory = submitDriverMemory
  .orElse(confDriverMemory)
  .getOrElse(defaultDriverMemory)

// Build up the driver command line: runner (presumably the java executable - confirm),
// classpath, filtered JVM options, -Xms/-Xmx both set to the resolved driver memory,
// the SparkSubmit main class, and finally the original spark-submit arguments.
val command: Seq[String] =
  Seq(runner) ++
    Seq("-cp", newClasspath) ++
    filteredJavaOpts ++
    Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
    Seq("org.apache.spark.deploy.SparkSubmit") ++
    submitArgs

// Start the driver JVM. Empty tokens are dropped so no blank arguments reach the process.
val filteredCommand = command.filter(_.nonEmpty)
// Splat the Seq into ProcessBuilder's String... varargs constructor, so this does not depend
// on an implicit Seq -> java.util.List conversion being in scope.
val builder = new ProcessBuilder(filteredCommand: _*)
// Launch SparkSubmit as a child process.
val process = builder.start()
SparkSubmit:
用于提交应用程序、终止（kill）已提交的应用程序以及查询其状态。
// Actions the user can request from spark-submit: submit an application, kill a
// submission, or request the status of a submission (dispatched on in SparkSubmit.main).
private[spark] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}
// Entry point: parses the command-line arguments into SparkSubmitArguments and dispatches
// on the requested action (submit, kill, or status request).
def main(args: Array[String]): Unit = {
  val parsedArgs = new SparkSubmitArguments(args)

  // In verbose mode, echo the parsed arguments before acting on them.
  if (parsedArgs.verbose) printStream.println(parsedArgs)

  val requested = parsedArgs.action
  requested match {
    case SparkSubmitAction.SUBMIT         => submit(parsedArgs)
    case SparkSubmitAction.KILL           => kill(parsedArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(parsedArgs)
  }
}
// Kills an already-submitted application: calls StandaloneRestClient.killSubmission
// with the master URL and the submission ID to kill (args.submissionToKill).
private def kill(args: SparkSubmitArguments): Unit = {
  val restClient = new StandaloneRestClient()
  restClient.killSubmission(args.master, args.submissionToKill)
}
// Queries the status of an already-submitted application: calls
// StandaloneRestClient.requestSubmissionStatus with the master URL and the submission ID
// (args.submissionToRequestStatusFor).
private def requestStatus(args: SparkSubmitArguments): Unit = {
  val restClient = new StandaloneRestClient()
  restClient.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
}
// Submits an application in two steps. First, prepareSubmitEnvironment resolves the child
// arguments, classpath, system properties and the main class to run. Then runMain loads that
// child main class and invokes its main method via reflection, in the current JVM.
private[spark] def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) // child args, classpath, system properties and main class to run
// Runs the resolved child main class. When args.proxyUser is set, the run happens inside
// doAs of a UserGroupInformation proxy user created for that name on top of the current user.
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
...
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// Standalone cluster mode with the REST submission protocol enabled.
if (args.isStandaloneCluster && args.useRest) {
try {
printStream.println("Running Spark using the REST application submission protocol.")
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
...
// Disable REST and resubmit. On the recursive call the condition above is false, so the
// else-branch runs; prepareSubmitEnvironment then selects the legacy (non-REST) client.
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
// Body of runMain: installs a fresh context class loader, loads childMainClass through it and
// invokes its main method with childArgs. Parts of the original are elided with "...".
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
...
// Choose the class loader. With spark.driver.userClassPathFirst=true use a child-first
// loader (presumably so user classes take precedence - confirm); otherwise a
// MutableURLClassLoader. Both start with no URLs and use the current thread's context
// class loader as parent.
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
...
var mainClass: Class[_] = null
try {
// Load and initialize (second argument = true) the child main class via the new loader.
mainClass = Class.forName(childMainClass, true, loader)
} catch {
...
// For the (elided) failure cases handled here, terminate the JVM with CLASS_NOT_FOUND_EXIT_STATUS.
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
...
try {
// mainMethod is looked up in the elided code above. The null receiver means it must be a
// static method. childArgs.toArray is passed without `: _*`, i.e. as the single String[]
// argument of main.
mainMethod.invoke(null, childArgs.toArray)
} catch {
...
}
}
childMainClass 由 prepareSubmitEnvironment(args) 方法解析得出，主要代码如下：
// Resolves what submit should run and returns (childArgs, childClasspath, sysProps, childMainClass).
// Only the logic that selects childMainClass and its arguments is shown; the rest is elided ("...").
private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
...
// Client deploy mode: the user's own main class is run directly.
if (deployMode == CLIENT) {
childMainClass = args.mainClass
...
}
// Standalone cluster mode
if (args.isStandaloneCluster) {
if (args.useRest) {
// REST submission: run StandaloneRestClient, passing the primary resource and user main class.
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"
...
}
...
}
// YARN cluster mode: run the YARN Client, which receives the user application through flags.
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
// Python application: pass only the file name (last path component) of the primary
// resource, and run PythonRunner as the class.
val mainPyFile = new Path(args.primaryResource).getName
childArgs += ("--primary-py-file", mainPyFile)
...
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else {
// JVM application: pass the primary resource as --jar unless it equals SPARK_INTERNAL,
// then the user main class.
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
// Forward each user application argument as its own "--arg <value>" pair.
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
}
阅读(1668) | 评论(0) | 转发(0) |