spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ues...@apache.org
Subject [2/2] spark git commit: [SPARK-22757][KUBERNETES] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode
Date Thu, 28 Dec 2017 04:44:53 GMT
[SPARK-22757][KUBERNETES] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode

## What changes were proposed in this pull request?

This PR expands the Kubernetes mode to be able to use remote dependencies on http/https endpoints, GCS, S3, etc. It adds steps for configuring and appending the Kubernetes init-container into the driver and executor pods for downloading remote dependencies.
[Init-containers](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/), as the name suggests, are containers that are run to completion before the main containers start, and are often used to perform initialization tasks prior to starting the main containers. We use init-containers to localize remote application dependencies before the driver/executors start running. The code that the init-container runs is also included. This PR also adds a step to the driver and executors for mounting user-specified secrets that may store credentials for accessing data storage, e.g., S3 and Google Cloud Storage (GCS), into the driver and executors.

## How was this patch tested?

* The patch contains unit tests which are passing.
* Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
* Manual testing of the following cases:
  * [x] Running SparkPi using container-local spark-example jar.
  * [x] Running SparkPi using container-local spark-example jar with user-specific secret mounted.
  * [x] Running SparkPi using spark-example jar hosted remotely on an https endpoint.

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926
reviewers: vanzin felixcheung jiangxb1987 mridulm

Author: Yinan Li <liyinan926@gmail.com>

Closes #19954 from liyinan926/init-container.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/171f6dda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/171f6dda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/171f6dda

Branch: refs/heads/master
Commit: 171f6ddadc6185ffcc6ad82e5f48952fb49095b2
Parents: 32ec269
Author: Yinan Li <liyinan926@gmail.com>
Authored: Thu Dec 28 13:44:44 2017 +0900
Committer: Takuya UESHIN <ueshin@databricks.com>
Committed: Thu Dec 28 13:44:44 2017 +0900

----------------------------------------------------------------------
 .../org/apache/spark/deploy/k8s/Config.scala    |  67 +++++++-
 .../spark/deploy/k8s/ConfigurationUtils.scala   |  41 -----
 .../org/apache/spark/deploy/k8s/Constants.scala |  11 ++
 .../deploy/k8s/InitContainerBootstrap.scala     | 119 +++++++++++++
 .../spark/deploy/k8s/KubernetesUtils.scala      | 114 +++++++++++++
 .../deploy/k8s/MountSecretsBootstrap.scala      |  62 +++++++
 .../k8s/PodWithDetachedInitContainer.scala      |  31 ++++
 .../k8s/SparkKubernetesClientFactory.scala      |   2 +-
 .../k8s/submit/DriverConfigOrchestrator.scala   | 170 +++++++++++++++++++
 .../DriverConfigurationStepsOrchestrator.scala  | 125 --------------
 .../submit/KubernetesClientApplication.scala    |  26 ++-
 .../deploy/k8s/submit/KubernetesFileUtils.scala |  68 --------
 .../steps/BaseDriverConfigurationStep.scala     | 165 ------------------
 .../steps/BasicDriverConfigurationStep.scala    | 160 +++++++++++++++++
 .../submit/steps/DependencyResolutionStep.scala |  27 +--
 .../submit/steps/DriverConfigurationStep.scala  |   2 +-
 .../DriverInitContainerBootstrapStep.scala      |  95 +++++++++++
 .../submit/steps/DriverMountSecretsStep.scala   |  38 +++++
 .../steps/DriverServiceBootstrapStep.scala      |  17 +-
 .../BasicInitContainerConfigurationStep.scala   |  67 ++++++++
 .../InitContainerConfigOrchestrator.scala       |  79 +++++++++
 .../InitContainerConfigurationStep.scala        |  25 +++
 .../InitContainerMountSecretsStep.scala         |  39 +++++
 .../steps/initcontainer/InitContainerSpec.scala |  37 ++++
 .../deploy/rest/k8s/SparkPodInitContainer.scala | 116 +++++++++++++
 .../cluster/k8s/ExecutorPodFactory.scala        |  77 ++++++---
 .../cluster/k8s/KubernetesClusterManager.scala  |  62 ++++++-
 .../submit/DriverConfigOrchestratorSuite.scala  | 129 ++++++++++++++
 ...verConfigurationStepsOrchestratorSuite.scala |  82 ---------
 .../deploy/k8s/submit/SecretVolumeUtils.scala   |  36 ++++
 .../BaseDriverConfigurationStepSuite.scala      | 118 -------------
 .../BasicDriverConfigurationStepSuite.scala     | 118 +++++++++++++
 .../DriverInitContainerBootstrapStepSuite.scala | 160 +++++++++++++++++
 .../steps/DriverMountSecretsStepSuite.scala     |  49 ++++++
 ...sicInitContainerConfigurationStepSuite.scala |  95 +++++++++++
 .../InitContainerConfigOrchestratorSuite.scala  |  80 +++++++++
 .../InitContainerMountSecretsStepSuite.scala    |  57 +++++++
 .../rest/k8s/SparkPodInitContainerSuite.scala   |  86 ++++++++++
 .../cluster/k8s/ExecutorPodFactorySuite.scala   |  82 ++++++++-
 .../src/main/dockerfiles/driver/Dockerfile      |   3 +-
 .../src/main/dockerfiles/executor/Dockerfile    |   3 +-
 .../main/dockerfiles/init-container/Dockerfile  |  24 +++
 42 files changed, 2288 insertions(+), 676 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 45f5279..e5d79d9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
-import org.apache.spark.network.util.ByteUnit
 
 private[spark] object Config extends Logging {
 
@@ -132,20 +131,72 @@ private[spark] object Config extends Logging {
 
   val JARS_DOWNLOAD_LOCATION =
     ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
-      .doc("Location to download jars to in the driver and executors. When using" +
-        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
-        " volume on the driver and executor pod.")
+      .doc("Location to download jars to in the driver and executors. When using " +
+        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+        "volume on the driver and executor pod.")
       .stringConf
       .createWithDefault("/var/spark-data/spark-jars")
 
   val FILES_DOWNLOAD_LOCATION =
     ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
-      .doc("Location to download files to in the driver and executors. When using" +
-        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
-        " volume on the driver and executor pods.")
+      .doc("Location to download files to in the driver and executors. When using " +
+        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+        "volume on the driver and executor pods.")
       .stringConf
       .createWithDefault("/var/spark-data/spark-files")
 
+  val INIT_CONTAINER_IMAGE =
+    ConfigBuilder("spark.kubernetes.initContainer.image")
+      .doc("Image for the driver and executor's init-container for downloading dependencies.")
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_MOUNT_TIMEOUT =
+    ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
+      .doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
+        "locations into the driver and executor pods.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(300)
+
+  val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
+    ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
+      .doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
+        "executor pod.")
+      .intConf
+      .createWithDefault(5)
+
+  val INIT_CONTAINER_REMOTE_JARS =
+    ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
+      .doc("Comma-separated list of jar URIs to download in the init-container. This is " +
+        "calculated from spark.jars.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_REMOTE_FILES =
+    ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
+      .doc("Comma-separated list of file URIs to download in the init-container. This is " +
+        "calculated from spark.files.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_CONFIG_MAP_NAME =
+    ConfigBuilder("spark.kubernetes.initContainer.configMapName")
+      .doc("Name of the config map to use in the init-container that retrieves submitted files " +
+        "for the executor.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
+    ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
+      .doc("Key for the entry in the init container config map for submitted files that " +
+        "corresponds to the properties for this init-container.")
+      .internal()
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 
@@ -153,9 +204,11 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
+  val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
 
   val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+  val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
 
   val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
deleted file mode 100644
index 0171747..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.k8s
-
-import org.apache.spark.SparkConf
-
-private[spark] object ConfigurationUtils {
-
-  /**
-   * Extract and parse Spark configuration properties with a given name prefix and
-   * return the result as a Map. Keys must not have more than one value.
-   *
-   * @param sparkConf Spark configuration
-   * @param prefix the given property name prefix
-   * @return a Map storing the configuration property keys and values
-   */
-  def parsePrefixedKeyValuePairs(
-      sparkConf: SparkConf,
-      prefix: String): Map[String, String] = {
-    sparkConf.getAllWithPrefix(prefix).toMap
-  }
-
-  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
-    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 0b91145..111cb2a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -69,6 +69,17 @@ private[spark] object Constants {
   val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
   val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
+  val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
+
+  // Bootstrapping dependencies with the init-container
+  val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
+  val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
+  val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
+  val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
+  val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
+  val INIT_CONTAINER_PROPERTIES_FILE_PATH =
+    s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
+  val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
 
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
new file mode 100644
index 0000000..dfeccf9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/InitContainerBootstrap.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Bootstraps an init-container for downloading remote dependencies. This is separated out from
+ * the init-container steps API because this component can be used to bootstrap init-containers
+ * for both the driver and executors.
+ */
+private[spark] class InitContainerBootstrap(
+    initContainerImage: String,
+    imagePullPolicy: String,
+    jarsDownloadPath: String,
+    filesDownloadPath: String,
+    configMapName: String,
+    configMapKey: String,
+    sparkRole: String,
+    sparkConf: SparkConf) {
+
+  /**
+   * Bootstraps an init-container that downloads dependencies to be used by a main container.
+   */
+  def bootstrapInitContainer(
+      original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
+    val sharedVolumeMounts = Seq[VolumeMount](
+      new VolumeMountBuilder()
+        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
+        .withMountPath(jarsDownloadPath)
+        .build(),
+      new VolumeMountBuilder()
+        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
+        .withMountPath(filesDownloadPath)
+        .build())
+
+    val customEnvVarKeyPrefix = sparkRole match {
+      case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
+      case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
+      case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
+    }
+    val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
+      case (key, value) =>
+        new EnvVarBuilder()
+          .withName(key)
+          .withValue(value)
+          .build()
+    }
+
+    val initContainer = new ContainerBuilder(original.initContainer)
+      .withName("spark-init")
+      .withImage(initContainerImage)
+      .withImagePullPolicy(imagePullPolicy)
+      .addAllToEnv(customEnvVars.asJava)
+      .addNewVolumeMount()
+        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
+        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
+        .endVolumeMount()
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
+      .build()
+
+    val podWithBasicVolumes = new PodBuilder(original.pod)
+      .editSpec()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
+        .withNewConfigMap()
+          .withName(configMapName)
+          .addNewItem()
+            .withKey(configMapKey)
+            .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
+            .endItem()
+          .endConfigMap()
+        .endVolume()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
+        .withEmptyDir(new EmptyDirVolumeSource())
+        .endVolume()
+      .addNewVolume()
+        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
+        .withEmptyDir(new EmptyDirVolumeSource())
+        .endVolume()
+      .endSpec()
+      .build()
+
+    val mainContainer = new ContainerBuilder(original.mainContainer)
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_DIR)
+        .withValue(filesDownloadPath)
+        .endEnv()
+      .build()
+
+    PodWithDetachedInitContainer(
+      podWithBasicVolumes,
+      initContainer,
+      mainContainer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
new file mode 100644
index 0000000..37331d8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesUtils {
+
+  /**
+   * Extract and parse Spark configuration properties with a given name prefix and
+   * return the result as a Map. Keys must not have more than one value.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map storing the configuration property keys and values
+   */
+  def parsePrefixedKeyValuePairs(
+      sparkConf: SparkConf,
+      prefix: String): Map[String, String] = {
+    sparkConf.getAllWithPrefix(prefix).toMap
+  }
+
+  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
+    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
+  }
+
+  /**
+   * Append the given init-container to a pod's list of init-containers.
+   *
+   * @param originalPodSpec original specification of the pod
+   * @param initContainer the init-container to add to the pod
+   * @return the pod with the init-container added to the list of InitContainers
+   */
+  def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
+    new PodBuilder(originalPodSpec)
+      .editOrNewSpec()
+        .addToInitContainers(initContainer)
+        .endSpec()
+      .build()
+  }
+
+  /**
+   * For the given collection of file URIs, resolves them as follows:
+   * - File URIs with scheme file:// are resolved to the given download path.
+   * - File URIs with scheme local:// resolve to just the path of the URI.
+   * - Otherwise, the URIs are returned as-is.
+   */
+  def resolveFileUris(
+      fileUris: Iterable[String],
+      fileDownloadPath: String): Iterable[String] = {
+    fileUris.map { uri =>
+      resolveFileUri(uri, fileDownloadPath, false)
+    }
+  }
+
+  /**
+   * If any file uri has any scheme other than local:// it is mapped as if the file
+   * was downloaded to the file download path. Otherwise, it is mapped to the path
+   * part of the URI.
+   */
+  def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
+    fileUris.map { uri =>
+      resolveFileUri(uri, fileDownloadPath, true)
+    }
+  }
+
+  /**
+   * Get from a given collection of file URIs the ones that represent remote files.
+   */
+  def getOnlyRemoteFiles(uris: Iterable[String]): Iterable[String] = {
+    uris.filter { uri =>
+      val scheme = Utils.resolveURI(uri).getScheme
+      scheme != "file" && scheme != "local"
+    }
+  }
+
+  private def resolveFileUri(
+      uri: String,
+      fileDownloadPath: String,
+      assumesDownloaded: Boolean): String = {
+    val fileUri = Utils.resolveURI(uri)
+    val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+    fileScheme match {
+      case "local" =>
+        fileUri.getPath
+      case _ =>
+        if (assumesDownloaded || fileScheme == "file") {
+          val fileName = new File(fileUri.getPath).getName
+          s"$fileDownloadPath/$fileName"
+        } else {
+          uri
+        }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
new file mode 100644
index 0000000..8286546
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+/**
+ * Bootstraps a driver or executor container or an init-container with needed secrets mounted.
+ */
+private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
+
+  /**
+   * Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
+   *
+   * @param pod the pod into which the secret volumes are being added.
+   * @param container the container into which the secret volumes are being mounted.
+   * @return the updated pod and container with the secrets mounted.
+   */
+  def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
+    var podBuilder = new PodBuilder(pod)
+    secretNamesToMountPaths.keys.foreach { name =>
+      podBuilder = podBuilder
+        .editOrNewSpec()
+          .addNewVolume()
+          .withName(secretVolumeName(name))
+          .withNewSecret()
+            .withSecretName(name)
+            .endSecret()
+          .endVolume()
+          .endSpec()
+    }
+
+    var containerBuilder = new ContainerBuilder(container)
+    secretNamesToMountPaths.foreach { case (name, path) =>
+      containerBuilder = containerBuilder
+        .addNewVolumeMount()
+          .withName(secretVolumeName(name))
+          .withMountPath(path)
+          .endVolumeMount()
+    }
+
+    (podBuilder.build(), containerBuilder.build())
+  }
+
+  private def secretVolumeName(secretName: String): String = {
+    secretName + "-volume"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
new file mode 100644
index 0000000..0b79f8b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/PodWithDetachedInitContainer.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+
+/**
+ * Represents a pod with a detached init-container (not yet added to the pod).
+ *
+ * @param pod the pod
+ * @param initContainer the init-container in the pod
+ * @param mainContainer the main container in the pod
+ */
+private[spark] case class PodWithDetachedInitContainer(
+    pod: Pod,
+    initContainer: Container,
+    mainContainer: Container)

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 1e3f055..c47e78c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -48,7 +48,7 @@ private[spark] object SparkKubernetesClientFactory {
       .map(new File(_))
       .orElse(defaultServiceAccountToken)
     val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
-    ConfigurationUtils.requireNandDefined(
+    KubernetesUtils.requireNandDefined(
       oauthTokenFile,
       oauthTokenValue,
       s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
new file mode 100644
index 0000000..00c9c4e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.UUID
+
+import com.google.common.primitives.Longs
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.deploy.k8s.submit.steps.initcontainer.InitContainerConfigOrchestrator
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.SystemClock
+import org.apache.spark.util.Utils
+
+/**
+ * Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
+ * configure the Spark driver pod. The returned steps will be applied one by one in the given
+ * order to produce a final KubernetesDriverSpec that is used in KubernetesClientApplication
+ * to construct and create the driver pod. It uses the InitContainerConfigOrchestrator to
+ * configure the driver init-container if one is needed, i.e., when there are remote dependencies
+ * to localize.
+ */
+private[spark] class DriverConfigOrchestrator(
+    kubernetesAppId: String,
+    launchTime: Long,
+    mainAppResource: Option[MainAppResource],
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    sparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the Spark application name, making it easy to connect
+  // the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
+  // application the user submitted.
+  private val kubernetesResourceNamePrefix = {
+    val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
+    s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
+  }
+
+  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+  private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
+  private val jarsDownloadPath = sparkConf.get(JARS_DOWNLOAD_LOCATION)
+  private val filesDownloadPath = sparkConf.get(FILES_DOWNLOAD_LOCATION)
+
+  def getAllConfigurationSteps: Seq[DriverConfigurationStep] = {
+    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_DRIVER_LABEL_PREFIX)
+    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
+      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      "operations.")
+    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
+      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+      "operations.")
+
+    val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_DRIVER_SECRETS_PREFIX)
+
+    val allDriverLabels = driverCustomLabels ++ Map(
+      SPARK_APP_ID_LABEL -> kubernetesAppId,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+
+    val initialSubmissionStep = new BasicDriverConfigurationStep(
+      kubernetesAppId,
+      kubernetesResourceNamePrefix,
+      allDriverLabels,
+      imagePullPolicy,
+      appName,
+      mainClass,
+      appArgs,
+      sparkConf)
+
+    val serviceBootstrapStep = new DriverServiceBootstrapStep(
+      kubernetesResourceNamePrefix,
+      allDriverLabels,
+      sparkConf,
+      new SystemClock)
+
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
+      sparkConf, kubernetesResourceNamePrefix)
+
+    val additionalMainAppJar = if (mainAppResource.nonEmpty) {
+       val mayBeResource = mainAppResource.get match {
+        case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
+          Some(resource)
+        case _ => None
+      }
+      mayBeResource
+    } else {
+      None
+    }
+
+    val sparkJars = sparkConf.getOption("spark.jars")
+      .map(_.split(","))
+      .getOrElse(Array.empty[String]) ++
+      additionalMainAppJar.toSeq
+    val sparkFiles = sparkConf.getOption("spark.files")
+      .map(_.split(","))
+      .getOrElse(Array.empty[String])
+
+    val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
+      Seq(new DependencyResolutionStep(
+        sparkJars,
+        sparkFiles,
+        jarsDownloadPath,
+        filesDownloadPath))
+    } else {
+      Nil
+    }
+
+    val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
+      val orchestrator = new InitContainerConfigOrchestrator(
+        sparkJars,
+        sparkFiles,
+        jarsDownloadPath,
+        filesDownloadPath,
+        imagePullPolicy,
+        initContainerConfigMapName,
+        INIT_CONTAINER_PROPERTIES_FILE_NAME,
+        sparkConf)
+      val bootstrapStep = new DriverInitContainerBootstrapStep(
+        orchestrator.getAllConfigurationSteps,
+        initContainerConfigMapName,
+        INIT_CONTAINER_PROPERTIES_FILE_NAME)
+
+      Seq(bootstrapStep)
+    } else {
+      Nil
+    }
+
+    val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
+      Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
+    } else {
+      Nil
+    }
+
+    Seq(
+      initialSubmissionStep,
+      serviceBootstrapStep,
+      kubernetesCredentialsStep) ++
+      dependencyResolutionStep ++
+      initContainerBootstrapStep ++
+      mountSecretsStep
+  }
+
+  private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
+    files.exists { uri =>
+      Utils.resolveURI(uri).getScheme != "local"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
deleted file mode 100644
index 1411e6f..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import java.util.UUID
-
-import com.google.common.primitives.Longs
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.ConfigurationUtils
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps._
-import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.SystemClock
-
-/**
- * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
- */
-private[spark] class DriverConfigurationStepsOrchestrator(
-    namespace: String,
-    kubernetesAppId: String,
-    launchTime: Long,
-    mainAppResource: Option[MainAppResource],
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    submissionSparkConf: SparkConf) {
-
-  // The resource name prefix is derived from the Spark application name, making it easy to connect
-  // the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
-  // application the user submitted.
-  private val kubernetesResourceNamePrefix = {
-    val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
-    s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
-  }
-
-  private val imagePullPolicy = submissionSparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
-  private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)
-
-  def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
-    val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
-      submissionSparkConf,
-      KUBERNETES_DRIVER_LABEL_PREFIX)
-    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
-      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
-      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-
-    val allDriverLabels = driverCustomLabels ++ Map(
-      SPARK_APP_ID_LABEL -> kubernetesAppId,
-      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-
-    val initialSubmissionStep = new BaseDriverConfigurationStep(
-      kubernetesAppId,
-      kubernetesResourceNamePrefix,
-      allDriverLabels,
-      imagePullPolicy,
-      appName,
-      mainClass,
-      appArgs,
-      submissionSparkConf)
-
-    val driverAddressStep = new DriverServiceBootstrapStep(
-      kubernetesResourceNamePrefix,
-      allDriverLabels,
-      submissionSparkConf,
-      new SystemClock)
-
-    val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
-      submissionSparkConf, kubernetesResourceNamePrefix)
-
-    val additionalMainAppJar = if (mainAppResource.nonEmpty) {
-       val mayBeResource = mainAppResource.get match {
-        case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
-          Some(resource)
-        case _ => None
-      }
-      mayBeResource
-    } else {
-      None
-    }
-
-    val sparkJars = submissionSparkConf.getOption("spark.jars")
-      .map(_.split(","))
-      .getOrElse(Array.empty[String]) ++
-      additionalMainAppJar.toSeq
-    val sparkFiles = submissionSparkConf.getOption("spark.files")
-      .map(_.split(","))
-      .getOrElse(Array.empty[String])
-
-    val maybeDependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
-      Some(new DependencyResolutionStep(
-        sparkJars,
-        sparkFiles,
-        jarsDownloadPath,
-        filesDownloadPath))
-    } else {
-      None
-    }
-
-    Seq(
-      initialSubmissionStep,
-      driverAddressStep,
-      kubernetesCredentialsStep) ++
-      maybeDependencyResolutionStep.toSeq
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 240a114..5884348 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -80,22 +80,22 @@ private[spark] object ClientArguments {
  * spark.kubernetes.submission.waitAppCompletion is true.
  *
  * @param submissionSteps steps that collectively configure the driver
- * @param submissionSparkConf the submission client Spark configuration
+ * @param sparkConf the submission client Spark configuration
  * @param kubernetesClient the client to talk to the Kubernetes API server
  * @param waitForAppCompletion a flag indicating whether the client should wait for the application
  *                             to complete
  * @param appName the application name
- * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ * @param watcher a watcher that monitors and logs the application status
  */
 private[spark] class Client(
     submissionSteps: Seq[DriverConfigurationStep],
-    submissionSparkConf: SparkConf,
+    sparkConf: SparkConf,
     kubernetesClient: KubernetesClient,
     waitForAppCompletion: Boolean,
     appName: String,
-    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+    watcher: LoggingPodStatusWatcher) extends Logging {
 
-  private val driverJavaOptions = submissionSparkConf.get(
+  private val driverJavaOptions = sparkConf.get(
     org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
 
    /**
@@ -104,7 +104,7 @@ private[spark] class Client(
     * will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
     */
   def run(): Unit = {
-    var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
+    var currentDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf)
     // submissionSteps contain steps necessary to take, to resolve varying
     // client arguments that are passed in, created by orchestrator
     for (nextStep <- submissionSteps) {
@@ -141,7 +141,7 @@ private[spark] class Client(
       kubernetesClient
         .pods()
         .withName(resolvedDriverPod.getMetadata.getName)
-        .watch(loggingPodStatusWatcher)) { _ =>
+        .watch(watcher)) { _ =>
       val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
       try {
         if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
@@ -157,7 +157,7 @@ private[spark] class Client(
 
       if (waitForAppCompletion) {
         logInfo(s"Waiting for application $appName to finish...")
-        loggingPodStatusWatcher.awaitCompletion()
+        watcher.awaitCompletion()
         logInfo(s"Application $appName finished.")
       } else {
         logInfo(s"Deployed Spark application $appName into Kubernetes.")
@@ -207,11 +207,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val master = sparkConf.get("spark.master").substring("k8s://".length)
     val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
-    val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
-      kubernetesAppId, loggingInterval)
+    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
 
-    val configurationStepsOrchestrator = new DriverConfigurationStepsOrchestrator(
-      namespace,
+    val orchestrator = new DriverConfigOrchestrator(
       kubernetesAppId,
       launchTime,
       clientArguments.mainAppResource,
@@ -228,12 +226,12 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       None,
       None)) { kubernetesClient =>
         val client = new Client(
-          configurationStepsOrchestrator.getAllConfigurationSteps(),
+          orchestrator.getAllConfigurationSteps,
           sparkConf,
           kubernetesClient,
           waitForAppCompletion,
           appName,
-          loggingPodStatusWatcher)
+          watcher)
         client.run()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
deleted file mode 100644
index a38cf55..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit
-
-import java.io.File
-
-import org.apache.spark.util.Utils
-
-private[spark] object KubernetesFileUtils {
-
-  /**
-   * For the given collection of file URIs, resolves them as follows:
-   * - File URIs with scheme file:// are resolved to the given download path.
-   * - File URIs with scheme local:// resolve to just the path of the URI.
-   * - Otherwise, the URIs are returned as-is.
-   */
-  def resolveFileUris(
-      fileUris: Iterable[String],
-      fileDownloadPath: String): Iterable[String] = {
-    fileUris.map { uri =>
-      resolveFileUri(uri, fileDownloadPath, false)
-    }
-  }
-
-  /**
-   * If any file uri has any scheme other than local:// it is mapped as if the file
-   * was downloaded to the file download path. Otherwise, it is mapped to the path
-   * part of the URI.
-   */
-  def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
-    fileUris.map { uri =>
-      resolveFileUri(uri, fileDownloadPath, true)
-    }
-  }
-
-  private def resolveFileUri(
-      uri: String,
-      fileDownloadPath: String,
-      assumesDownloaded: Boolean): String = {
-    val fileUri = Utils.resolveURI(uri)
-    val fileScheme = Option(fileUri.getScheme).getOrElse("file")
-    fileScheme match {
-      case "local" =>
-        fileUri.getPath
-      case _ =>
-        if (assumesDownloaded || fileScheme == "file") {
-          val fileName = new File(fileUri.getPath).getName
-          s"$fileDownloadPath/$fileName"
-        } else {
-          uri
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
deleted file mode 100644
index c335fcc..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.ConfigurationUtils
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
-
-/**
- * Represents the initial setup required for the driver.
- */
-private[spark] class BaseDriverConfigurationStep(
-    kubernetesAppId: String,
-    kubernetesResourceNamePrefix: String,
-    driverLabels: Map[String, String],
-    imagePullPolicy: String,
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    submissionSparkConf: SparkConf) extends DriverConfigurationStep {
-
-  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
-
-  private val driverExtraClasspath = submissionSparkConf.get(
-    DRIVER_CLASS_PATH)
-
-  private val driverContainerImage = submissionSparkConf
-    .get(DRIVER_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the driver container image"))
-
-  // CPU settings
-  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
-  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
-
-  // Memory settings
-  private val driverMemoryMiB = submissionSparkConf.get(
-    DRIVER_MEMORY)
-  private val driverMemoryString = submissionSparkConf.get(
-    DRIVER_MEMORY.key,
-    DRIVER_MEMORY.defaultValueString)
-  private val memoryOverheadMiB = submissionSparkConf
-    .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
-  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
-      new EnvVarBuilder()
-        .withName(ENV_SUBMIT_EXTRA_CLASSPATH)
-        .withValue(classPath)
-        .build()
-    }
-
-    val driverCustomAnnotations = ConfigurationUtils
-      .parsePrefixedKeyValuePairs(
-        submissionSparkConf,
-        KUBERNETES_DRIVER_ANNOTATION_PREFIX)
-    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
-      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
-        " Spark bookkeeping operations.")
-
-    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
-      .map { env =>
-        new EnvVarBuilder()
-          .withName(env._1)
-          .withValue(env._2)
-          .build()
-      }
-
-    val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
-
-    val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
-      submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
-
-    val driverCpuQuantity = new QuantityBuilder(false)
-      .withAmount(driverCpuCores)
-      .build()
-    val driverMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(s"${driverMemoryMiB}Mi")
-      .build()
-    val driverMemoryLimitQuantity = new QuantityBuilder(false)
-      .withAmount(s"${driverContainerMemoryWithOverheadMiB}Mi")
-      .build()
-    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
-      ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
-    }
-
-    val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
-      .withName(DRIVER_CONTAINER_NAME)
-      .withImage(driverContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
-      .addAllToEnv(driverCustomEnvs.asJava)
-      .addToEnv(driverExtraClasspathEnv.toSeq: _*)
-      .addNewEnv()
-        .withName(ENV_DRIVER_MEMORY)
-        .withValue(driverMemoryString)
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_DRIVER_MAIN_CLASS)
-        .withValue(mainClass)
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_DRIVER_ARGS)
-        .withValue(appArgs.map(arg => "\"" + arg + "\"").mkString(" "))
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_DRIVER_BIND_ADDRESS)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
-          .build())
-        .endEnv()
-      .withNewResources()
-        .addToRequests("cpu", driverCpuQuantity)
-        .addToRequests("memory", driverMemoryQuantity)
-        .addToLimits("memory", driverMemoryLimitQuantity)
-        .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
-        .endResources()
-      .build()
-
-    val baseDriverPod = new PodBuilder(driverSpec.driverPod)
-      .editOrNewMetadata()
-        .withName(kubernetesDriverPodName)
-        .addToLabels(driverLabels.asJava)
-        .addToAnnotations(allDriverAnnotations.asJava)
-      .endMetadata()
-      .withNewSpec()
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-        .endSpec()
-      .build()
-
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
-      .setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
-      .set("spark.app.id", kubernetesAppId)
-      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
-
-    driverSpec.copy(
-      driverPod = baseDriverPod,
-      driverSparkConf = resolvedSparkConf,
-      driverContainer = driverContainer)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
new file mode 100644
index 0000000..b7a69a7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
+
+/**
+ * Performs basic configuration for the driver pod.
+ */
+private[spark] class BasicDriverConfigurationStep(
+    kubernetesAppId: String,
+    resourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    imagePullPolicy: String,
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    sparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val driverPodName = sparkConf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(s"$resourceNamePrefix-driver")
+
+  private val driverExtraClasspath = sparkConf.get(DRIVER_CLASS_PATH)
+
+  private val driverContainerImage = sparkConf
+    .get(DRIVER_CONTAINER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the driver container image"))
+
+  // CPU settings
+  private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = sparkConf.get(DRIVER_MEMORY)
+  private val driverMemoryString = sparkConf.get(
+    DRIVER_MEMORY.key, DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = sparkConf
+    .get(DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+  private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+      new EnvVarBuilder()
+        .withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+        .withValue(classPath)
+        .build()
+    }
+
+    val driverCustomAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
+        " Spark bookkeeping operations.")
+
+    val driverCustomEnvs = sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+      .map { env =>
+        new EnvVarBuilder()
+          .withName(env._1)
+          .withValue(env._2)
+          .build()
+      }
+
+    val driverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
+
+    val nodeSelector = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+    val driverCpuQuantity = new QuantityBuilder(false)
+      .withAmount(driverCpuCores)
+      .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverMemoryMiB}Mi")
+      .build()
+    val driverMemoryLimitQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
+      .build()
+    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
+      ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
+    }
+
+    val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+      .withName(DRIVER_CONTAINER_NAME)
+      .withImage(driverContainerImage)
+      .withImagePullPolicy(imagePullPolicy)
+      .addAllToEnv(driverCustomEnvs.asJava)
+      .addToEnv(driverExtraClasspathEnv.toSeq: _*)
+      .addNewEnv()
+        .withName(ENV_DRIVER_MEMORY)
+        .withValue(driverMemoryString)
+        .endEnv()
+      .addNewEnv()
+        .withName(ENV_DRIVER_MAIN_CLASS)
+        .withValue(mainClass)
+        .endEnv()
+      .addNewEnv()
+        .withName(ENV_DRIVER_ARGS)
+        .withValue(appArgs.map(arg => "\"" + arg + "\"").mkString(" "))
+        .endEnv()
+      .addNewEnv()
+        .withName(ENV_DRIVER_BIND_ADDRESS)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "status.podIP")
+          .build())
+        .endEnv()
+      .withNewResources()
+        .addToRequests("cpu", driverCpuQuantity)
+        .addToRequests("memory", driverMemoryQuantity)
+        .addToLimits("memory", driverMemoryLimitQuantity)
+        .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
+        .endResources()
+      .build()
+
+    val baseDriverPod = new PodBuilder(driverSpec.driverPod)
+      .editOrNewMetadata()
+        .withName(driverPodName)
+        .addToLabels(driverLabels.asJava)
+        .addToAnnotations(driverAnnotations.asJava)
+      .endMetadata()
+      .withNewSpec()
+        .withRestartPolicy("Never")
+        .withNodeSelector(nodeSelector.asJava)
+        .endSpec()
+      .build()
+
+    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+      .setIfMissing(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+      .set("spark.app.id", kubernetesAppId)
+      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, resourceNamePrefix)
+
+    driverSpec.copy(
+      driverPod = baseDriverPod,
+      driverSparkConf = resolvedSparkConf,
+      driverContainer = driverContainer)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
index 44e0ecf..d4b8323 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
@@ -21,7 +21,8 @@ import java.io.File
 import io.fabric8.kubernetes.api.model.ContainerBuilder
 
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, KubernetesFileUtils}
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
 
 /**
  * Step that configures the classpath, spark.jars, and spark.files for the driver given that the
@@ -31,21 +32,22 @@ private[spark] class DependencyResolutionStep(
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
     jarsDownloadPath: String,
-    localFilesDownloadPath: String) extends DriverConfigurationStep {
+    filesDownloadPath: String) extends DriverConfigurationStep {
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val resolvedSparkJars = KubernetesFileUtils.resolveFileUris(sparkJars, jarsDownloadPath)
-    val resolvedSparkFiles = KubernetesFileUtils.resolveFileUris(
-      sparkFiles, localFilesDownloadPath)
-    val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
+    val resolvedSparkJars = KubernetesUtils.resolveFileUris(sparkJars, jarsDownloadPath)
+    val resolvedSparkFiles = KubernetesUtils.resolveFileUris(sparkFiles, filesDownloadPath)
+
+    val sparkConf = driverSpec.driverSparkConf.clone()
     if (resolvedSparkJars.nonEmpty) {
-      sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
+      sparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
     }
     if (resolvedSparkFiles.nonEmpty) {
-      sparkConfResolvedSparkDependencies.set("spark.files", resolvedSparkFiles.mkString(","))
+      sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
     }
-    val resolvedClasspath = KubernetesFileUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
-    val driverContainerWithResolvedClasspath = if (resolvedClasspath.nonEmpty) {
+
+    val resolvedClasspath = KubernetesUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
+    val resolvedDriverContainer = if (resolvedClasspath.nonEmpty) {
       new ContainerBuilder(driverSpec.driverContainer)
         .addNewEnv()
         .withName(ENV_MOUNTED_CLASSPATH)
@@ -55,8 +57,9 @@ private[spark] class DependencyResolutionStep(
     } else {
       driverSpec.driverContainer
     }
+
     driverSpec.copy(
-      driverContainer = driverContainerWithResolvedClasspath,
-      driverSparkConf = sparkConfResolvedSparkDependencies)
+      driverContainer = resolvedDriverContainer,
+      driverSparkConf = sparkConf)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
index c99c043..17614e0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit.steps
 import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
 
 /**
- * Represents a step in preparing the Kubernetes driver.
+ * Represents a step in configuring the Spark driver pod.
  */
 private[spark] trait DriverConfigurationStep {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
new file mode 100644
index 0000000..9fb3daf
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStep.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.StringWriter
+import java.util.Properties
+
+import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, ContainerBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
+
+/**
+ * Configures the driver init-container that localizes remote dependencies into the driver pod.
+ * It applies the given InitContainerConfigurationSteps in the given order to produce a final
+ * InitContainerSpec that is then used to configure the driver pod with the init-container attached.
+ * It also builds a ConfigMap that will be mounted into the init-container. The ConfigMap carries
+ * configuration properties for the init-container.
+ */
+private[spark] class DriverInitContainerBootstrapStep(
+    steps: Seq[InitContainerConfigurationStep],
+    configMapName: String,
+    configMapKey: String)
+  extends DriverConfigurationStep {
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    var initContainerSpec = InitContainerSpec(
+      properties = Map.empty[String, String],
+      driverSparkConf = Map.empty[String, String],
+      initContainer = new ContainerBuilder().build(),
+      driverContainer = driverSpec.driverContainer,
+      driverPod = driverSpec.driverPod,
+      dependentResources = Seq.empty[HasMetadata])
+    for (nextStep <- steps) {
+      initContainerSpec = nextStep.configureInitContainer(initContainerSpec)
+    }
+
+    val configMap = buildConfigMap(
+      configMapName,
+      configMapKey,
+      initContainerSpec.properties)
+    val resolvedDriverSparkConf = driverSpec.driverSparkConf
+      .clone()
+      .set(INIT_CONTAINER_CONFIG_MAP_NAME, configMapName)
+      .set(INIT_CONTAINER_CONFIG_MAP_KEY_CONF, configMapKey)
+      .setAll(initContainerSpec.driverSparkConf)
+    val resolvedDriverPod = KubernetesUtils.appendInitContainer(
+      initContainerSpec.driverPod, initContainerSpec.initContainer)
+
+    driverSpec.copy(
+      driverPod = resolvedDriverPod,
+      driverContainer = initContainerSpec.driverContainer,
+      driverSparkConf = resolvedDriverSparkConf,
+      otherKubernetesResources =
+        driverSpec.otherKubernetesResources ++
+          initContainerSpec.dependentResources ++
+          Seq(configMap))
+  }
+
+  private def buildConfigMap(
+      configMapName: String,
+      configMapKey: String,
+      config: Map[String, String]): ConfigMap = {
+    val properties = new Properties()
+    config.foreach { entry =>
+      properties.setProperty(entry._1, entry._2)
+    }
+    val propertiesWriter = new StringWriter()
+    properties.store(propertiesWriter,
+      s"Java properties built from Kubernetes config map with name: $configMapName " +
+        s"and config map key: $configMapKey")
+    new ConfigMapBuilder()
+      .withNewMetadata()
+        .withName(configMapName)
+        .endMetadata()
+      .addToData(configMapKey, propertiesWriter.toString)
+      .build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
new file mode 100644
index 0000000..f872e0f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import org.apache.spark.deploy.k8s.MountSecretsBootstrap
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * A driver configuration step for mounting user-specified secrets onto user-specified paths.
+ *
+ * @param bootstrap a utility actually handling mounting of the secrets.
+ */
+private[spark] class DriverMountSecretsStep(
+    bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val (pod, container) = bootstrap.mountSecrets(
+      driverSpec.driverPod, driverSpec.driverContainer)
+    driverSpec.copy(
+      driverPod = pod,
+      driverContainer = container
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
index 696d11f..eb594e4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
@@ -32,21 +32,22 @@ import org.apache.spark.util.Clock
  * ports should correspond to the ports that the executor will reach the pod at for RPC.
  */
 private[spark] class DriverServiceBootstrapStep(
-    kubernetesResourceNamePrefix: String,
+    resourceNamePrefix: String,
     driverLabels: Map[String, String],
-    submissionSparkConf: SparkConf,
+    sparkConf: SparkConf,
     clock: Clock) extends DriverConfigurationStep with Logging {
+
   import DriverServiceBootstrapStep._
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+    require(sparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
       s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
       "address is managed and set to the driver pod's IP address.")
-    require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+    require(sparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
       s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
       "managed via a Kubernetes service.")
 
-    val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+    val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
     val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
       preferredServiceName
     } else {
@@ -58,8 +59,8 @@ private[spark] class DriverServiceBootstrapStep(
       shorterServiceName
     }
 
-    val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
-    val driverBlockManagerPort = submissionSparkConf.getInt(
+    val driverPort = sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
+    val driverBlockManagerPort = sparkConf.getInt(
         org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
     val driverService = new ServiceBuilder()
       .withNewMetadata()
@@ -81,7 +82,7 @@ private[spark] class DriverServiceBootstrapStep(
         .endSpec()
       .build()
 
-    val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
     val resolvedSparkConf = driverSpec.driverSparkConf.clone()
       .set(DRIVER_HOST_KEY, driverHostname)

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
new file mode 100644
index 0000000..0146985
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.KubernetesUtils
+
+/**
+ * Performs basic configuration for the driver init-container with most of the work delegated to
+ * the given InitContainerBootstrap.
+ */
+private[spark] class BasicInitContainerConfigurationStep(
+    sparkJars: Seq[String],
+    sparkFiles: Seq[String],
+    jarsDownloadPath: String,
+    filesDownloadPath: String,
+    bootstrap: InitContainerBootstrap)
+  extends InitContainerConfigurationStep {
+
+  override def configureInitContainer(spec: InitContainerSpec): InitContainerSpec = {
+    val remoteJarsToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkJars)
+    val remoteFilesToDownload = KubernetesUtils.getOnlyRemoteFiles(sparkFiles)
+    val remoteJarsConf = if (remoteJarsToDownload.nonEmpty) {
+      Map(INIT_CONTAINER_REMOTE_JARS.key -> remoteJarsToDownload.mkString(","))
+    } else {
+      Map()
+    }
+    val remoteFilesConf = if (remoteFilesToDownload.nonEmpty) {
+      Map(INIT_CONTAINER_REMOTE_FILES.key -> remoteFilesToDownload.mkString(","))
+    } else {
+      Map()
+    }
+
+    val baseInitContainerConfig = Map(
+      JARS_DOWNLOAD_LOCATION.key -> jarsDownloadPath,
+      FILES_DOWNLOAD_LOCATION.key -> filesDownloadPath) ++
+      remoteJarsConf ++
+      remoteFilesConf
+
+    val bootstrapped = bootstrap.bootstrapInitContainer(
+      PodWithDetachedInitContainer(
+        spec.driverPod,
+        spec.initContainer,
+        spec.driverContainer))
+
+    spec.copy(
+      initContainer = bootstrapped.initContainer,
+      driverContainer = bootstrapped.mainContainer,
+      driverPod = bootstrapped.pod,
+      properties = spec.properties ++ baseInitContainerConfig)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
new file mode 100644
index 0000000..f2c29c7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestrator.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Figures out and returns the complete ordered list of InitContainerConfigurationSteps required to
+ * configure the driver init-container. The returned steps will be applied in the given order to
+ * produce a final InitContainerSpec that is used to construct the driver init-container in
+ * DriverInitContainerBootstrapStep. This class is only used when an init-container is needed, i.e.,
+ * when there are remote application dependencies to localize.
+ */
+private[spark] class InitContainerConfigOrchestrator(
+    sparkJars: Seq[String],
+    sparkFiles: Seq[String],
+    jarsDownloadPath: String,
+    filesDownloadPath: String,
+    imagePullPolicy: String,
+    configMapName: String,
+    configMapKey: String,
+    sparkConf: SparkConf) {
+
+  private val initContainerImage = sparkConf
+    .get(INIT_CONTAINER_IMAGE)
+    .getOrElse(throw new SparkException(
+      "Must specify the init-container image when there are remote dependencies"))
+
+  def getAllConfigurationSteps: Seq[InitContainerConfigurationStep] = {
+    val initContainerBootstrap = new InitContainerBootstrap(
+      initContainerImage,
+      imagePullPolicy,
+      jarsDownloadPath,
+      filesDownloadPath,
+      configMapName,
+      configMapKey,
+      SPARK_POD_DRIVER_ROLE,
+      sparkConf)
+    val baseStep = new BasicInitContainerConfigurationStep(
+      sparkJars,
+      sparkFiles,
+      jarsDownloadPath,
+      filesDownloadPath,
+      initContainerBootstrap)
+
+    val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_DRIVER_SECRETS_PREFIX)
+    // Mount user-specified driver secrets also into the driver's init-container. The
+    // init-container may need credentials in the secrets to be able to download remote
+    // dependencies. The driver's main container and its init-container share the secrets
+    // because the init-container is sort of an implementation details and this sharing
+    // avoids introducing a dedicated configuration property just for the init-container.
+    val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
+      Seq(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
+    } else {
+      Nil
+    }
+
+    Seq(baseStep) ++ mountSecretsStep
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
new file mode 100644
index 0000000..0372ad5
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigurationStep.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps.initcontainer
+
+/**
+ * Represents a step in configuring the driver init-container.
+ */
+private[spark] trait InitContainerConfigurationStep {
+
+  def configureInitContainer(spec: InitContainerSpec): InitContainerSpec
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message