spark-commits mailing list archives

From mch...@apache.org
Subject spark git commit: [SPARK-24434][K8S] pod template files
Date Tue, 30 Oct 2018 20:52:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master c36537fcf -> f6cc354d8


[SPARK-24434][K8S] pod template files

## What changes were proposed in this pull request?

New feature to pass podspec files for driver and executor pods.
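
For illustration, a minimal sketch of enabling the feature from application code (the template paths here are hypothetical; the files must be readable by the `spark-submit` process):

```scala
import org.apache.spark.SparkConf

// Hypothetical local paths; each property is optional and independent.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-template.yml")
  .set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yml")
```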

## How was this patch tested?
new unit and integration tests

- [x] more overwrites in integration tests
- [ ] invalid template integration test, documentation

Author: Onur Satici <osatici@palantir.com>
Author: Yifei Huang <yifeih@palantir.com>
Author: onursatici <onursatici@gmail.com>

Closes #22146 from onursatici/pod-template.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6cc354d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6cc354d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6cc354d

Branch: refs/heads/master
Commit: f6cc354d83c2c9a757f9b507aadd4dbdc5825cca
Parents: c36537f
Author: Onur Satici <osatici@palantir.com>
Authored: Tue Oct 30 13:52:44 2018 -0700
Committer: mcheah <mcheah@palantir.com>
Committed: Tue Oct 30 13:52:44 2018 -0700

----------------------------------------------------------------------
 docs/running-on-kubernetes.md                   | 180 +++++++++++++++++++
 .../org/apache/spark/deploy/k8s/Config.scala    |  24 +++
 .../org/apache/spark/deploy/k8s/Constants.scala |  10 +-
 .../spark/deploy/k8s/KubernetesDriverSpec.scala |   7 -
 .../spark/deploy/k8s/KubernetesUtils.scala      |  49 ++++-
 .../k8s/features/BasicDriverFeatureStep.scala   |   8 +-
 .../k8s/features/BasicExecutorFeatureStep.scala |  10 +-
 .../k8s/features/PodTemplateConfigMapStep.scala |  72 ++++++++
 .../submit/KubernetesClientApplication.scala    |   6 +-
 .../k8s/submit/KubernetesDriverBuilder.scala    |  40 ++++-
 .../k8s/ExecutorPodsLifecycleManager.scala      |   1 -
 .../cluster/k8s/KubernetesClusterManager.scala  |  14 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  38 +++-
 .../spark/deploy/k8s/KubernetesUtilsSuite.scala |  68 +++++++
 .../features/BasicDriverFeatureStepSuite.scala  |   2 +-
 .../PodTemplateConfigMapStepSuite.scala         |  97 ++++++++++
 .../submit/KubernetesDriverBuilderSuite.scala   | 116 +++++++++++-
 .../k8s/submit/PodBuilderSuiteUtils.scala       | 142 +++++++++++++++
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala |   4 -
 .../k8s/KubernetesExecutorBuilderSuite.scala    |  41 ++++-
 .../src/test/resources/driver-template.yml      |  26 +++
 .../src/test/resources/executor-template.yml    |  25 +++
 .../k8s/integrationtest/KubernetesSuite.scala   |   8 +-
 .../k8s/integrationtest/PodTemplateSuite.scala  |  55 ++++++
 24 files changed, 991 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 7093ee5..2917197 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -186,6 +186,22 @@ To use a secret through an environment variable use the following options to the
 --conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
 ```
 
+## Pod Template
+Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
+Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support.
+To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
+to point to local files accessible to the `spark-submit` process. To allow the driver pod to access the executor pod
+template file, the file will be automatically mounted onto a volume in the driver pod when it's created.
+Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation.
+
+It is important to note that Spark is opinionated about certain pod configurations so there are values in the
+pod template that will always be overwritten by Spark. Therefore, users of this feature should note that specifying
+the pod template file only lets Spark start with a template pod instead of an empty pod during the pod-building process.
+For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by Spark.
+
+Pod template files can also define multiple containers. In such cases, Spark will always assume that the first container in
+the list will be the driver or executor container.
+
 ## Using Kubernetes Volumes
 
 Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods:
@@ -863,4 +879,168 @@ specific to Spark on Kubernetes.
     to provide any kerberos credentials for launching a job.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.podTemplateFile</code></td>
+  <td>(none)</td>
+  <td>
+   Specify the local file that contains the driver [pod template](#pod-template). For example
+   <code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podTemplateFile</code></td>
+  <td>(none)</td>
+  <td>
+   Specify the local file that contains the executor [pod template](#pod-template). For example
+   <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
+  </td>
+</tr>
+</table>
+
+#### Pod template properties
+
+See the table below for the full list of pod specifications that will be overwritten by Spark.
+
+### Pod Metadata
+
+<table class="table">
+<tr><th>Pod metadata key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>name</td>
+  <td>Value of <code>spark.kubernetes.driver.pod.name</code></td>
+  <td>
+    The driver pod name will be overwritten with either the configured or default value of
+    <code>spark.kubernetes.driver.pod.name</code>. The executor pod names will be unaffected.
+  </td>
+</tr>
+<tr>
+  <td>namespace</td>
+  <td>Value of <code>spark.kubernetes.namespace</code></td>
+  <td>
+    Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will
+    be replaced by either the configured or default spark conf value.
+  </td>
+</tr>
+<tr>
+  <td>labels</td>
+  <td>Adds the labels from <code>spark.kubernetes.{driver,executor}.label.*</code></td>
+  <td>
+    Spark will add additional labels specified by the spark configuration.
+  </td>
+</tr>
+<tr>
+  <td>annotations</td>
+  <td>Adds the annotations from <code>spark.kubernetes.{driver,executor}.annotation.*</code></td>
+  <td>
+    Spark will add additional annotations specified by the spark configuration.
+  </td>
+</tr>
+</table>
+
+### Pod Spec
+
+<table class="table">
+<tr><th>Pod spec key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>imagePullSecrets</td>
+  <td>Adds image pull secrets from <code>spark.kubernetes.container.image.pullSecrets</code></td>
+  <td>
+    Additional pull secrets will be added from the spark configuration to both the driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td>nodeSelector</td>
+  <td>Adds node selectors from <code>spark.kubernetes.node.selector.*</code></td>
+  <td>
+    Additional node selectors will be added from the spark configuration to both the driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td>restartPolicy</td>
+  <td><code>"never"</code></td>
+  <td>
+    Spark assumes that both drivers and executors never restart.
+  </td>
+</tr>
+<tr>
+  <td>serviceAccount</td>
+  <td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
+  <td>
+    Spark will override <code>serviceAccount</code> with the value of the spark configuration for only
+    driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  </td>
+</tr>
+<tr>
+  <td>serviceAccountName</td>
+  <td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
+  <td>
+    Spark will override <code>serviceAccountName</code> with the value of the spark configuration for only
+    driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  </td>
+</tr>
+<tr>
+  <td>volumes</td>
+  <td>Adds volumes from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>
+    Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
+    spark conf and pod template files.
+  </td>
+</tr>
+</table>
+
+### Container spec
+
+The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.
+
+<table class="table">
+<tr><th>Container spec key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>env</td>
+  <td>Adds env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
+  <td>
+    Spark will add driver env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code>, and
+    executor env variables from <code>spark.executorEnv.[EnvironmentVariableName]</code>.
+  </td>
+</tr>
+<tr>
+  <td>image</td>
+  <td>Value of <code>spark.kubernetes.{driver,executor}.container.image</code></td>
+  <td>
+    The image will be defined by the spark configurations.
+  </td>
+</tr>
+<tr>
+  <td>imagePullPolicy</td>
+  <td>Value of <code>spark.kubernetes.container.image.pullPolicy</code></td>
+  <td>
+    Spark will override the pull policy for both driver and executors.
+  </td>
+</tr>
+<tr>
+  <td>name</td>
+  <td>See description</td>
+  <td>
+    The container name will be assigned by spark ("spark-kubernetes-driver" for the driver container, and
+    "executor" for each executor container) if not defined by the pod template. If the container is defined by the
+    template, the template's name will be used.
+  </td>
+</tr>
+<tr>
+  <td>resources</td>
+  <td>See description</td>
+  <td>
+    The CPU limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The CPU request is set by
+    <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
+    <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
+  </td>
+</tr>
+<tr>
+  <td>volumeMounts</td>
+  <td>Adds volume mounts from <code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.{path,readOnly}</code></td>
+  <td>
+    Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
+    spark conf and pod template files.
+  </td>
+</tr>
 </table>

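As an illustration of the multi-container note in the Pod Template section above, here is a hedged sketch, using the fabric8 builder API that the code below also uses, of a template whose Spark container is not listed first. The `spark.kubernetes.executor.podTemplateContainerName` property added in `Config.scala` below lets Spark select the container by name instead of assuming the first one:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder

// Hypothetical template: a logging sidecar is listed before the intended
// Spark container. Without podTemplateContainerName, Spark would treat
// "sidecar" as the executor container; with it set to "spark-executor",
// Spark selects the named container and keeps "sidecar" as an extra
// container in the pod.
val template = new PodBuilder()
  .withNewMetadata().withName("executor-template").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("sidecar").endContainer()
    .addNewContainer().withName("spark-executor").endContainer()
  .endSpec()
  .build()
```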
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index fff8fa4..862f1d6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -278,6 +278,30 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_DRIVER_PODTEMPLATE_FILE =
+    ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
+      .doc("File containing a template pod spec for the driver")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
+    ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
+      .doc("File containing a template pod spec for executors")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME =
+    ConfigBuilder("spark.kubernetes.driver.podTemplateContainerName")
+      .doc("container name to be used as a basis for the driver in the given pod template")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME =
+    ConfigBuilder("spark.kubernetes.executor.podTemplateContainerName")
+      .doc("container name to be used as a basis for executors in the given pod template")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 172a905..1c6d53c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -76,9 +76,17 @@ private[spark] object Constants {
   val ENV_R_PRIMARY = "R_PRIMARY"
   val ENV_R_ARGS = "R_APP_ARGS"
 
+  // Pod spec templates
+  val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
+  val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template"
+  val POD_TEMPLATE_VOLUME = "pod-template-volume"
+  val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
+  val POD_TEMPLATE_KEY = "podspec-configmap-key"
+
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
-  val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+  val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+  val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val MEMORY_OVERHEAD_MIN_MIB = 384L
 
   // Hadoop Configuration

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
index 0c5ae02..fce8c6a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
@@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec(
     pod: SparkPod,
     driverKubernetesResources: Seq[HasMetadata],
     systemProperties: Map[String, String])
-
-private[spark] object KubernetesDriverSpec {
-  def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
-    SparkPod.initialPod(),
-    Seq.empty,
-    initialProps)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 0f74045..6fafac3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,14 +16,18 @@
  */
 package org.apache.spark.deploy.k8s
 
+import java.io.File
+
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
-private[spark] object KubernetesUtils {
+private[spark] object KubernetesUtils extends Logging {
 
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
@@ -82,6 +86,47 @@ private[spark] object KubernetesUtils {
     }
   }
 
+  def loadPodFromTemplate(
+      kubernetesClient: KubernetesClient,
+      templateFile: File,
+      containerName: Option[String]): SparkPod = {
+    try {
+      val pod = kubernetesClient.pods().load(templateFile).get()
+      selectSparkContainer(pod, containerName)
+    } catch {
+      case e: Exception =>
+        logError(
+          s"Encountered exception while attempting to load initial pod spec from file", e)
+        throw new SparkException("Could not load pod from template file.", e)
+    }
+  }
+
+  def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
+    def selectNamedContainer(
+      containers: List[Container], name: String): Option[(Container, List[Container])] =
+      containers.partition(_.getName == name) match {
+        case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest))
+        case _ =>
+          logWarning(
+            s"specified container ${name} not found on pod template, " +
+              s"falling back to taking the first container")
+          Option.empty
+      }
+    val containers = pod.getSpec.getContainers.asScala.toList
+    containerName
+      .flatMap(selectNamedContainer(containers, _))
+      .orElse(containers.headOption.map((_, containers.tail)))
+      .map {
+        case (sparkContainer: Container, rest: List[Container]) => SparkPod(
+          new PodBuilder(pod)
+            .editSpec()
+            .withContainers(rest.asJava)
+            .endSpec()
+            .build(),
+          sparkContainer)
+      }.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
+  }
+
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
 
   def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {

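A short usage sketch of the selection and fallback behavior of `selectSparkContainer` added above (it is `private[spark]`, so this only compiles from within Spark's own packages):

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import org.apache.spark.deploy.k8s.KubernetesUtils

val pod = new PodBuilder()
  .withNewSpec()
    .addNewContainer().withName("sidecar").endContainer()
    .addNewContainer().withName("spark").endContainer()
  .endSpec()
  .build()

// Named selection: "spark" becomes the Spark container and "sidecar" stays
// in the pod spec as an auxiliary container.
val named = KubernetesUtils.selectSparkContainer(pod, Some("spark"))

// Unknown names log a warning and fall back to the first container.
val fallback = KubernetesUtils.selectSparkContainer(pod, Some("does-not-exist"))
```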
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 575bc54..96b14a0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(
     )
     val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
     val driverContainer = new ContainerBuilder(pod.container)
-      .withName(DRIVER_CONTAINER_NAME)
+      .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
       .withImage(driverContainerImage)
       .withImagePullPolicy(conf.imagePullPolicy())
       .addNewPort()
@@ -105,7 +105,7 @@ private[spark] class BasicDriverFeatureStep(
           .withNewFieldRef("v1", "status.podIP")
           .build())
         .endEnv()
-      .withNewResources()
+      .editOrNewResources()
         .addToRequests("cpu", driverCpuQuantity)
         .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
         .addToRequests("memory", driverMemoryQuantity)
@@ -119,9 +119,9 @@ private[spark] class BasicDriverFeatureStep(
         .addToLabels(conf.roleLabels.asJava)
         .addToAnnotations(conf.roleAnnotations.asJava)
         .endMetadata()
-      .withNewSpec()
+      .editOrNewSpec()
         .withRestartPolicy("Never")
-        .withNodeSelector(conf.nodeSelector().asJava)
+        .addToNodeSelector(conf.nodeSelector().asJava)
         .addToImagePullSecrets(conf.imagePullSecrets(): _*)
         .endSpec()
       .build()

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d89995b..1dab2a8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -139,10 +139,10 @@ private[spark] class BasicExecutorFeatureStep(
       }
 
     val executorContainer = new ContainerBuilder(pod.container)
-      .withName("executor")
+      .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
       .withImage(executorContainerImage)
       .withImagePullPolicy(kubernetesConf.imagePullPolicy())
-      .withNewResources()
+      .editOrNewResources()
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryQuantity)
         .addToRequests("cpu", executorCpuQuantity)
@@ -173,14 +173,14 @@ private[spark] class BasicExecutorFeatureStep(
     val executorPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
-        .withLabels(kubernetesConf.roleLabels.asJava)
-        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+        .addToLabels(kubernetesConf.roleLabels.asJava)
+        .addToAnnotations(kubernetesConf.roleAnnotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
         .withHostname(hostname)
         .withRestartPolicy("Never")
-        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+        .addToNodeSelector(kubernetesConf.nodeSelector().asJava)
         .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
         .endSpec()
       .build()

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
new file mode 100644
index 0000000..96a8013
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class PodTemplateConfigMapStep(
+   conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  def configurePod(pod: SparkPod): SparkPod = {
+    val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolume()
+            .withName(POD_TEMPLATE_VOLUME)
+            .withNewConfigMap()
+              .withName(POD_TEMPLATE_CONFIGMAP)
+              .addNewItem()
+                .withKey(POD_TEMPLATE_KEY)
+                .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+              .endItem()
+            .endConfigMap()
+          .endVolume()
+        .endSpec()
+      .build()
+
+    val containerWithVolume = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(POD_TEMPLATE_VOLUME)
+          .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+        .endVolumeMount()
+      .build()
+    SparkPod(podWithVolume, containerWithVolume)
+  }
+
+  def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
+    KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
+      (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+
+  def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
+    val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+    val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
+    Seq(new ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(POD_TEMPLATE_CONFIGMAP)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_KEY, podTemplateString)
+      .build())
+  }
+}

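To make the plumbing above concrete: the executor template file only exists on the submission machine, so this step ships its contents to the driver as a ConfigMap, mounts it into the driver pod, and repoints the executor template property at the mounted copy. A minimal sketch of the rewritten property, built from the constants added in `Constants.scala` above:

```scala
// The driver pod sees the executor template at the ConfigMap mount point,
// so the driver-side executor builder loads it from there rather than from
// the submission machine's filesystem.
val executorTemplateProperty =
  "spark.kubernetes.executor.podTemplateFile" ->
    "/opt/spark/pod-template/pod-spec-template.yml"
```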
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index c658756..4b58f8b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.deploy.k8s.submit
 
 import java.io.StringWriter
-import java.util.{Collections, Locale, UUID}
+import java.util.{Collections, Locale, Properties, UUID}
+import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
@@ -227,7 +228,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       clientArguments.driverArgs,
       clientArguments.maybePyFiles,
       clientArguments.hadoopConfigDir)
-    val builder = new KubernetesDriverBuilder
     val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
@@ -244,7 +244,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       None,
       None)) { kubernetesClient =>
         val client = new Client(
-          builder,
+          KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
           kubernetesConf,
           kubernetesClient,
           waitForAppCompletion,

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index b0b5332..5565cd7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -16,7 +16,12 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
+import java.io.File
+
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
@@ -55,7 +60,11 @@ private[spark] class KubernetesDriverBuilder(
     provideHadoopGlobalStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
         => KerberosConfDriverFeatureStep) =
-    new KerberosConfDriverFeatureStep(_))  {
+    new KerberosConfDriverFeatureStep(_),
+    providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+      => PodTemplateConfigMapStep) =
+    new PodTemplateConfigMapStep(_),
+    provideInitialPod: () => SparkPod = SparkPod.initialPod) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -74,6 +83,10 @@ private[spark] class KubernetesDriverBuilder(
     val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
       Seq(provideVolumesStep(kubernetesConf))
     } else Nil
+    val podTemplateFeature = if (
+      kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
+      Seq(providePodTemplateConfigMapStep(kubernetesConf))
+    } else Nil
 
     val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
         case JavaMainAppResource(_) =>
@@ -86,14 +99,17 @@ private[spark] class KubernetesDriverBuilder(
 
     val maybeHadoopConfigStep =
       kubernetesConf.hadoopConfSpec.map { _ =>
-          provideHadoopGlobalStep(kubernetesConf)}
+        provideHadoopGlobalStep(kubernetesConf)}
 
     val allFeatures: Seq[KubernetesFeatureConfigStep] =
       (baseFeatures :+ bindingsStep) ++
         secretFeature ++ envSecretFeature ++ volumesFeature ++
-        maybeHadoopConfigStep.toSeq
+        maybeHadoopConfigStep.toSeq ++ podTemplateFeature
 
-    var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
+    var spec = KubernetesDriverSpec(
+      provideInitialPod(),
+      driverKubernetesResources = Seq.empty,
+      kubernetesConf.sparkConf.getAll.toMap)
     for (feature <- allFeatures) {
       val configuredPod = feature.configurePod(spec.pod)
       val addedSystemProperties = feature.getAdditionalPodSystemProperties()
@@ -106,3 +122,17 @@ private[spark] class KubernetesDriverBuilder(
     spec
   }
 }
+
+private[spark] object KubernetesDriverBuilder {
+  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
+    conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+      .map(new File(_))
+      .map(file => new KubernetesDriverBuilder(provideInitialPod = () =>
+        KubernetesUtils.loadPodFromTemplate(
+          kubernetesClient,
+          file,
+          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
+      ))
+      .getOrElse(new KubernetesDriverBuilder())
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 1a75ae0..77a1d6c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.Utils
 
 private[spark] class ExecutorPodsLifecycleManager(
     conf: SparkConf,
-    executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
     // Use a best-effort to track which executors have been removed already. It's not generally

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 9999c62..ce10f76 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -69,6 +69,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       defaultServiceAccountToken,
       defaultServiceAccountCaCrt)
 
+    if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
+      KubernetesUtils.loadPodFromTemplate(
+        kubernetesClient,
+        new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get),
+        sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
+    }
+
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
       "kubernetes-executor-requests")
 
@@ -81,13 +88,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       .build[java.lang.Long, java.lang.Long]()
     val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
       sc.conf,
-      new KubernetesExecutorBuilder(),
       kubernetesClient,
       snapshotsStore,
       removedExecutorsCache)
 
     val executorPodsAllocator = new ExecutorPodsAllocator(
-      sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
+      sc.conf,
+      KubernetesExecutorBuilder(kubernetesClient, sc.conf),
+      kubernetesClient,
+      snapshotsStore,
+      new SystemClock())
 
     val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
       snapshotsStore,

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 6199a8a..089f84d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -16,7 +16,12 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import java.io.File
+
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
 
@@ -35,19 +40,20 @@ private[spark] class KubernetesExecutorBuilder(
       new LocalDirsFeatureStep(_),
     provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountVolumesFeatureStep) =
-    new MountVolumesFeatureStep(_),
+      new MountVolumesFeatureStep(_),
     provideHadoopConfStep: (
       KubernetesConf[KubernetesExecutorSpecificConf]
-        => HadoopConfExecutorFeatureStep) =
-    new HadoopConfExecutorFeatureStep(_),
+      => HadoopConfExecutorFeatureStep) =
+      new HadoopConfExecutorFeatureStep(_),
     provideKerberosConfStep: (
       KubernetesConf[KubernetesExecutorSpecificConf]
-        => KerberosConfExecutorFeatureStep) =
-    new KerberosConfExecutorFeatureStep(_),
+      => KerberosConfExecutorFeatureStep) =
+      new KerberosConfExecutorFeatureStep(_),
     provideHadoopSparkUserStep: (
       KubernetesConf[KubernetesExecutorSpecificConf]
-        => HadoopSparkUserExecutorFeatureStep) =
-    new HadoopSparkUserExecutorFeatureStep(_)) {
+      => HadoopSparkUserExecutorFeatureStep) =
+      new HadoopSparkUserExecutorFeatureStep(_),
+    provideInitialPod: () => SparkPod = SparkPod.initialPod) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
@@ -85,10 +91,24 @@ private[spark] class KubernetesExecutorBuilder(
       volumesFeature ++
       maybeHadoopConfFeatureSteps
 
-    var executorPod = SparkPod.initialPod()
+    var executorPod = provideInitialPod()
     for (feature <- allFeatures) {
       executorPod = feature.configurePod(executorPod)
     }
     executorPod
   }
 }
+
+private[spark] object KubernetesExecutorBuilder {
+  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = {
+    conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+      .map(new File(_))
+      .map(file => new KubernetesExecutorBuilder(provideInitialPod = () =>
+          KubernetesUtils.loadPodFromTemplate(
+            kubernetesClient,
+            file,
+            conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
+      ))
+      .getOrElse(new KubernetesExecutorBuilder())
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
new file mode 100644
index 0000000..7c23158
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, PodBuilder}
+
+import org.apache.spark.SparkFunSuite
+
+class KubernetesUtilsSuite extends SparkFunSuite {
+  private val HOST = "test-host"
+  private val POD = new PodBuilder()
+    .withNewSpec()
+    .withHostname(HOST)
+    .withContainers(
+      new ContainerBuilder().withName("first").build(),
+      new ContainerBuilder().withName("second").build())
+    .endSpec()
+    .build()
+
+  test("Selects the given container as spark container.") {
+    val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("second"))
+    assert(sparkPod.pod.getSpec.getHostname == HOST)
+    assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("first"))
+    assert(sparkPod.container.getName == "second")
+  }
+
+  test("Selects the first container if no container name is given.") {
+    val sparkPod = KubernetesUtils.selectSparkContainer(POD, Option.empty)
+    assert(sparkPod.pod.getSpec.getHostname == HOST)
+    assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second"))
+    assert(sparkPod.container.getName == "first")
+  }
+
+  test("Falls back to the first container if given container name does not exist.") {
+    val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("does-not-exist"))
+    assert(sparkPod.pod.getSpec.getHostname == HOST)
+    assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second"))
+    assert(sparkPod.container.getName == "first")
+  }
+
+  test("constructs spark pod correctly with pod template with no containers") {
+    val noContainersPod = new PodBuilder(POD).editSpec().withContainers().endSpec().build()
+    val sparkPod = KubernetesUtils.selectSparkContainer(noContainersPod, Some("does-not-exist"))
+    assert(sparkPod.pod.getSpec.getHostname == HOST)
+    assert(sparkPod.container.getName == null)
+    val sparkPodWithNoContainerName =
+      KubernetesUtils.selectSparkContainer(noContainersPod, Option.empty)
+    assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
+    assert(sparkPodWithNoContainerName.container.getName == null)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index eebdd15..5c6bcc7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -84,7 +84,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val basePod = SparkPod.initialPod()
     val configuredPod = featureStep.configurePod(basePod)
 
-    assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME)
+    assert(configuredPod.container.getName === DEFAULT_DRIVER_CONTAINER_NAME)
     assert(configuredPod.container.getImage === "spark-driver:latest")
     assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
new file mode 100644
index 0000000..d7bbbd1
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Files
+
+import io.fabric8.kubernetes.api.model.ConfigMap
+import org.mockito.Mockito
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
+  private var sparkConf: SparkConf = _
+  private var kubernetesConf : KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
+  private var templateFile: File = _
+
+  before {
+    sparkConf = Mockito.mock(classOf[SparkConf])
+    kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        None,
+        "app-name",
+        "main",
+        Seq.empty),
+      "resource",
+      "app-id",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String],
+      Option.empty)
+    templateFile = Files.createTempFile("pod-template", "yml").toFile
+    templateFile.deleteOnExit()
+    Mockito.doReturn(Option(templateFile.getAbsolutePath)).when(sparkConf)
+      .get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+  }
+
+  test("Mounts executor template volume if config specified") {
+    val writer = new PrintWriter(templateFile)
+    writer.write("pod-template-contents")
+    writer.close()
+
+    val step = new PodTemplateConfigMapStep(kubernetesConf)
+    val configuredPod = step.configurePod(SparkPod.initialPod())
+
+    assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+    val volume = configuredPod.pod.getSpec.getVolumes.get(0)
+    assert(volume.getName === Constants.POD_TEMPLATE_VOLUME)
+    assert(volume.getConfigMap.getName === Constants.POD_TEMPLATE_CONFIGMAP)
+    assert(volume.getConfigMap.getItems.size() === 1)
+    assert(volume.getConfigMap.getItems.get(0).getKey === Constants.POD_TEMPLATE_KEY)
+    assert(volume.getConfigMap.getItems.get(0).getPath ===
+      Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+
+    assert(configuredPod.container.getVolumeMounts.size() === 1)
+    val volumeMount = configuredPod.container.getVolumeMounts.get(0)
+    assert(volumeMount.getMountPath === Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+    assert(volumeMount.getName === Constants.POD_TEMPLATE_VOLUME)
+
+    val resources = step.getAdditionalKubernetesResources()
+    assert(resources.size === 1)
+    assert(resources.head.getMetadata.getName === Constants.POD_TEMPLATE_CONFIGMAP)
+    assert(resources.head.isInstanceOf[ConfigMap])
+    val configMap = resources.head.asInstanceOf[ConfigMap]
+    assert(configMap.getData.size() === 1)
+    assert(configMap.getData.containsKey(Constants.POD_TEMPLATE_KEY))
+    assert(configMap.getData.containsValue("pod-template-contents"))
+
+    val systemProperties = step.getAdditionalPodSystemProperties()
+    assert(systemProperties.size === 1)
+    assert(systemProperties.contains(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key))
+    assert(systemProperties.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key).get ===
+      (Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" +
+        Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 051d7b6..84968c3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -16,8 +16,13 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE}
 import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
@@ -34,6 +39,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
   private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global"
   private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
+  private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume"
 
   private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
@@ -68,6 +74,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
 
+  private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep]
+  )
+
   private val builderUnderTest: KubernetesDriverBuilder =
     new KubernetesDriverBuilder(
       _ => basicFeatureStep,
@@ -80,7 +90,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       _ => pythonStep,
       _ => rStep,
       _ => javaStep,
-      _ => hadoopGlobalStep)
+      _ => hadoopGlobalStep,
+      _ => templateVolumeStep)
 
   test("Apply fundamental steps all the time.") {
     val conf = KubernetesConf(
@@ -252,6 +263,37 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       R_STEP_TYPE)
   }
 
+  test("Apply template volume step if executor template is present.") {
+    val sparkConf = spy(new SparkConf(false))
+    doReturn(Option("filename")).when(sparkConf)
+      .get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+    val conf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(JavaMainAppResource("example.jar")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String],
+      Option.empty)
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      JAVA_STEP_TYPE,
+      TEMPLATE_VOLUME_STEP_TYPE)
+  }
+
   test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") {
     val conf = KubernetesConf(
       new SparkConf(false),
@@ -314,7 +356,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       HADOOP_GLOBAL_STEP_TYPE)
   }
 
-
   private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
   : Unit = {
     assert(resolvedSpec.systemProperties.size === stepTypes.size)
@@ -325,4 +366,73 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       assert(resolvedSpec.systemProperties(stepType) === stepType)
     }
   }
+
+  test("Start with empty pod if template is not specified") {
+    val kubernetesClient = mock(classOf[KubernetesClient])
+    val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf())
+    verify(kubernetesClient, never()).pods()
+  }
+
+  test("Starts with template if specified") {
+    val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
+    val kubernetesConf = new KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(JavaMainAppResource("example.jar")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String],
+      Option.empty)
+    val driverSpec = KubernetesDriverBuilder
+      .apply(kubernetesClient, sparkConf)
+      .buildFromFeatures(kubernetesConf)
+    PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod)
+  }
+
+  test("Throws on misconfigured pod template") {
+    val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient(
+      new PodBuilder()
+        .withNewMetadata()
+        .addToLabels("test-label-key", "test-label-value")
+        .endMetadata()
+        .build())
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
+    val kubernetesConf = new KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(JavaMainAppResource("example.jar")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String],
+      Option.empty)
+    val exception = intercept[SparkException] {
+      KubernetesDriverBuilder
+        .apply(kubernetesClient, sparkConf)
+        .buildFromFeatures(kubernetesConf)
+    }
+    assert(exception.getMessage.contains("Could not load pod from template file."))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
new file mode 100644
index 0000000..c92e9e6
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.Assertions
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+object PodBuilderSuiteUtils extends Assertions {
+
+  def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
+    val kubernetesClient = mock(classOf[KubernetesClient])
+    val pods =
+      mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
+    val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
+    when(kubernetesClient.pods()).thenReturn(pods)
+    when(pods.load(any(classOf[File]))).thenReturn(podResource)
+    when(podResource.get()).thenReturn(pod)
+    kubernetesClient
+  }
+
+  def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = {
+    val metadata = pod.pod.getMetadata
+    assert(metadata.getLabels.containsKey("test-label-key"))
+    assert(metadata.getAnnotations.containsKey("test-annotation-key"))
+    assert(metadata.getNamespace === "namespace")
+    assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference"))
+    val spec = pod.pod.getSpec
+    assert(!spec.getContainers.asScala.exists(_.getName == "executor-container"))
+    assert(spec.getDnsPolicy === "dns-policy")
+    assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.contains("hostname")))
+    assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference"))
+    assert(spec.getInitContainers.asScala.exists(_.getName == "init-container"))
+    assert(spec.getNodeName === "node-name")
+    assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value")
+    assert(spec.getSchedulerName === "scheduler")
+    assert(spec.getSecurityContext.getRunAsUser === 1000L)
+    assert(spec.getServiceAccount === "service-account")
+    assert(spec.getSubdomain === "subdomain")
+    assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key"))
+    assert(spec.getVolumes.asScala.exists(_.getName == "test-volume"))
+    val container = pod.container
+    assert(container.getName === "executor-container")
+    assert(container.getArgs.contains("arg"))
+    assert(container.getCommand === List("command").asJava)
+    assert(container.getEnv.asScala.exists(_.getName == "env-key"))
+    assert(container.getResources.getLimits.get("gpu") ===
+      new QuantityBuilder().withAmount("1").build())
+    assert(container.getSecurityContext.getRunAsNonRoot)
+    assert(container.getStdin)
+    assert(container.getTerminationMessagePath === "termination-message-path")
+    assert(container.getTerminationMessagePolicy === "termination-message-policy")
+    assert(container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
+  }
+
+  def podWithSupportedFeatures(): Pod = new PodBuilder()
+        .withNewMetadata()
+          .addToLabels("test-label-key", "test-label-value")
+          .addToAnnotations("test-annotation-key", "test-annotation-value")
+          .withNamespace("namespace")
+          .addNewOwnerReference()
+            .withController(true)
+            .withName("owner-reference")
+            .endOwnerReference()
+          .endMetadata()
+        .withNewSpec()
+          .withDnsPolicy("dns-policy")
+          .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build())
+          .withImagePullSecrets(
+            new LocalObjectReferenceBuilder().withName("local-reference").build())
+          .withInitContainers(new ContainerBuilder().withName("init-container").build())
+          .withNodeName("node-name")
+          .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava)
+          .withSchedulerName("scheduler")
+          .withNewSecurityContext()
+            .withRunAsUser(1000L)
+            .endSecurityContext()
+          .withServiceAccount("service-account")
+          .withSubdomain("subdomain")
+          .withTolerations(new TolerationBuilder()
+            .withKey("toleration-key")
+            .withOperator("Equal")
+            .withEffect("NoSchedule")
+            .build())
+          .addNewVolume()
+            .withNewHostPath()
+            .withPath("/test")
+            .endHostPath()
+            .withName("test-volume")
+            .endVolume()
+          .addNewContainer()
+            .withArgs("arg")
+            .withCommand("command")
+            .addNewEnv()
+              .withName("env-key")
+              .withValue("env-value")
+              .endEnv()
+            .withImagePullPolicy("Always")
+            .withName("executor-container")
+            .withNewResources()
+              .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava)
+              .endResources()
+            .withNewSecurityContext()
+              .withRunAsNonRoot(true)
+              .endSecurityContext()
+            .withStdin(true)
+            .withTerminationMessagePath("termination-message-path")
+            .withTerminationMessagePolicy("termination-message-policy")
+            .addToVolumeMounts(
+              new VolumeMountBuilder()
+                .withName("test-volume")
+                .withMountPath("/test")
+                .build())
+            .endContainer()
+          .endSpec()
+        .build()
+
+}
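
As a usage note, any suite needing a client whose `pods().load(...).get()` chain yields the fully-featured pod can lean on this helper. A hedged sketch (the suite name and template path below are illustrative only, not part of this commit):

```scala
import java.io.File

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils

// Illustrative suite showing the intended wiring of the mock client.
// load(...) is stubbed for any File, so the path passed here is inert.
class TemplateStubbingExampleSuite extends SparkFunSuite {
  test("mock client serves the canned template pod") {
    val client = PodBuilderSuiteUtils.loadingMockKubernetesClient()
    val pod = client.pods().load(new File("any-template.yaml")).get()
    assert(pod.getMetadata.getLabels.containsKey("test-label-key"))
  }
}
```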

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index d840938..3995b2a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -46,9 +46,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
   private var podOperations: PODS = _
 
   @Mock
-  private var executorBuilder: KubernetesExecutorBuilder = _
-
-  @Mock
   private var schedulerBackend: KubernetesClusterSchedulerBackend = _
 
   private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
@@ -64,7 +61,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
     eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
       new SparkConf(),
-      executorBuilder,
       kubernetesClient,
       snapshotsStore,
       removedExecutorsCache)

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index b572dac..fb2509f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -16,12 +16,15 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.mockito.Mockito.{mock, never, verify}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils
 
 class KubernetesExecutorBuilderSuite extends SparkFunSuite {
   private val BASIC_STEP_TYPE = "basic"
@@ -193,4 +196,40 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
       assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType)
     }
   }
+
+  test("Starts with empty executor pod if template is not specified") {
+    val kubernetesClient = mock(classOf[KubernetesClient])
+    KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf())
+    verify(kubernetesClient, never()).pods()
+  }
+
+  test("Starts with executor template if specified") {
+    val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
+    val sparkConf = new SparkConf(false)
+      .set("spark.driver.host", "https://driver.host.com")
+      .set(Config.CONTAINER_IMAGE, "spark-executor:latest")
+      .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml")
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesExecutorSpecificConf(
+        "executor-id", Some(new PodBuilder()
+          .withNewMetadata()
+          .withName("driver")
+          .endMetadata()
+          .build())),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String],
+      Option.empty)
+    val sparkPod = KubernetesExecutorBuilder
+      .apply(kubernetesClient, sparkConf)
+      .buildFromFeatures(kubernetesConf)
+    PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml
new file mode 100644
index 0000000..0c185be
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    template-label-key: driver-template-label-value
+spec:
+  containers:
+  - name: test-driver-container
+    image: will-be-overwritten
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml
new file mode 100644
index 0000000..0282e23
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    template-label-key: executor-template-label-value
+spec:
+  containers:
+  - name: test-executor-container
+    image: will-be-overwritten

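Both templates are consumed by the integration test below through the pod template conf keys. A minimal sketch of wiring them into a SparkConf (the object name and paths are placeholders for wherever the files are staged on the submitting machine):

```scala
import org.apache.spark.SparkConf

// Placeholder paths: point these at the actual template locations.
object PodTemplateConfSketch {
  def templateConf(): SparkConf = new SparkConf()
    .set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-template.yml")
    .set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yml")
}
```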
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index c99a907..e2e5880 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 
 private[spark] class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite
+  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
   with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
@@ -288,21 +288,21 @@ private[spark] class KubernetesSuite extends SparkFunSuite
 
   protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
-    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+    assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor")
     assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
       === baseMemory)
   }
 
   protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
-    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+    assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor")
     assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
       === standardNonJVMMemory)
   }
 
   protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
-    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+    assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor")
     assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
       === standardNonJVMMemory)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala
new file mode 100644
index 0000000..e5a847e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+
+private[spark] trait PodTemplateSuite { k8sSuite: KubernetesSuite =>
+
+  import PodTemplateSuite._
+
+  test("Start pod creation from template", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.driver.podTemplateFile", DRIVER_TEMPLATE_FILE.getAbsolutePath)
+      .set("spark.kubernetes.executor.podTemplateFile", EXECUTOR_TEMPLATE_FILE.getAbsolutePath)
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        assert(driverPod.getMetadata.getName === driverPodName)
+        assert(driverPod.getSpec.getContainers.get(0).getImage === image)
+        assert(driverPod.getSpec.getContainers.get(0).getName === "test-driver-container")
+        assert(driverPod.getMetadata.getLabels.containsKey(LABEL_KEY))
+        assert(driverPod.getMetadata.getLabels.get(LABEL_KEY) === "driver-template-label-value")
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        assert(executorPod.getSpec.getContainers.get(0).getImage === image)
+        assert(executorPod.getSpec.getContainers.get(0).getName === "test-executor-container")
+        assert(executorPod.getMetadata.getLabels.containsKey(LABEL_KEY))
+        assert(executorPod.getMetadata.getLabels.get(LABEL_KEY) === "executor-template-label-value")
+      }
+    )
+  }
+}
+
+private[spark] object PodTemplateSuite {
+  val LABEL_KEY = "template-label-key"
+  val DRIVER_TEMPLATE_FILE = new File(getClass.getResource("/driver-template.yml").getFile)
+  val EXECUTOR_TEMPLATE_FILE = new File(getClass.getResource("/executor-template.yml").getFile)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

