
Spark Source Code Analysis: Yarn Deployment Flow (SparkSubmit)


1. Yarn Deployment Flow (SparkSubmit)

1.1 The spark-submit script

Let's look at the contents of the spark-submit script.
Source location: spark/bin/spark-submit

# $@: all arguments passed to the script
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-submit delegates to spark-class, whose build_command function runs the launcher (org.apache.spark.launcher.Main) to build the final java command:

build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

Source location: spark/launcher/src/main/java/org/apache/spark/launcher/Main.java
This is the command-line interface for launching Spark applications, used internally by Spark's scripts.

/**
 * Command line interface for the Spark launcher. Used internally by Spark scripts.
 */
class Main {

  // Usage: Main [class] [class args]
  //
  // This CLI works in two different modes:
  // 1. "spark-submit": if class is org.apache.spark.deploy.SparkSubmit, the
  //    SparkLauncher class is used to launch a Spark application.
  // 2. "spark-class": if another class is provided, an internal Spark class is run.
  //
  // This class works in tandem with the "bin/spark-class" script on Unix-like systems.
  public static void main(String[] argsArray) throws Exception {
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
    // ...
    // SparkSubmit
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
        cmd = buildCommand(builder, env, printLaunchCommand);
      } catch (IllegalArgumentException e) {
        // ...
        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }
        // ...
        help.add(parser.USAGE_ERROR);
        AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
        // a different command builder is used here
        cmd = buildCommand(builder, env, printLaunchCommand);
      }
    } else {
      AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
      // a different command builder is used here
      cmd = buildCommand(builder, env, printLaunchCommand);
    }
    // ...
  }
  // ...
}

Now look at the main method of SparkSubmit:
object SparkSubmit extends CommandLineUtils with Logging

override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>

    // encapsulate the configuration parameters
    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      // create a SparkSubmitArguments object
      new SparkSubmitArguments(args) {
        // ...
      }
    }

    // ...

    override def doSubmit(args: Array[String]): Unit = {
      try {
        // execution ends up here
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }
  }

  // call doSubmit
  submit.doSubmit(args)
}
1.1.1 Encapsulating parameters: new SparkSubmitArguments

As shown above, a SparkSubmitArguments object is created.
Source location: spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

/**
 * Parses and encapsulates arguments from the spark-submit script.
 * The env argument is used for testing.
 */
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  // ...

  // Set parameters from command line arguments
  // parse the arguments passed on the command line
  parse(args.asJava)

  // ...
}

The concrete implementation of parse lives in:
spark/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

/**
 * Parse a list of spark-submit command line options.
 * See SparkSubmitArguments.scala for a more formal description of available options.
 *
 * @throws IllegalArgumentException If an error is found during parsing.
 */
protected final void parse(List<String> args) {
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  // outer for loop over all arguments
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      // handle the parsed option
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
      // handle the parsed switch
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
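As a quick illustration of the loop above: the eqSeparatedOpt regex is what makes both "--key value" and "--key=value" spellings work. A minimal standalone sketch, where only the regex is taken from the source and the object and method names are made up for illustration:

import java.util.regex.Pattern

object EqSeparatedOptDemo {
  // Same pattern as in SparkSubmitOptionParser: "--name=value" splits into ("--name", "value")
  private val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")

  // Returns the option name and, if present, the "=value" part.
  def split(arg: String): (String, Option[String]) = {
    val m = eqSeparatedOpt.matcher(arg)
    if (m.matches()) (m.group(1), Some(m.group(2)))
    else (arg, None) // the value, if any, is taken from the next argument instead
  }

  def main(args: Array[String]): Unit = {
    println(split("--deploy-mode=cluster")) // (--deploy-mode,Some(cluster))
    println(split("--master"))              // (--master,None)
  }
}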

The handle method mainly does pattern matching and then assigns the values:
spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
This is an implementation of the template method pattern.

/** Fill in values by parsing user options. */
override protected def handle(opt: String, value: String): Boolean = {
  // pattern match and assign
  opt match {
    case NAME =>
      name = value
    case MASTER =>
      master = value
    case CLASS =>
      mainClass = value
    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value
    // ...
    case _ =>
      error(s"Unexpected argument '$opt'.")
  }
  action != SparkSubmitAction.PRINT_VERSION
}
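To see the template-method relationship in isolation: the parent class fixes the parsing loop and delegates each recognized option to an abstract handle, which is how SparkSubmitOptionParser.parse and SparkSubmitArguments.handle cooperate. A simplified sketch with made-up class names and a deliberately naive loop:

// Minimal sketch of the template-method relationship between
// SparkSubmitOptionParser.parse (the fixed algorithm) and
// SparkSubmitArguments.handle (the step that subclasses override).
abstract class OptionParserSketch {
  // Template method: the parsing loop is fixed here.
  final def parse(args: Seq[String]): Unit = {
    args.grouped(2).foreach {
      case Seq(opt, value) => handle(opt, value)
      case Seq(opt)        => handle(opt, null)
    }
  }
  // Hook overridden by subclasses.
  protected def handle(opt: String, value: String): Boolean
}

class SubmitArgsSketch extends OptionParserSketch {
  var master: String = _
  var mainClass: String = _

  override protected def handle(opt: String, value: String): Boolean = opt match {
    case "--master" => master = value; true
    case "--class"  => mainClass = value; true
    case _          => false
  }
}

// Usage: new SubmitArgsSketch().parse(Seq("--master", "yarn", "--class", "com.example.Main"))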
1.1.2 Executing the doSubmit method

super.doSubmit(args)

/**
 * Main gateway of launching a Spark application.
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
 */
private[spark] class SparkSubmit extends Logging {

  import DependencyUtils._
  import SparkSubmit._

  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
  // ...
}

The call chain is: submit executes doRunMain, which in turn calls runMain.

/**
 * Submit the application using the provided parameters, ensuring to first wrap
 * in a doAs when --proxy-user is specified.
 */
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

  // doRunMain declares a local method that is invoked below; either branch ends up calling runMain
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(args, uninitLog)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

As its doc comment explains, the runMain method runs in two steps:

  • First, we prepare the launch environment by setting up the appropriate classpath, system properties, and application arguments for running the child main class based on the cluster manager and the deploy mode.
  • Second, we use this launch environment to invoke the main method of the child main class.
/**
 * Run the main method of the child class using the submit arguments.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 */
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // 1. Prepare the submit environment
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // ...
  try {
    // 2. Load the class via reflection
    // In client mode childMainClass is the --class given on the command line;
    // in cluster mode it is org.apache.spark.deploy.yarn.YarnClusterApplication
    mainClass = Utils.classForName(childMainClass)
  } catch {
    // ...
  }
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // yarn-cluster mode actually goes through this branch
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    // look up and invoke the main method
    new JavaMainApplication(mainClass)
  }
  // ...
  try {
    // 3. Key step: call the start method of SparkApplication
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}

Preparing the submit environment: prepareSubmitEnvironment

/**
 * Prepare the environment for submitting an application.
 *
 * @param args the parsed SparkSubmitArguments used for environment preparation.
 * @param conf the Hadoop Configuration, this argument will only be set in unit test.
 * @return a 4-tuple:
 *   (1) the arguments for the child process,
 *   (2) a list of classpath entries for the child,
 *   (3) a map of system properties, and
 *   (4) the main class for the child
 *
 * Exposed for testing.
 */
private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  // Return values; childMainClass is the one to pay attention to
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = args.toSparkConf()
  // important
  var childMainClass = ""
  // ...
}

The childMainClass in the returned tuple is the main class to run, and it differs between deploy modes.
Client mode:

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
    childClasspath += localPrimaryResource
  }
  if (localJars != null) { childClasspath ++= localJars.split(",") }
}

Cluster mode:
org.apache.spark.deploy.yarn.YarnClusterApplication

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  // private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  //   "org.apache.spark.deploy.yarn.YarnClusterApplication"
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  // ...
}

JavaMainApplication loads the class via reflection and invokes its main method.
runMain#Utils.classForName(childMainClass)
Find and invoke the main method:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  // override start: look up and invoke the main method of klass
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // find the main method
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    // it must be static
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    // invoke the main method
    mainMethod.invoke(null, args)
  }
}

YarnClusterApplication has no main method; it overrides SparkApplication's start method, creates a Client, and calls its run method.
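Putting the two paths side by side, the contract that runMain programs against boils down to a single start method. The sketch below is a simplified restatement (not the actual Spark source) of what the previous sections showed:

import org.apache.spark.SparkConf

// Simplified restatement of the SparkApplication contract used by runMain.
trait SparkApplicationSketch {
  def start(args: Array[String], conf: SparkConf): Unit
}

// yarn-cluster path: start drives a yarn Client (see section 1.2 below).
class YarnClusterApplicationSketch extends SparkApplicationSketch {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // in the real code: new Client(new ClientArguments(args), conf, null).run()
  }
}

// client-mode path: the user class is wrapped and its static main is invoked reflectively.
class JavaMainApplicationSketch(klass: Class[_]) extends SparkApplicationSketch {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args)
  }
}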

1.2 org.apache.spark.deploy.yarn.YarnClusterApplication

spark/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

// Extends SparkApplication and overrides its start method
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    // 1. Encapsulate the configuration parameters: new ClientArguments(args)
    // 2. Create the client: new Client
    // 3. Run it: .run()
    new Client(new ClientArguments(args), conf, null).run()
  }
}
1.2.1 Encapsulating configuration parameters: new ClientArguments(args)

This just wraps the configuration parameters.

/**
 * Command-line parser for the driver client.
 */
private[deploy] class ClientArguments(args: Array[String]) {

  import ClientArguments._

  var cmd: String = "" // 'launch' or 'kill'
  // ...
}
1.2.2 Creating the client: new Client

The Client object holds a yarnClient created via YarnClient.createYarnClient:

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {

  import Client._
  import YarnSparkHadoopUtil._

  // create the YarnClient
  private val yarnClient = YarnClient.createYarnClient
  // ...
}

createYarnClient is a simple static factory:
org.apache.hadoop.yarn.client.api#YarnClient

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {

  /**
   * Create a new instance of YarnClient.
   */
  @Public
  public static YarnClient createYarnClient() {
    // the concrete YarnClient implementation
    YarnClient client = new YarnClientImpl();
    return client;
  }
  // ...
}

org.apache.hadoop.yarn.client.api.impl#YarnClientImpl
The key field is rmClient:

@Private
@Unstable
public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  // client to the ResourceManager
  protected ApplicationClientProtocol rmClient;
  // ...
}
1.2.3 Calling the client.run method

This registers the Spark application with the ResourceManager.

/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
 */
def run(): Unit = {
  // application ID - submit the application
  this.appId = submitApplication()
  // ...
}

Submitting the Spark application:
submitApplication()
This runs the ApplicationMaster for the current Spark application.

/**
 * Submit an application running our ApplicationMaster to the ResourceManager.
 *
 * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
 * creating applications and setting up the application submission context. This was not
 * available in the alpha API.
 */
def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)

  var appId: ApplicationId = null
  try {
    // 1. Initialize and start the YARN client
    launcherBackend.connect()
    // initialize the YARN client
    yarnClient.init(hadoopConf)
    // start the YARN client
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    // 2. Ask the RM to create a new YARN application and obtain the appId
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()
    // ...

    // Set up the appropriate contexts to launch our AM
    // 3. Build the contexts for the ApplicationMaster
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    // 4. Submit the application and monitor its state
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    // return the appId
    appId
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}
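The same YarnClient lifecycle (create, init, start, createApplication, submitApplication) can be exercised directly against the Hadoop YARN API. Below is a minimal standalone sketch assuming a reachable YARN cluster; the application name, resources, and AM command are placeholders, not what Spark actually submits:

import java.util.Collections

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object YarnClientLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()

    // 1. Create, init, and start the client (the same calls Spark's Client makes).
    val yarnClient = YarnClient.createYarnClient
    yarnClient.init(conf)
    yarnClient.start()

    // 2. Ask the ResourceManager for a new application.
    val newApp = yarnClient.createApplication()
    val appId = newApp.getNewApplicationResponse.getApplicationId
    println(s"Got application id: $appId")

    // 3. Fill in the submission context (placeholder AM command and resources).
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(Collections.singletonList("sleep 60"))

    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("yarn-client-sketch")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(Resource.newInstance(1024, 1))

    // 4. Submit and shut the client down.
    yarnClient.submitApplication(appContext)
    yarnClient.stop()
  }
}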

Creating the container launch context:
containerContext = createContainerLaunchContext(newAppResponse)

/**
 * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
 * This sets up the launch environment, java options, and the command for launching the AM.
 */
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
  // JVM options, etc.
  // ...

  // Key point: the ApplicationMaster class
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
  // ...

  // Assemble the ApplicationMaster launch arguments
  val amArgs =
    Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
    Seq("--properties-file",
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
    Seq("--dist-cache-conf",
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

  // Command for launching the ApplicationMaster
  val commands = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++ amArgs ++
    Seq(
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

  // TODO: it would be nicer to just make sure there are no null commands here
  val printableCommands = commands.map(s => if (s == null) "null" else s).toList
  amContainer.setCommands(printableCommands.asJava)
  // ...

  // send the acl settings into YARN to control who has access via YARN interfaces
  val securityManager = new SecurityManager(sparkConf)
  amContainer.setApplicationACLs(
    YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
  setupSecurityToken(amContainer)

  // return the amContainer
  amContainer
}

ContainerLaunchContext is an abstract class that carries everything the NodeManager needs to launch a container: the ContainerId, the allocated Resource, the user, security tokens, LocalResources, environment variables, the command to launch the container, and so on.

/**
 * {@code ContainerLaunchContext} represents all of the information
 * needed by the {@code NodeManager} to launch a container.
 * It includes details such as:
 * <ul>
 *   <li>{@link ContainerId} of the container.</li>
 *   <li>{@link Resource} allocated to the container.</li>
 *   <li>User to whom the container is allocated.</li>
 *   <li>Security tokens (if security is enabled).</li>
 *   <li>
 *     {@link LocalResource} necessary for running the container such
 *     as binaries, jar, shared-objects, side-files etc.
 *   </li>
 *   <li>Optional, application-specific binary service data.</li>
 *   <li>Environment variables for the launched process.</li>
 *   <li>Command to launch the container.</li>
 * </ul>
 *
 * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
 */
@Public
@Stable
public abstract class ContainerLaunchContext {
  // ...
}
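As a rough illustration of how such a context gets filled in (independent of Spark), a bare-bones ContainerLaunchContext with just a command and environment might be built like this; the command, class name, and environment values are placeholders:

import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

object ContainerLaunchContextSketch {
  // Build a bare-bones ContainerLaunchContext: the command to run and its environment.
  // Real launch contexts (like the amContainer above) also carry local resources
  // (jars, the localized Spark conf) and security tokens.
  def build(): ContainerLaunchContext = {
    val clc = Records.newRecord(classOf[ContainerLaunchContext])

    // Command executed by the NodeManager, with stdout/stderr redirected
    // into the container's YARN log directory.
    clc.setCommands(Collections.singletonList(
      "{{JAVA_HOME}}/bin/java -Xmx512m com.example.SketchMain" +
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"))

    // Environment variables for the launched process.
    clc.setEnvironment(Map("SKETCH_ENV" -> "demo").asJava)

    clc
  }
}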