spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fox...@apache.org
Subject [2/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs
Date Fri, 13 Apr 2018 15:44:22 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
deleted file mode 100644
index 43de329..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import io.fabric8.kubernetes.api.model.ContainerBuilder
-
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Step that configures the classpath, spark.jars, and spark.files for the driver given that the
- * user may provide remote files or files with local:// schemes.
- */
-private[spark] class DependencyResolutionStep(
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String]) extends DriverConfigurationStep {
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
-    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
-
-    val sparkConf = driverSpec.driverSparkConf.clone()
-    if (resolvedSparkJars.nonEmpty) {
-      sparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
-    }
-    if (resolvedSparkFiles.nonEmpty) {
-      sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
-    }
-    val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
-      new ContainerBuilder(driverSpec.driverContainer)
-        .addNewEnv()
-          .withName(ENV_MOUNTED_CLASSPATH)
-          .withValue(resolvedSparkJars.mkString(File.pathSeparator))
-          .endEnv()
-        .build()
-    } else {
-      driverSpec.driverContainer
-    }
-
-    driverSpec.copy(
-      driverContainer = resolvedDriverContainer,
-      driverSparkConf = sparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
deleted file mode 100644
index 17614e0..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
-
-  /**
-   * Apply some transformation to the previous state of the driver to add a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
deleted file mode 100644
index 2424e63..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-
-import scala.collection.JavaConverters._
-import scala.language.implicitConversions
-
-import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
- * to request executors.
- */
-private[spark] class DriverKubernetesCredentialsStep(
-    submissionSparkConf: SparkConf,
-    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
-
-  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
-  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
-  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
-  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
-    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
-  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val driverSparkConf = driverSpec.driverSparkConf.clone()
-
-    val oauthTokenBase64 = submissionSparkConf
-      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
-      .map { token =>
-        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
-      }
-    val caCertDataBase64 = safeFileConfToBase64(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-      "Driver CA cert file")
-    val clientKeyDataBase64 = safeFileConfToBase64(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-      "Driver client key file")
-    val clientCertDataBase64 = safeFileConfToBase64(
-      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-      "Driver client cert file")
-
-    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
-      driverSparkConf,
-      oauthTokenBase64,
-      caCertDataBase64,
-      clientKeyDataBase64,
-      clientCertDataBase64)
-
-    val kubernetesCredentialsSecret = createCredentialsSecret(
-      oauthTokenBase64,
-      caCertDataBase64,
-      clientKeyDataBase64,
-      clientCertDataBase64)
-
-    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
-      new PodBuilder(driverSpec.driverPod)
-        .editOrNewSpec()
-          .addNewVolume()
-            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
-            .endVolume()
-          .endSpec()
-        .build()
-    }.getOrElse(
-      driverServiceAccount.map { account =>
-        new PodBuilder(driverSpec.driverPod)
-          .editOrNewSpec()
-          .withServiceAccount(account)
-          .withServiceAccountName(account)
-          .endSpec()
-          .build()
-      }.getOrElse(driverSpec.driverPod)
-    )
-
-    val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { _ =>
-      new ContainerBuilder(driverSpec.driverContainer)
-        .addNewVolumeMount()
-          .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
-          .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
-          .endVolumeMount()
-        .build()
-    }.getOrElse(driverSpec.driverContainer)
-
-    driverSpec.copy(
-      driverPod = driverPodWithMountedKubernetesCredentials,
-      otherKubernetesResources =
-        driverSpec.otherKubernetesResources ++ kubernetesCredentialsSecret.toSeq,
-      driverSparkConf = driverSparkConfWithCredentialsLocations,
-      driverContainer = driverContainerWithMountedSecretVolume)
-  }
-
-  private def createCredentialsSecret(
-      driverOAuthTokenBase64: Option[String],
-      driverCaCertDataBase64: Option[String],
-      driverClientKeyDataBase64: Option[String],
-      driverClientCertDataBase64: Option[String]): Option[Secret] = {
-    val allSecretData =
-      resolveSecretData(
-        driverClientKeyDataBase64,
-        DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
-      resolveSecretData(
-        driverClientCertDataBase64,
-        DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
-      resolveSecretData(
-        driverCaCertDataBase64,
-        DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
-      resolveSecretData(
-        driverOAuthTokenBase64,
-        DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
-
-    if (allSecretData.isEmpty) {
-      None
-    } else {
-      Some(new SecretBuilder()
-        .withNewMetadata()
-          .withName(s"$kubernetesResourceNamePrefix-kubernetes-credentials")
-          .endMetadata()
-        .withData(allSecretData.asJava)
-        .build())
-    }
-  }
-
-  private def setDriverPodKubernetesCredentialLocations(
-      driverSparkConf: SparkConf,
-      driverOauthTokenBase64: Option[String],
-      driverCaCertDataBase64: Option[String],
-      driverClientKeyDataBase64: Option[String],
-      driverClientCertDataBase64: Option[String]): SparkConf = {
-    val resolvedMountedOAuthTokenFile = resolveSecretLocation(
-      maybeMountedOAuthTokenFile,
-      driverOauthTokenBase64,
-      DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
-    val resolvedMountedClientKeyFile = resolveSecretLocation(
-      maybeMountedClientKeyFile,
-      driverClientKeyDataBase64,
-      DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
-    val resolvedMountedClientCertFile = resolveSecretLocation(
-      maybeMountedClientCertFile,
-      driverClientCertDataBase64,
-      DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
-    val resolvedMountedCaCertFile = resolveSecretLocation(
-      maybeMountedCaCertFile,
-      driverCaCertDataBase64,
-      DRIVER_CREDENTIALS_CA_CERT_PATH)
-
-    val sparkConfWithCredentialLocations = driverSparkConf
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
-        resolvedMountedCaCertFile)
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
-        resolvedMountedClientKeyFile)
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
-        resolvedMountedClientCertFile)
-      .setOption(
-        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
-        resolvedMountedOAuthTokenFile)
-
-    // Redact all OAuth token values
-    sparkConfWithCredentialLocations
-      .getAll
-      .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)).map(_._1)
-      .foreach {
-        sparkConfWithCredentialLocations.set(_, "<present_but_redacted>")
-      }
-    sparkConfWithCredentialLocations
-  }
-
-  private def safeFileConfToBase64(conf: String, fileType: String): Option[String] = {
-    submissionSparkConf.getOption(conf)
-      .map(new File(_))
-      .map { file =>
-        require(file.isFile, String.format("%s provided at %s does not exist or is not a file.",
-          fileType, file.getAbsolutePath))
-        BaseEncoding.base64().encode(Files.toByteArray(file))
-      }
-  }
-
-  private def resolveSecretLocation(
-      mountedUserSpecified: Option[String],
-      valueMountedFromSubmitter: Option[String],
-      mountedCanonicalLocation: String): Option[String] = {
-    mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
-      mountedCanonicalLocation
-    })
-  }
-
-  /**
-   * Resolve a Kubernetes secret data entry from an optional client credential used by the
-   * driver to talk to the Kubernetes API server.
-   *
-   * @param userSpecifiedCredential the optional user-specified client credential.
-   * @param secretName name of the Kubernetes secret storing the client credential.
-   * @return a secret data entry in the form of a map from the secret name to the secret data,
-   *         which may be empty if the user-specified credential is empty.
-   */
-  private def resolveSecretData(
-      userSpecifiedCredential: Option[String],
-      secretName: String): Map[String, String] = {
-    userSpecifiedCredential.map { valueBase64 =>
-      Map(secretName -> valueBase64)
-    }.getOrElse(Map.empty[String, String])
-  }
-
-  private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
-    new OptionSettableSparkConf(sparkConf)
-  }
-}
-
-private class OptionSettableSparkConf(sparkConf: SparkConf) {
-  def setOption(configEntry: String, option: Option[String]): SparkConf = {
-    option.foreach { opt =>
-      sparkConf.set(configEntry, opt)
-    }
-    sparkConf
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
deleted file mode 100644
index 91e9a9f..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.deploy.k8s.MountSecretsBootstrap
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * A driver configuration step for mounting user-specified secrets onto user-specified paths.
- *
- * @param bootstrap a utility actually handling mounting of the secrets.
- */
-private[spark] class DriverMountSecretsStep(
-    bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
-    val container = bootstrap.mountSecrets(driverSpec.driverContainer)
-    driverSpec.copy(
-      driverPod = pod,
-      driverContainer = container
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
deleted file mode 100644
index 34af7cd..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model.ServiceBuilder
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Clock
-
-/**
- * Allows the driver to be reachable by executor pods through a headless service. The service's
- * ports should correspond to the ports that the executor will reach the pod at for RPC.
- */
-private[spark] class DriverServiceBootstrapStep(
-    resourceNamePrefix: String,
-    driverLabels: Map[String, String],
-    sparkConf: SparkConf,
-    clock: Clock) extends DriverConfigurationStep with Logging {
-
-  import DriverServiceBootstrapStep._
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    require(sparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
-      s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
-      "address is managed and set to the driver pod's IP address.")
-    require(sparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
-      s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
-      "managed via a Kubernetes service.")
-
-    val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
-    val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
-      preferredServiceName
-    } else {
-      val randomServiceId = clock.getTimeMillis()
-      val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
-      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
-        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
-        s"$shorterServiceName as the driver service's name.")
-      shorterServiceName
-    }
-
-    val driverPort = sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
-    val driverBlockManagerPort = sparkConf.getInt(
-        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
-    val driverService = new ServiceBuilder()
-      .withNewMetadata()
-        .withName(resolvedServiceName)
-        .endMetadata()
-      .withNewSpec()
-        .withClusterIP("None")
-        .withSelector(driverLabels.asJava)
-        .addNewPort()
-          .withName(DRIVER_PORT_NAME)
-          .withPort(driverPort)
-          .withNewTargetPort(driverPort)
-          .endPort()
-        .addNewPort()
-          .withName(BLOCK_MANAGER_PORT_NAME)
-          .withPort(driverBlockManagerPort)
-          .withNewTargetPort(driverBlockManagerPort)
-          .endPort()
-        .endSpec()
-      .build()
-
-    val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
-    val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc"
-    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
-      .set(DRIVER_HOST_KEY, driverHostname)
-      .set("spark.driver.port", driverPort.toString)
-      .set(
-        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
-
-    driverSpec.copy(
-      driverSparkConf = resolvedSparkConf,
-      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
-  }
-}
-
-private[spark] object DriverServiceBootstrapStep {
-  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
-  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
-  val DRIVER_SVC_POSTFIX = "-driver-svc"
-  val MAX_SERVICE_NAME_LENGTH = 63
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
deleted file mode 100644
index 8607d6f..0000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.scheduler.cluster.k8s
-
-import scala.collection.JavaConverters._
-
-import io.fabric8.kubernetes.api.model._
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
-import org.apache.spark.deploy.k8s.Config._
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
-import org.apache.spark.util.Utils
-
-/**
- * A factory class for bootstrapping and creating executor pods with the given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
- *                              user-specified paths into the executor container
- */
-private[spark] class ExecutorPodFactory(
-    sparkConf: SparkConf,
-    mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-    sparkConf,
-    KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-    !executorLabels.contains(SPARK_APP_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
-  require(
-    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
-      " Spark.")
-  require(
-    !executorLabels.contains(SPARK_ROLE_LABEL),
-    s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
-
-  private val executorAnnotations =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
-    .get(EXECUTOR_CONTAINER_IMAGE)
-    .getOrElse(throw new SparkException("Must specify the executor container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
-  private val blockManagerPort = sparkConf
-    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
-
-  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
-
-  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = sparkConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
-
-  private val memoryOverheadMiB = sparkConf
-    .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
-  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
-
-  private val executorCores = sparkConf.getInt("spark.executor.cores", 1)
-  private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
-    sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
-  } else {
-    executorCores.toString
-  }
-  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
-
-  /**
-   * Configure and construct an executor pod with the given parameters.
-   */
-  def createExecutorPod(
-      executorId: String,
-      applicationId: String,
-      driverUrl: String,
-      executorEnvs: Seq[(String, String)],
-      driverPod: Pod,
-      nodeToLocalTaskCount: Map[String, Int]): Pod = {
-    val name = s"$executorPodNamePrefix-exec-$executorId"
-
-    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
-
-    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
-    // name as the hostname.  This preserves uniqueness since the end of name contains
-    // executorId
-    val hostname = name.substring(Math.max(0, name.length - 63))
-    val resolvedExecutorLabels = Map(
-      SPARK_EXECUTOR_ID_LABEL -> executorId,
-      SPARK_APP_ID_LABEL -> applicationId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
-      executorLabels
-    val executorMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(s"${executorMemoryWithOverhead}Mi")
-      .build()
-    val executorCpuQuantity = new QuantityBuilder(false)
-      .withAmount(executorCoresRequest)
-      .build()
-    val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
-      new EnvVarBuilder()
-        .withName(ENV_CLASSPATH)
-        .withValue(cp)
-        .build()
-    }
-    val executorExtraJavaOptionsEnv = sparkConf
-      .get(EXECUTOR_JAVA_OPTIONS)
-      .map { opts =>
-        val delimitedOpts = Utils.splitCommandString(opts)
-        delimitedOpts.zipWithIndex.map {
-          case (opt, index) =>
-            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
-        }
-      }.getOrElse(Seq.empty[EnvVar])
-    val executorEnv = (Seq(
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores.toString),
-      (ENV_EXECUTOR_MEMORY, executorMemoryString),
-      (ENV_APPLICATION_ID, applicationId),
-      // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
-      (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
-      .map(env => new EnvVarBuilder()
-        .withName(env._1)
-        .withValue(env._2)
-        .build()
-      ) ++ Seq(
-      new EnvVarBuilder()
-        .withName(ENV_EXECUTOR_POD_IP)
-        .withValueFrom(new EnvVarSourceBuilder()
-          .withNewFieldRef("v1", "status.podIP")
-          .build())
-        .build()
-    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
-    val requiredPorts = Seq(
-      (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
-      .map { case (name, port) =>
-        new ContainerPortBuilder()
-          .withName(name)
-          .withContainerPort(port)
-          .build()
-      }
-
-    val executorContainer = new ContainerBuilder()
-      .withName("executor")
-      .withImage(executorContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
-      .withNewResources()
-        .addToRequests("memory", executorMemoryQuantity)
-        .addToLimits("memory", executorMemoryQuantity)
-        .addToRequests("cpu", executorCpuQuantity)
-        .endResources()
-      .addAllToEnv(executorEnv.asJava)
-      .withPorts(requiredPorts.asJava)
-      .addToArgs("executor")
-      .build()
-
-    val executorPod = new PodBuilder()
-      .withNewMetadata()
-        .withName(name)
-        .withLabels(resolvedExecutorLabels.asJava)
-        .withAnnotations(executorAnnotations.asJava)
-        .withOwnerReferences()
-          .addNewOwnerReference()
-            .withController(true)
-            .withApiVersion(driverPod.getApiVersion)
-            .withKind(driverPod.getKind)
-            .withName(driverPod.getMetadata.getName)
-            .withUid(driverPod.getMetadata.getUid)
-            .endOwnerReference()
-        .endMetadata()
-      .withNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-        .withImagePullSecrets(parsedImagePullSecrets.asJava)
-        .endSpec()
-      .build()
-
-    val containerWithLimitCores = executorLimitCores.map { limitCores =>
-      val executorCpuLimitQuantity = new QuantityBuilder(false)
-        .withAmount(limitCores)
-        .build()
-      new ContainerBuilder(executorContainer)
-        .editResources()
-        .addToLimits("cpu", executorCpuLimitQuantity)
-        .endResources()
-        .build()
-    }.getOrElse(executorContainer)
-
-    val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
-      mountSecretsBootstrap.map { bootstrap =>
-        (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
-      }.getOrElse((executorPod, containerWithLimitCores))
-
-
-    new PodBuilder(maybeSecretsMountedPod)
-      .editSpec()
-        .addToContainers(maybeSecretsMountedContainer)
-        .endSpec()
-      .build()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index ff5f680..0ea80df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -48,12 +48,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       scheduler: TaskScheduler): SchedulerBackend = {
     val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
       sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
-    val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
-      Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
-    } else {
-      None
-    }
-
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
       Some(sc.conf.get(KUBERNETES_NAMESPACE)),
@@ -62,8 +56,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val executorPodFactory = new ExecutorPodFactory(sc.conf, mountSecretBootstrap)
-
     val allocatorExecutor = ThreadUtils
       .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
@@ -71,7 +63,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
       sc.env.rpcEnv,
-      executorPodFactory,
+      new KubernetesExecutorBuilder,
       kubernetesClient,
       allocatorExecutor,
       requestExecutorsService)

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 9de4b16..d86664c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -32,6 +32,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -40,7 +41,7 @@ import org.apache.spark.util.Utils
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     rpcEnv: RpcEnv,
-    executorPodFactory: ExecutorPodFactory,
+    executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     allocatorExecutor: ScheduledExecutorService,
     requestExecutorsService: ExecutorService)
@@ -115,14 +116,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
           for (_ <- 0 until math.min(
             currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
             val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-            val executorPod = executorPodFactory.createExecutorPod(
+            val executorConf = KubernetesConf.createExecutorConf(
+              conf,
               executorId,
               applicationId(),
-              driverUrl,
-              conf.getExecutorEnv,
-              driverPod,
-              currentNodeToLocalTaskCount)
-            executorsToAllocate(executorId) = executorPod
+              driverPod)
+            val executorPod = executorBuilder.buildFromFeatures(executorConf)
+            val podWithAttachedContainer = new PodBuilder(executorPod.pod)
+              .editOrNewSpec()
+                .addToContainers(executorPod.container)
+                .endSpec()
+              .build()
+
+            executorsToAllocate(executorId) = podWithAttachedContainer
             logInfo(
               s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
new file mode 100644
index 0000000..22568fe
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountSecretsFeatureStep}
+
+private[spark] class KubernetesExecutorBuilder(
+    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
+      new BasicExecutorFeatureStep(_),
+    provideSecretsStep:
+      (KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
+      new MountSecretsFeatureStep(_)) {
+
+  def buildFromFeatures(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
+    val baseFeatures = Seq(provideBasicStep(kubernetesConf))
+    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
+    } else baseFeatures
+    var executorPod = SparkPod.initialPod()
+    for (feature <- allFeatures) {
+      executorPod = feature.configurePod(executorPod)
+    }
+    executorPod
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
new file mode 100644
index 0000000..f10202f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{LocalObjectReferenceBuilder, PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+
+class KubernetesConfSuite extends SparkFunSuite {
+
+  private val APP_NAME = "test-app"
+  private val RESOURCE_NAME_PREFIX = "prefix"
+  private val APP_ID = "test-id"
+  private val MAIN_CLASS = "test-class"
+  private val APP_ARGS = Array("arg1", "arg2")
+  private val CUSTOM_LABELS = Map(
+    "customLabel1Key" -> "customLabel1Value",
+    "customLabel2Key" -> "customLabel2Value")
+  private val CUSTOM_ANNOTATIONS = Map(
+    "customAnnotation1Key" -> "customAnnotation1Value",
+    "customAnnotation2Key" -> "customAnnotation2Value")
+  private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
+    "secret1" -> "/mnt/secrets/secret1",
+    "secret2" -> "/mnt/secrets/secret2")
+  private val CUSTOM_ENVS = Map(
+    "customEnvKey1" -> "customEnvValue1",
+    "customEnvKey2" -> "customEnvValue2")
+  private val DRIVER_POD = new PodBuilder().build()
+  private val EXECUTOR_ID = "executor-id"
+
+  test("Basic driver translated fields.") {
+    val sparkConf = new SparkConf(false)
+    val conf = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      None,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(conf.appId === APP_ID)
+    assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
+    assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
+    assert(conf.roleSpecificConf.appName === APP_NAME)
+    assert(conf.roleSpecificConf.mainAppResource.isEmpty)
+    assert(conf.roleSpecificConf.mainClass === MAIN_CLASS)
+    assert(conf.roleSpecificConf.appArgs === APP_ARGS)
+  }
+
+  test("Creating driver conf with and without the main app jar influences spark.jars") {
+    val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+    val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar"))
+    val kubernetesConfWithMainJar = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppJar,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
+      .split(",")
+      === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
+    val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      None,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+  }
+
+  test("Resolve driver labels, annotations, secret mount paths, and envs.") {
+    val sparkConf = new SparkConf(false)
+    CUSTOM_LABELS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
+    }
+    CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$key", value)
+    }
+    SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$key", value)
+    }
+    CUSTOM_ENVS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_DRIVER_ENV_PREFIX$key", value)
+    }
+
+    val conf = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      None,
+      MAIN_CLASS,
+      APP_ARGS)
+    assert(conf.roleLabels === Map(
+      SPARK_APP_ID_LABEL -> APP_ID,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
+      CUSTOM_LABELS)
+    assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
+    assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+    assert(conf.roleEnvs === CUSTOM_ENVS)
+  }
+
+  test("Basic executor translated fields.") {
+    val conf = KubernetesConf.createExecutorConf(
+      new SparkConf(false),
+      EXECUTOR_ID,
+      APP_ID,
+      DRIVER_POD)
+    assert(conf.roleSpecificConf.executorId === EXECUTOR_ID)
+    assert(conf.roleSpecificConf.driverPod === DRIVER_POD)
+  }
+
+  test("Image pull secrets.") {
+    val conf = KubernetesConf.createExecutorConf(
+      new SparkConf(false)
+        .set(IMAGE_PULL_SECRETS, "my-secret-1,my-secret-2 "),
+      EXECUTOR_ID,
+      APP_ID,
+      DRIVER_POD)
+    assert(conf.imagePullSecrets() ===
+      Seq(
+        new LocalObjectReferenceBuilder().withName("my-secret-1").build(),
+        new LocalObjectReferenceBuilder().withName("my-secret-2").build()))
+  }
+
+  test("Set executor labels, annotations, and secrets") {
+    val sparkConf = new SparkConf(false)
+    CUSTOM_LABELS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_EXECUTOR_LABEL_PREFIX$key", value)
+    }
+    CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_EXECUTOR_ANNOTATION_PREFIX$key", value)
+    }
+    SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
+      sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRETS_PREFIX$key", value)
+    }
+
+    val conf = KubernetesConf.createExecutorConf(
+      sparkConf,
+      EXECUTOR_ID,
+      APP_ID,
+      DRIVER_POD)
+    assert(conf.roleLabels === Map(
+      SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID,
+      SPARK_APP_ID_LABEL -> APP_ID,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
+    assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
+    assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
deleted file mode 100644
index cf41b22..0000000
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s
-
-import io.fabric8.kubernetes.api.model.LocalObjectReference
-
-import org.apache.spark.SparkFunSuite
-
-class KubernetesUtilsTest extends SparkFunSuite {
-
-  test("testParseImagePullSecrets") {
-    val noSecrets = KubernetesUtils.parseImagePullSecrets(None)
-    assert(noSecrets === Nil)
-
-    val oneSecret = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
-    assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil)
-
-    val commaSeparatedSecrets = KubernetesUtils.parseImagePullSecrets(Some("s1, s2  , s3,s4"))
-    assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: "s4" :: Nil)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..eee85b8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+class BasicDriverFeatureStepSuite extends SparkFunSuite {
+
+  private val APP_ID = "spark-app-id"
+  private val RESOURCE_NAME_PREFIX = "spark"
+  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
+  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
+  private val APP_NAME = "spark-test"
+  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+  private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
+  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
+  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
+  private val DRIVER_ANNOTATIONS = Map(CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE)
+  private val DRIVER_CUSTOM_ENV1 = "customDriverEnv1"
+  private val DRIVER_CUSTOM_ENV2 = "customDriverEnv2"
+  private val DRIVER_ENVS = Map(
+    DRIVER_CUSTOM_ENV1 -> DRIVER_CUSTOM_ENV1,
+    DRIVER_CUSTOM_ENV2 -> DRIVER_CUSTOM_ENV2)
+  private val TEST_IMAGE_PULL_SECRETS = Seq("my-secret-1", "my-secret-2")
+  private val TEST_IMAGE_PULL_SECRET_OBJECTS =
+    TEST_IMAGE_PULL_SECRETS.map { secret =>
+      new LocalObjectReferenceBuilder().withName(secret).build()
+    }
+
+  test("Check the pod respects all configurations from the user.") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .set("spark.driver.cores", "2")
+      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
+      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        None,
+        APP_NAME,
+        MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      DRIVER_ENVS)
+
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+    val basePod = SparkPod.initialPod()
+    val configuredPod = featureStep.configurePod(basePod)
+
+    assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME)
+    assert(configuredPod.container.getImage === "spark-driver:latest")
+    assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
+
+    assert(configuredPod.container.getEnv.size === 3)
+    val envs = configuredPod.container
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(DRIVER_CUSTOM_ENV1) === DRIVER_ENVS(DRIVER_CUSTOM_ENV1))
+    assert(envs(DRIVER_CUSTOM_ENV2) === DRIVER_ENVS(DRIVER_CUSTOM_ENV2))
+
+    assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala ===
+      TEST_IMAGE_PULL_SECRET_OBJECTS)
+
+    assert(configuredPod.container.getEnv.asScala.exists(envVar =>
+      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
+        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
+        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
+
+    val resourceRequirements = configuredPod.container.getResources
+    val requests = resourceRequirements.getRequests.asScala
+    assert(requests("cpu").getAmount === "2")
+    assert(requests("memory").getAmount === "456Mi")
+    val limits = resourceRequirements.getLimits.asScala
+    assert(limits("memory").getAmount === "456Mi")
+    assert(limits("cpu").getAmount === "4")
+
+    val driverPodMetadata = configuredPod.pod.getMetadata
+    assert(driverPodMetadata.getName === "spark-driver-pod")
+    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
+    assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
+    assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
+
+    val expectedSparkConf = Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
+      "spark.app.id" -> APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.kubernetes.submitInDriver" -> "true")
+    assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
+  }
+
+  test("Additional system properties resolve jars and set cluster-mode confs.") {
+    val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar")
+    val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt")
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
+      .setJars(allJars)
+      .set("spark.files", allFiles.mkString(","))
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        None,
+        APP_NAME,
+        MAIN_CLASS,
+        APP_ARGS),
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      DRIVER_LABELS,
+      DRIVER_ANNOTATIONS,
+      Map.empty,
+      Map.empty)
+    val step = new BasicDriverFeatureStep(kubernetesConf)
+    val additionalProperties = step.getAdditionalPodSystemProperties()
+    val expectedSparkConf = Map(
+      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
+      "spark.app.id" -> APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.kubernetes.submitInDriver" -> "true",
+      "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
+      "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt")
+    assert(additionalProperties === expectedSparkConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
new file mode 100644
index 0000000..a764f76
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import org.mockito.MockitoAnnotations
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+class BasicExecutorFeatureStepSuite
+  extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
+
+  private val APP_ID = "app-id"
+  private val DRIVER_HOSTNAME = "localhost"
+  private val DRIVER_PORT = 7098
+  private val DRIVER_ADDRESS = RpcEndpointAddress(
+    DRIVER_HOSTNAME,
+    DRIVER_PORT.toInt,
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+  private val DRIVER_POD_NAME = "driver-pod"
+
+  private val DRIVER_POD_UID = "driver-uid"
+  private val RESOURCE_NAME_PREFIX = "base"
+  private val EXECUTOR_IMAGE = "executor-image"
+  private val LABELS = Map("label1key" -> "label1value")
+  private val ANNOTATIONS = Map("annotation1key" -> "annotation1value")
+  private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2")
+  private val TEST_IMAGE_PULL_SECRET_OBJECTS =
+    TEST_IMAGE_PULL_SECRETS.map { secret =>
+      new LocalObjectReferenceBuilder().withName(secret).build()
+    }
+  private val DRIVER_POD = new PodBuilder()
+    .withNewMetadata()
+      .withName(DRIVER_POD_NAME)
+      .withUid(DRIVER_POD_UID)
+      .endMetadata()
+    .withNewSpec()
+      .withNodeName("some-node")
+      .endSpec()
+    .withNewStatus()
+      .withHostIP("192.168.99.100")
+      .endStatus()
+    .build()
+  private var baseConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    baseConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
+      .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
+      .set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
+      .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
+      .set("spark.driver.host", DRIVER_HOSTNAME)
+      .set("spark.driver.port", DRIVER_PORT.toString)
+      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
+  }
+
+  test("basic executor pod has reasonable defaults") {
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        baseConf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map.empty))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    // The executor pod name and default labels.
+    assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1")
+    assert(executor.pod.getMetadata.getLabels.asScala === LABELS)
+    assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS)
+
+    // There is exactly 1 container with no volume mounts and default memory limits.
+    // Default memory limit is 1024M + 384M (minimum overhead constant).
+    assert(executor.container.getImage === EXECUTOR_IMAGE)
+    assert(executor.container.getVolumeMounts.isEmpty)
+    assert(executor.container.getResources.getLimits.size() === 1)
+    assert(executor.container.getResources
+      .getLimits.get("memory").getAmount === "1408Mi")
+
+    // The pod has no node selector, volumes.
+    assert(executor.pod.getSpec.getNodeSelector.isEmpty)
+    assert(executor.pod.getSpec.getVolumes.isEmpty)
+
+    checkEnv(executor, Map())
+    checkOwnerReferences(executor.pod, DRIVER_POD_UID)
+  }
+
+  test("executor pod hostnames get truncated to 63 characters") {
+    val conf = baseConf.clone()
+    val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
+
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        conf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        longPodNamePrefix,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map.empty))
+    assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
+  }
+
+  test("classpath and extra java options get translated into environment variables") {
+    val conf = baseConf.clone()
+    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
+    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
+
+    val step = new BasicExecutorFeatureStep(
+      KubernetesConf(
+        conf,
+        KubernetesExecutorSpecificConf("1", DRIVER_POD),
+        RESOURCE_NAME_PREFIX,
+        APP_ID,
+        LABELS,
+        ANNOTATIONS,
+        Map.empty,
+        Map("qux" -> "quux")))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    checkEnv(executor,
+      Map("SPARK_JAVA_OPT_0" -> "foo=bar",
+        ENV_CLASSPATH -> "bar=baz",
+        "qux" -> "quux"))
+    checkOwnerReferences(executor.pod, DRIVER_POD_UID)
+  }
+
+  // There is always exactly one controller reference, and it points to the driver pod.
+  private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
+    assert(executor.getMetadata.getOwnerReferences.size() === 1)
+    assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid)
+    assert(executor.getMetadata.getOwnerReferences.get(0).getController === true)
+  }
+
+  // Check that the expected environment variables are present.
+  private def checkEnv(executorPod: SparkPod, additionalEnvVars: Map[String, String]): Unit = {
+    val defaultEnvs = Map(
+      ENV_EXECUTOR_ID -> "1",
+      ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
+      ENV_EXECUTOR_CORES -> "1",
+      ENV_EXECUTOR_MEMORY -> "1g",
+      ENV_APPLICATION_ID -> APP_ID,
+      ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
+      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
+
+    assert(executorPod.container.getEnv.size() === defaultEnvs.size)
+    val mapEnvs = executorPod.container.getEnv.asScala.map {
+      x => (x.getName, x.getValue)
+    }.toMap
+    assert(defaultEnvs === mapEnvs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
new file mode 100644
index 0000000..9f817d3
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
+import org.mockito.{Mock, MockitoAnnotations}
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
+  private val APP_ID = "k8s-app"
+  private var credentialsTempDirectory: File = _
+  private val BASE_DRIVER_POD = SparkPod.initialPod()
+
+  @Mock
+  private var driverSpecificConf: KubernetesDriverSpecificConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    credentialsTempDirectory = Utils.createTempDir()
+  }
+
+  after {
+    credentialsTempDirectory.delete()
+  }
+
+  test("Don't set any credentials") {
+    val kubernetesConf = KubernetesConf(
+      new SparkConf(false),
+      driverSpecificConf,
+      KUBERNETES_RESOURCE_NAME_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
+    assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
+    assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
+    assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("Only set credentials that are manually mounted.") {
+    val submissionSparkConf = new SparkConf(false)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-token.txt")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-key.pem")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-cert.pem")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        "/mnt/secrets/my-ca.pem")
+    val kubernetesConf = KubernetesConf(
+      submissionSparkConf,
+      driverSpecificConf,
+      KUBERNETES_RESOURCE_NAME_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
+    assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
+    assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty)
+    val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
+    resolvedProperties.foreach { case (propKey, propValue) =>
+      assert(submissionSparkConf.get(propKey) === propValue)
+    }
+  }
+
+  test("Mount credentials from the submission client as a secret.") {
+    val caCertFile = writeCredentials("ca.pem", "ca-cert")
+    val clientKeyFile = writeCredentials("key.pem", "key")
+    val clientCertFile = writeCredentials("cert.pem", "cert")
+    val submissionSparkConf = new SparkConf(false)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX",
+        "token")
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+        clientKeyFile.getAbsolutePath)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+        clientCertFile.getAbsolutePath)
+      .set(
+        s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+        caCertFile.getAbsolutePath)
+    val kubernetesConf = KubernetesConf(
+      submissionSparkConf,
+      driverSpecificConf,
+      KUBERNETES_RESOURCE_NAME_PREFIX,
+      APP_ID,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty)
+    val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
+    val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
+    val expectedSparkConf = Map(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX" -> "<present_but_redacted>",
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CLIENT_CERT_PATH,
+      s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" ->
+        DRIVER_CREDENTIALS_CA_CERT_PATH)
+    assert(resolvedProperties === expectedSparkConf)
+    assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().size === 1)
+    val credentialsSecret = kubernetesCredentialsStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Secret]
+    assert(credentialsSecret.getMetadata.getName ===
+      s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
+    val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
+      (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
+    }
+    val expectedSecretData = Map(
+      DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert",
+      DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token",
+      DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key",
+      DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert")
+    assert(decodedSecretData === expectedSecretData)
+    val driverPod = kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD)
+    val driverPodVolumes = driverPod.pod.getSpec.getVolumes.asScala
+    assert(driverPodVolumes.size === 1)
+    assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+    assert(driverPodVolumes.head.getSecret != null)
+    assert(driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName)
+    val driverContainerVolumeMount = driverPod.container.getVolumeMounts.asScala
+    assert(driverContainerVolumeMount.size === 1)
+    assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+    assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+  }
+
+  private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = {
+    val credentialsFile = new File(credentialsTempDirectory, credentialsFileName)
+    Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
+    credentialsFile
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
new file mode 100644
index 0000000..c299d56
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.Service
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Clock
+
+class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val SHORT_RESOURCE_NAME_PREFIX =
+    "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH -
+      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length)
+
+  private val LONG_RESOURCE_NAME_PREFIX =
+    "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH -
+      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length + 1)
+  private val DRIVER_LABELS = Map(
+    "label1key" -> "label1value",
+    "label2key" -> "label2value")
+
+  @Mock
+  private var clock: Clock = _
+
+  private var sparkConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    sparkConf = new SparkConf(false)
+  }
+
+  test("Headless service has a port for the driver RPC and the block manager.") {
+    sparkConf = sparkConf
+      .set("spark.driver.port", "9000")
+      .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf,
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        SHORT_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty))
+    assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
+    assert(configurationStep.getAdditionalKubernetesResources().size === 1)
+    assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
+    val driverService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    verifyService(
+      9000,
+      8080,
+      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      driverService)
+  }
+
+  test("Hostname and ports are set according to the service name.") {
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf
+          .set("spark.driver.port", "9000")
+          .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
+          .set(KUBERNETES_NAMESPACE, "my-namespace"),
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        SHORT_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty))
+    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
+      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
+    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
+    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
+    verifySparkConfHostNames(additionalProps, expectedHostName)
+  }
+
+  test("Ports should resolve to defaults in SparkConf and in the service.") {
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf,
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        SHORT_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty))
+    val resolvedService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    verifyService(
+      DEFAULT_DRIVER_PORT,
+      DEFAULT_BLOCKMANAGER_PORT,
+      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      resolvedService)
+    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
+    assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString)
+    assert(additionalProps(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key)
+      === DEFAULT_BLOCKMANAGER_PORT.toString)
+  }
+
+  test("Long prefixes should switch to using a generated name.") {
+    when(clock.getTimeMillis()).thenReturn(10000)
+    val configurationStep = new DriverServiceFeatureStep(
+      KubernetesConf(
+        sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
+        KubernetesDriverSpecificConf(
+          None, "main", "app", Seq.empty),
+        LONG_RESOURCE_NAME_PREFIX,
+        "app-id",
+        DRIVER_LABELS,
+        Map.empty,
+        Map.empty,
+        Map.empty),
+      clock)
+    val driverService = configurationStep
+      .getAdditionalKubernetesResources()
+      .head
+      .asInstanceOf[Service]
+    val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
+    assert(driverService.getMetadata.getName === expectedServiceName)
+    val expectedHostName = s"$expectedServiceName.my-namespace.svc"
+    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
+    verifySparkConfHostNames(additionalProps, expectedHostName)
+  }
+
+  test("Disallow bind address and driver host to be set explicitly.") {
+    try {
+      new DriverServiceFeatureStep(
+        KubernetesConf(
+          sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
+          KubernetesDriverSpecificConf(
+            None, "main", "app", Seq.empty),
+          LONG_RESOURCE_NAME_PREFIX,
+          "app-id",
+          DRIVER_LABELS,
+          Map.empty,
+          Map.empty,
+          Map.empty),
+        clock)
+      fail("The driver bind address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" +
+          " not supported in Kubernetes mode, as the driver's bind address is managed" +
+          " and set to the driver pod's IP address.")
+    }
+    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
+    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
+    try {
+      new DriverServiceFeatureStep(
+        KubernetesConf(
+          sparkConf,
+          KubernetesDriverSpecificConf(
+            None, "main", "app", Seq.empty),
+          LONG_RESOURCE_NAME_PREFIX,
+          "app-id",
+          DRIVER_LABELS,
+          Map.empty,
+          Map.empty,
+          Map.empty),
+        clock)
+      fail("The driver host address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} is" +
+          " not supported in Kubernetes mode, as the driver's hostname will be managed via" +
+          " a Kubernetes service.")
+    }
+  }
+
+  private def verifyService(
+      driverPort: Int,
+      blockManagerPort: Int,
+      expectedServiceName: String,
+      service: Service): Unit = {
+    assert(service.getMetadata.getName === expectedServiceName)
+    assert(service.getSpec.getClusterIP === "None")
+    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
+    assert(service.getSpec.getPorts.size() === 2)
+    val driverServicePorts = service.getSpec.getPorts.asScala
+    assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
+    assert(driverServicePorts.head.getPort.intValue() === driverPort)
+    assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
+    assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
+    assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
+    assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
+  }
+
+  private def verifySparkConfHostNames(
+      driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
+    assert(driverSparkConf(
+      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
new file mode 100644
index 0000000..27bff74
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{HasMetadata, PodBuilder, SecretBuilder}
+import org.mockito.Matchers
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+object KubernetesFeaturesTestUtils {
+
+  def getMockConfigStepForStepType[T <: KubernetesFeatureConfigStep](
+    stepType: String, stepClass: Class[T]): T = {
+    val mockStep = mock(stepClass)
+    when(mockStep.getAdditionalKubernetesResources()).thenReturn(
+      getSecretsForStepType(stepType))
+
+    when(mockStep.getAdditionalPodSystemProperties())
+      .thenReturn(Map(stepType -> stepType))
+    when(mockStep.configurePod(Matchers.any(classOf[SparkPod])))
+      .thenAnswer(new Answer[SparkPod]() {
+        override def answer(invocation: InvocationOnMock): SparkPod = {
+          val originalPod = invocation.getArgumentAt(0, classOf[SparkPod])
+          val configuredPod = new PodBuilder(originalPod.pod)
+            .editOrNewMetadata()
+            .addToLabels(stepType, stepType)
+            .endMetadata()
+            .build()
+          SparkPod(configuredPod, originalPod.container)
+        }
+      })
+    mockStep
+  }
+
+  def getSecretsForStepType[T <: KubernetesFeatureConfigStep](stepType: String)
+    : Seq[HasMetadata] = {
+    Seq(new SecretBuilder()
+      .withNewMetadata()
+      .withName(stepType)
+      .endMetadata()
+      .build())
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message