From commits-return-34388-archive-asf-public=cust-asf.ponee.io@spark.apache.org Tue Oct 30 21:52:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1A040180652 for ; Tue, 30 Oct 2018 21:52:49 +0100 (CET) Received: (qmail 97249 invoked by uid 500); 30 Oct 2018 20:52:49 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 97238 invoked by uid 99); 30 Oct 2018 20:52:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Oct 2018 20:52:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0E611DFE14; Tue, 30 Oct 2018 20:52:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mcheah@apache.org To: commits@spark.apache.org Message-Id: <9b8c95a1f7c14ab4a54fe7242f1f4bff@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-24434][K8S] pod template files Date: Tue, 30 Oct 2018 20:52:49 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master c36537fcf -> f6cc354d8 [SPARK-24434][K8S] pod template files ## What changes were proposed in this pull request? New feature to pass podspec files for driver and executor pods. ## How was this patch tested? new unit and integration tests - [x] more overwrites in integration tests - [ ] invalid template integration test, documentation Author: Onur Satici Author: Yifei Huang Author: onursatici Closes #22146 from onursatici/pod-template. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6cc354d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6cc354d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6cc354d Branch: refs/heads/master Commit: f6cc354d83c2c9a757f9b507aadd4dbdc5825cca Parents: c36537f Author: Onur Satici Authored: Tue Oct 30 13:52:44 2018 -0700 Committer: mcheah Committed: Tue Oct 30 13:52:44 2018 -0700 ---------------------------------------------------------------------- docs/running-on-kubernetes.md | 180 +++++++++++++++++++ .../org/apache/spark/deploy/k8s/Config.scala | 24 +++ .../org/apache/spark/deploy/k8s/Constants.scala | 10 +- .../spark/deploy/k8s/KubernetesDriverSpec.scala | 7 - .../spark/deploy/k8s/KubernetesUtils.scala | 49 ++++- .../k8s/features/BasicDriverFeatureStep.scala | 8 +- .../k8s/features/BasicExecutorFeatureStep.scala | 10 +- .../k8s/features/PodTemplateConfigMapStep.scala | 72 ++++++++ .../submit/KubernetesClientApplication.scala | 6 +- .../k8s/submit/KubernetesDriverBuilder.scala | 40 ++++- .../k8s/ExecutorPodsLifecycleManager.scala | 1 - .../cluster/k8s/KubernetesClusterManager.scala | 14 +- .../cluster/k8s/KubernetesExecutorBuilder.scala | 38 +++- .../spark/deploy/k8s/KubernetesUtilsSuite.scala | 68 +++++++ .../features/BasicDriverFeatureStepSuite.scala | 2 +- .../PodTemplateConfigMapStepSuite.scala | 97 ++++++++++ .../submit/KubernetesDriverBuilderSuite.scala | 116 +++++++++++- .../k8s/submit/PodBuilderSuiteUtils.scala | 142 +++++++++++++++ .../k8s/ExecutorPodsLifecycleManagerSuite.scala | 4 - .../k8s/KubernetesExecutorBuilderSuite.scala | 41 ++++- .../src/test/resources/driver-template.yml | 26 +++ .../src/test/resources/executor-template.yml | 25 +++ .../k8s/integrationtest/KubernetesSuite.scala | 8 +- .../k8s/integrationtest/PodTemplateSuite.scala | 55 ++++++ 24 files changed, 991 insertions(+), 52 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/docs/running-on-kubernetes.md ---------------------------------------------------------------------- diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 7093ee5..2917197 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -186,6 +186,22 @@ To use a secret through an environment variable use the following options to the --conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key ``` +## Pod Template +Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates). +Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support. +To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile` +to point to local files accessible to the `spark-submit` process. To allow the driver pod access the executor pod template +file, the file will be automatically mounted onto a volume in the driver pod when it's created. +Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation. + +It is important to note that Spark is opinionated about certain pod configurations so there are values in the +pod template that will always be overwritten by Spark. Therefore, users of this feature should note that specifying +the pod template file only lets Spark start with a template pod instead of an empty pod during the pod-building process. +For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by spark. + +Pod template files can also define multiple containers. 
In such cases, Spark will always assume that the first container in +the list will be the driver or executor container. + ## Using Kubernetes Volumes Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods: @@ -863,4 +879,168 @@ specific to Spark on Kubernetes. to provide any kerberos credentials for launching a job. + + spark.kubernetes.driver.podTemplateFile + (none) + + Specify the local file that contains the driver [pod template](#pod-template). For example + spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml` + + + + spark.kubernetes.executor.podTemplateFile + (none) + + Specify the local file that contains the executor [pod template](#pod-template). For example + spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml` + + + + +#### Pod template properties + +See the below table for the full list of pod specifications that will be overwritten by spark. + +### Pod Metadata + + + + + + + + + + + + + + + + + + + + + + + +
Pod metadata keyModified valueDescription
nameValue of spark.kubernetes.driver.pod.name + The driver pod name will be overwritten with either the configured or default value of + spark.kubernetes.driver.pod.name. The executor pod names will be unaffected. +
namespaceValue of spark.kubernetes.namespace + Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will + be replaced by either the configured or default spark conf value. +
labelsAdds the labels from spark.kubernetes.{driver,executor}.label.* + Spark will add additional labels specified by the spark configuration. +
annotationsAdds the annotations from spark.kubernetes.{driver,executor}.annotation.* + Spark will add additional annotations specified by the spark configuration. +
+ +### Pod Spec + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Pod spec keyModified valueDescription
imagePullSecretsAdds image pull secrets from spark.kubernetes.container.image.pullSecrets + Additional pull secrets will be added from the spark configuration to both driver and executor pods. +
nodeSelectorAdds node selectors from spark.kubernetes.node.selector.* + Additional node selectors will be added from the spark configuration to both driver and executor pods. +
restartPolicy"Never" + Spark assumes that both drivers and executors never restart. +
serviceAccountValue of spark.kubernetes.authenticate.driver.serviceAccountName + Spark will override serviceAccount with the value of the spark configuration for only + driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected. +
serviceAccountNameValue of spark.kubernetes.authenticate.driver.serviceAccountName + Spark will override serviceAccountName with the value of the spark configuration for only + driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected. +
volumesAdds volumes from spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path + Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing + spark conf and pod template files. +
+ +### Container spec + +The following affect the driver and executor containers. All other containers in the pod spec will be unaffected. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Container spec keyModified valueDescription
envAdds env variables from spark.kubernetes.driverEnv.[EnvironmentVariableName] + Spark will add driver env variables from spark.kubernetes.driverEnv.[EnvironmentVariableName], and + executor env variables from spark.executorEnv.[EnvironmentVariableName]. +
imageValue of spark.kubernetes.{driver,executor}.container.image + The image will be defined by the spark configurations. +
imagePullPolicyValue of spark.kubernetes.container.image.pullPolicy + Spark will override the pull policy for both driver and executors. +
nameSee description. + The container name will be assigned by spark ("spark-kubernetes-driver" for the driver container, and + "spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container is defined by the + template, the template's name will be used. +
resourcesSee description + The cpu limits are set by spark.kubernetes.{driver,executor}.limit.cores. The cpu request is set by + spark.{driver,executor}.cores. The memory request and limit are set by summing the values of + spark.{driver,executor}.memory and spark.{driver,executor}.memoryOverhead. + +
volumeMountsAdds volume mounts from spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.{path,readOnly} + Spark will add volume mounts as specified by the spark conf, as well as additional volume mounts necessary for passing + spark conf and pod template files. +
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index fff8fa4..862f1d6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -278,6 +278,30 @@ private[spark] object Config extends Logging { .booleanConf .createWithDefault(false) + val KUBERNETES_DRIVER_PODTEMPLATE_FILE = + ConfigBuilder("spark.kubernetes.driver.podTemplateFile") + .doc("File containing a template pod spec for the driver") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE = + ConfigBuilder("spark.kubernetes.executor.podTemplateFile") + .doc("File containing a template pod spec for executors") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME = + ConfigBuilder("spark.kubernetes.driver.podTemplateContainerName") + .doc("container name to be used as a basis for the driver in the given pod template") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME = + ConfigBuilder("spark.kubernetes.executor.podTemplateContainerName") + .doc("container name to be used as a basis for executors in the given pod template") + .stringConf + .createOptional + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = "spark.kubernetes.authenticate.submission" http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala ---------------------------------------------------------------------- diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 172a905..1c6d53c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -76,9 +76,17 @@ private[spark] object Constants { val ENV_R_PRIMARY = "R_PRIMARY" val ENV_R_ARGS = "R_APP_ARGS" + // Pod spec templates + val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml" + val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template" + val POD_TEMPLATE_VOLUME = "pod-template-volume" + val POD_TEMPLATE_CONFIGMAP = "podspec-configmap" + val POD_TEMPLATE_KEY = "podspec-configmap-key" + // Miscellaneous val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" - val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" + val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" + val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor" val MEMORY_OVERHEAD_MIN_MIB = 384L // Hadoop Configuration http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala index 0c5ae02..fce8c6a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala @@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec( pod: SparkPod, driverKubernetesResources: 
Seq[HasMetadata], systemProperties: Map[String, String]) - -private[spark] object KubernetesDriverSpec { - def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec( - SparkPod.initialPod(), - Seq.empty, - initialProps) -} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 0f74045..6fafac3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,14 +16,18 @@ */ package org.apache.spark.deploy.k8s +import java.io.File + import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod} +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] object KubernetesUtils { +private[spark] object KubernetesUtils extends Logging { /** * Extract and parse Spark configuration properties with a given name prefix and @@ -82,6 +86,47 @@ private[spark] object KubernetesUtils { } } + def loadPodFromTemplate( + kubernetesClient: KubernetesClient, + templateFile: File, + containerName: Option[String]): SparkPod = { + try { + val pod = 
kubernetesClient.pods().load(templateFile).get() + selectSparkContainer(pod, containerName) + } catch { + case e: Exception => + logError( + s"Encountered exception while attempting to load initial pod spec from file", e) + throw new SparkException("Could not load pod from template file.", e) + } + } + + def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = { + def selectNamedContainer( + containers: List[Container], name: String): Option[(Container, List[Container])] = + containers.partition(_.getName == name) match { + case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest)) + case _ => + logWarning( + s"specified container ${name} not found on pod template, " + + s"falling back to taking the first container") + Option.empty + } + val containers = pod.getSpec.getContainers.asScala.toList + containerName + .flatMap(selectNamedContainer(containers, _)) + .orElse(containers.headOption.map((_, containers.tail))) + .map { + case (sparkContainer: Container, rest: List[Container]) => SparkPod( + new PodBuilder(pod) + .editSpec() + .withContainers(rest.asJava) + .endSpec() + .build(), + sparkContainer) + }.getOrElse(SparkPod(pod, new ContainerBuilder().build())) + } + def parseMasterUrl(url: String): String = url.substring("k8s://".length) def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = { http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 575bc54..96b14a0 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep( ) val driverUIPort = SparkUI.getUIPort(conf.sparkConf) val driverContainer = new ContainerBuilder(pod.container) - .withName(DRIVER_CONTAINER_NAME) + .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME)) .withImage(driverContainerImage) .withImagePullPolicy(conf.imagePullPolicy()) .addNewPort() @@ -105,7 +105,7 @@ private[spark] class BasicDriverFeatureStep( .withNewFieldRef("v1", "status.podIP") .build()) .endEnv() - .withNewResources() + .editOrNewResources() .addToRequests("cpu", driverCpuQuantity) .addToLimits(maybeCpuLimitQuantity.toMap.asJava) .addToRequests("memory", driverMemoryQuantity) @@ -119,9 +119,9 @@ private[spark] class BasicDriverFeatureStep( .addToLabels(conf.roleLabels.asJava) .addToAnnotations(conf.roleAnnotations.asJava) .endMetadata() - .withNewSpec() + .editOrNewSpec() .withRestartPolicy("Never") - .withNodeSelector(conf.nodeSelector().asJava) + .addToNodeSelector(conf.nodeSelector().asJava) .addToImagePullSecrets(conf.imagePullSecrets(): _*) .endSpec() .build() http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d89995b..1dab2a8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -139,10 +139,10 @@ private[spark] class BasicExecutorFeatureStep( } val executorContainer = new ContainerBuilder(pod.container) - .withName("executor") + .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)) .withImage(executorContainerImage) .withImagePullPolicy(kubernetesConf.imagePullPolicy()) - .withNewResources() + .editOrNewResources() .addToRequests("memory", executorMemoryQuantity) .addToLimits("memory", executorMemoryQuantity) .addToRequests("cpu", executorCpuQuantity) @@ -173,14 +173,14 @@ private[spark] class BasicExecutorFeatureStep( val executorPod = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) - .withLabels(kubernetesConf.roleLabels.asJava) - .withAnnotations(kubernetesConf.roleAnnotations.asJava) + .addToLabels(kubernetesConf.roleLabels.asJava) + .addToAnnotations(kubernetesConf.roleAnnotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() .withHostname(hostname) .withRestartPolicy("Never") - .withNodeSelector(kubernetesConf.nodeSelector().asJava) + .addToNodeSelector(kubernetesConf.nodeSelector().asJava) .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) .endSpec() .build() http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala new file mode 100644 index 0000000..96a8013 --- /dev/null +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ + +private[spark] class PodTemplateConfigMapStep( + conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + def configurePod(pod: SparkPod): SparkPod = { + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(POD_TEMPLATE_VOLUME) + .withNewConfigMap() + .withName(POD_TEMPLATE_CONFIGMAP) + .addNewItem() + .withKey(POD_TEMPLATE_KEY) + .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME) + .endItem() + .endConfigMap() + .endVolume() + .endSpec() + .build() + + val containerWithVolume = new ContainerBuilder(pod.container) + 
.addNewVolumeMount() + .withName(POD_TEMPLATE_VOLUME) + .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH) + .endVolumeMount() + .build() + SparkPod(podWithVolume, containerWithVolume) + } + + def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( + KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> + (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) + + def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) + val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get + val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) + Seq(new ConfigMapBuilder() + .withNewMetadata() + .withName(POD_TEMPLATE_CONFIGMAP) + .endMetadata() + .addToData(POD_TEMPLATE_KEY, podTemplateString) + .build()) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index c658756..4b58f8b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,7 +17,8 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter -import java.util.{Collections, Locale, UUID} +import java.util.{Collections, Locale, Properties, UUID} +import java.util.{Collections, UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ @@ -227,7 +228,6 @@ private[spark] 
class KubernetesClientApplication extends SparkApplication { clientArguments.driverArgs, clientArguments.maybePyFiles, clientArguments.hadoopConfigDir) - val builder = new KubernetesDriverBuilder val namespace = kubernetesConf.namespace() // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. @@ -244,7 +244,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { None, None)) { kubernetesClient => val client = new Client( - builder, + KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf), kubernetesConf, kubernetesClient, waitForAppCompletion, http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index b0b5332..5565cd7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -16,7 +16,12 @@ */ package org.apache.spark.deploy.k8s.submit -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} +import java.io.File + +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.features._ import 
org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} @@ -55,7 +60,11 @@ private[spark] class KubernetesDriverBuilder( provideHadoopGlobalStep: ( KubernetesConf[KubernetesDriverSpecificConf] => KerberosConfDriverFeatureStep) = - new KerberosConfDriverFeatureStep(_)) { + new KerberosConfDriverFeatureStep(_), + providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] + => PodTemplateConfigMapStep) = + new PodTemplateConfigMapStep(_), + provideInitialPod: () => SparkPod = SparkPod.initialPod) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { @@ -74,6 +83,10 @@ private[spark] class KubernetesDriverBuilder( val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil + val podTemplateFeature = if ( + kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { + Seq(providePodTemplateConfigMapStep(kubernetesConf)) + } else Nil val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { case JavaMainAppResource(_) => @@ -86,14 +99,17 @@ private[spark] class KubernetesDriverBuilder( val maybeHadoopConfigStep = kubernetesConf.hadoopConfSpec.map { _ => - provideHadoopGlobalStep(kubernetesConf)} + provideHadoopGlobalStep(kubernetesConf)} val allFeatures: Seq[KubernetesFeatureConfigStep] = (baseFeatures :+ bindingsStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature ++ - maybeHadoopConfigStep.toSeq + maybeHadoopConfigStep.toSeq ++ podTemplateFeature - var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) + var spec = KubernetesDriverSpec( + provideInitialPod(), + driverKubernetesResources = Seq.empty, + kubernetesConf.sparkConf.getAll.toMap) for (feature <- allFeatures) { val configuredPod = feature.configurePod(spec.pod) val addedSystemProperties = feature.getAdditionalPodSystemProperties() @@ -106,3 +122,17 @@ 
private[spark] class KubernetesDriverBuilder( spec } } + +private[spark] object KubernetesDriverBuilder { + def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = { + conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE) + .map(new File(_)) + .map(file => new KubernetesDriverBuilder(provideInitialPod = () => + KubernetesUtils.loadPodFromTemplate( + kubernetesClient, + file, + conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME)) + )) + .getOrElse(new KubernetesDriverBuilder()) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index 1a75ae0..77a1d6c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -31,7 +31,6 @@ import org.apache.spark.util.Utils private[spark] class ExecutorPodsLifecycleManager( conf: SparkConf, - executorBuilder: KubernetesExecutorBuilder, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, // Use a best-effort to track which executors have been removed already. 
It's not generally http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 9999c62..ce10f76 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -69,6 +69,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit defaultServiceAccountToken, defaultServiceAccountCaCrt) + if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { + KubernetesUtils.loadPodFromTemplate( + kubernetesClient, + new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get), + sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)) + } + val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-executor-requests") @@ -81,13 +88,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit .build[java.lang.Long, java.lang.Long]() val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager( sc.conf, - new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, removedExecutorsCache) val executorPodsAllocator = new ExecutorPodsAllocator( - sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock()) + sc.conf, + KubernetesExecutorBuilder(kubernetesClient, sc.conf), + kubernetesClient, + snapshotsStore, + new SystemClock()) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, 
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 6199a8a..089f84d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -16,7 +16,12 @@ */ package org.apache.spark.scheduler.cluster.k8s -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import java.io.File + +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ @@ -35,19 +40,20 @@ private[spark] class KubernetesExecutorBuilder( new LocalDirsFeatureStep(_), provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = - new MountVolumesFeatureStep(_), + new MountVolumesFeatureStep(_), provideHadoopConfStep: ( KubernetesConf[KubernetesExecutorSpecificConf] - => HadoopConfExecutorFeatureStep) = - new HadoopConfExecutorFeatureStep(_), + => HadoopConfExecutorFeatureStep) = + new HadoopConfExecutorFeatureStep(_), provideKerberosConfStep: ( KubernetesConf[KubernetesExecutorSpecificConf] - => KerberosConfExecutorFeatureStep) = - new KerberosConfExecutorFeatureStep(_), + => KerberosConfExecutorFeatureStep) = + new KerberosConfExecutorFeatureStep(_), provideHadoopSparkUserStep: ( 
KubernetesConf[KubernetesExecutorSpecificConf] - => HadoopSparkUserExecutorFeatureStep) = - new HadoopSparkUserExecutorFeatureStep(_)) { + => HadoopSparkUserExecutorFeatureStep) = + new HadoopSparkUserExecutorFeatureStep(_), + provideInitialPod: () => SparkPod = SparkPod.initialPod) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = { @@ -85,10 +91,24 @@ private[spark] class KubernetesExecutorBuilder( volumesFeature ++ maybeHadoopConfFeatureSteps - var executorPod = SparkPod.initialPod() + var executorPod = provideInitialPod() for (feature <- allFeatures) { executorPod = feature.configurePod(executorPod) } executorPod } } + +private[spark] object KubernetesExecutorBuilder { + def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = { + conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + .map(new File(_)) + .map(file => new KubernetesExecutorBuilder(provideInitialPod = () => + KubernetesUtils.loadPodFromTemplate( + kubernetesClient, + file, + conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)) + )) + .getOrElse(new KubernetesExecutorBuilder()) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala new file mode 100644 index 0000000..7c23158 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, PodBuilder} + +import org.apache.spark.SparkFunSuite + +class KubernetesUtilsSuite extends SparkFunSuite { + private val HOST = "test-host" + private val POD = new PodBuilder() + .withNewSpec() + .withHostname(HOST) + .withContainers( + new ContainerBuilder().withName("first").build(), + new ContainerBuilder().withName("second").build()) + .endSpec() + .build() + + test("Selects the given container as spark container.") { + val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("second")) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("first")) + assert(sparkPod.container.getName == "second") + } + + test("Selects the first container if no container name is given.") { + val sparkPod = KubernetesUtils.selectSparkContainer(POD, Option.empty) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second")) + assert(sparkPod.container.getName == "first") + } + + test("Falls back to the first container if given container name does not exist.") { + val sparkPod = 
KubernetesUtils.selectSparkContainer(POD, Some("does-not-exist")) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second")) + assert(sparkPod.container.getName == "first") + } + + test("constructs spark pod correctly with pod template with no containers") { + val noContainersPod = new PodBuilder(POD).editSpec().withContainers().endSpec().build() + val sparkPod = KubernetesUtils.selectSparkContainer(noContainersPod, Some("does-not-exist")) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.container.getName == null) + val sparkPodWithNoContainerName = + KubernetesUtils.selectSparkContainer(noContainersPod, Option.empty) + assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST) + assert(sparkPodWithNoContainerName.container.getName == null) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index eebdd15..5c6bcc7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -84,7 +84,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val basePod = SparkPod.initialPod() val configuredPod = featureStep.configurePod(basePod) - assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME) + assert(configuredPod.container.getName === DEFAULT_DRIVER_CONTAINER_NAME) 
assert(configuredPod.container.getImage === "spark-driver:latest") assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY) http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala new file mode 100644 index 0000000..d7bbbd1 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import java.io.{File, PrintWriter} +import java.nio.file.Files + +import io.fabric8.kubernetes.api.model.ConfigMap +import org.mockito.Mockito +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ + +class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { + private var sparkConf: SparkConf = _ + private var kubernetesConf : KubernetesConf[_ <: KubernetesRoleSpecificConf] = _ + private var templateFile: File = _ + + before { + sparkConf = Mockito.mock(classOf[SparkConf]) + kubernetesConf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + None, + "app-name", + "main", + Seq.empty), + "resource", + "app-id", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + templateFile = Files.createTempFile("pod-template", "yml").toFile + templateFile.deleteOnExit() + Mockito.doReturn(Option(templateFile.getAbsolutePath)).when(sparkConf) + .get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + } + + test("Mounts executor template volume if config specified") { + val writer = new PrintWriter(templateFile) + writer.write("pod-template-contents") + writer.close() + + val step = new PodTemplateConfigMapStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val volume = configuredPod.pod.getSpec.getVolumes.get(0) + assert(volume.getName === Constants.POD_TEMPLATE_VOLUME) + assert(volume.getConfigMap.getName === Constants.POD_TEMPLATE_CONFIGMAP) + assert(volume.getConfigMap.getItems.size() === 1) + assert(volume.getConfigMap.getItems.get(0).getKey === Constants.POD_TEMPLATE_KEY) + assert(volume.getConfigMap.getItems.get(0).getPath === + Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME) + + assert(configuredPod.container.getVolumeMounts.size() === 1) + val volumeMount = 
configuredPod.container.getVolumeMounts.get(0) + assert(volumeMount.getMountPath === Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH) + assert(volumeMount.getName === Constants.POD_TEMPLATE_VOLUME) + + val resources = step.getAdditionalKubernetesResources() + assert(resources.size === 1) + assert(resources.head.getMetadata.getName === Constants.POD_TEMPLATE_CONFIGMAP) + assert(resources.head.isInstanceOf[ConfigMap]) + val configMap = resources.head.asInstanceOf[ConfigMap] + assert(configMap.getData.size() === 1) + assert(configMap.getData.containsKey(Constants.POD_TEMPLATE_KEY)) + assert(configMap.getData.containsValue("pod-template-contents")) + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties.size === 1) + assert(systemProperties.contains(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key)) + assert(systemProperties.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key).get === + (Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + + Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 051d7b6..84968c3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -16,8 +16,13 @@ */ package org.apache.spark.deploy.k8s.submit -import org.apache.spark.{SparkConf, SparkFunSuite} +import io.fabric8.kubernetes.api.model.PodBuilder 
+import io.fabric8.kubernetes.client.KubernetesClient +import org.mockito.Mockito._ + +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE} import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} @@ -34,6 +39,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" + private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep]) @@ -68,6 +74,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) + private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep] + ) + private val builderUnderTest: KubernetesDriverBuilder = new KubernetesDriverBuilder( _ => basicFeatureStep, @@ -80,7 +90,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => pythonStep, _ => rStep, _ => javaStep, - _ => hadoopGlobalStep) + _ => hadoopGlobalStep, + _ => templateVolumeStep) test("Apply fundamental steps all the time.") { val conf = KubernetesConf( @@ -252,6 +263,37 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { R_STEP_TYPE) } + test("Apply template volume step if executor template is present.") { + val sparkConf = spy(new SparkConf(false)) + doReturn(Option("filename")).when(sparkConf) + .get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) 
+ val conf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + Some(JavaMainAppResource("example.jar")), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + TEMPLATE_VOLUME_STEP_TYPE) + } + test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") { val conf = KubernetesConf( new SparkConf(false), @@ -314,7 +356,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { HADOOP_GLOBAL_STEP_TYPE) } - private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) : Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) @@ -325,4 +366,73 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { assert(resolvedSpec.systemProperties(stepType) === stepType) } } + + test("Start with empty pod if template is not specified") { + val kubernetesClient = mock(classOf[KubernetesClient]) + val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf()) + verify(kubernetesClient, never()).pods() + } + + test("Starts with template if specified") { + val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient() + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") + val kubernetesConf = new KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + Some(JavaMainAppResource("example.jar")), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + val driverSpec = KubernetesDriverBuilder + .apply(kubernetesClient, sparkConf) + .buildFromFeatures(kubernetesConf) + 
PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod) + } + + test("Throws on misconfigured pod template") { + val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient( + new PodBuilder() + .withNewMetadata() + .addToLabels("test-label-key", "test-label-value") + .endMetadata() + .build()) + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") + val kubernetesConf = new KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + Some(JavaMainAppResource("example.jar")), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + val exception = intercept[SparkException] { + KubernetesDriverBuilder + .apply(kubernetesClient, sparkConf) + .buildFromFeatures(kubernetesConf) + } + assert(exception.getMessage.contains("Could not load pod from template file.")) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala new file mode 100644 index 0000000..c92e9e6 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.io.File + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, when} +import org.scalatest.FlatSpec +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.SparkPod + +object PodBuilderSuiteUtils extends FlatSpec { + + def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = { + val kubernetesClient = mock(classOf[KubernetesClient]) + val pods = + mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]]) + val podResource = mock(classOf[PodResource[Pod, DoneablePod]]) + when(kubernetesClient.pods()).thenReturn(pods) + when(pods.load(any(classOf[File]))).thenReturn(podResource) + when(podResource.get()).thenReturn(pod) + kubernetesClient + } + + def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = { + val metadata = pod.pod.getMetadata + assert(metadata.getLabels.containsKey("test-label-key")) + assert(metadata.getAnnotations.containsKey("test-annotation-key")) + assert(metadata.getNamespace === "namespace") + assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference")) + val spec = pod.pod.getSpec + 
assert(!spec.getContainers.asScala.exists(_.getName == "executor-container")) + assert(spec.getDnsPolicy === "dns-policy") + assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname"))) + assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference")) + assert(spec.getInitContainers.asScala.exists(_.getName == "init-container")) + assert(spec.getNodeName == "node-name") + assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value") + assert(spec.getSchedulerName === "scheduler") + assert(spec.getSecurityContext.getRunAsUser === 1000L) + assert(spec.getServiceAccount === "service-account") + assert(spec.getSubdomain === "subdomain") + assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key")) + assert(spec.getVolumes.asScala.exists(_.getName == "test-volume")) + val container = pod.container + assert(container.getName === "executor-container") + assert(container.getArgs.contains("arg")) + assert(container.getCommand.equals(List("command").asJava)) + assert(container.getEnv.asScala.exists(_.getName == "env-key")) + assert(container.getResources.getLimits.get("gpu") === + new QuantityBuilder().withAmount("1").build()) + assert(container.getSecurityContext.getRunAsNonRoot) + assert(container.getStdin) + assert(container.getTerminationMessagePath === "termination-message-path") + assert(container.getTerminationMessagePolicy === "termination-message-policy") + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume")) + + } + + + def podWithSupportedFeatures(): Pod = new PodBuilder() + .withNewMetadata() + .addToLabels("test-label-key", "test-label-value") + .addToAnnotations("test-annotation-key", "test-annotation-value") + .withNamespace("namespace") + .addNewOwnerReference() + .withController(true) + .withName("owner-reference") + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withDnsPolicy("dns-policy") + .withHostAliases(new 
HostAliasBuilder().withHostnames("hostname").build()) + .withImagePullSecrets( + new LocalObjectReferenceBuilder().withName("local-reference").build()) + .withInitContainers(new ContainerBuilder().withName("init-container").build()) + .withNodeName("node-name") + .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava) + .withSchedulerName("scheduler") + .withNewSecurityContext() + .withRunAsUser(1000L) + .endSecurityContext() + .withServiceAccount("service-account") + .withSubdomain("subdomain") + .withTolerations(new TolerationBuilder() + .withKey("toleration-key") + .withOperator("Equal") + .withEffect("NoSchedule") + .build()) + .addNewVolume() + .withNewHostPath() + .withPath("/test") + .endHostPath() + .withName("test-volume") + .endVolume() + .addNewContainer() + .withArgs("arg") + .withCommand("command") + .addNewEnv() + .withName("env-key") + .withValue("env-value") + .endEnv() + .withImagePullPolicy("Always") + .withName("executor-container") + .withNewResources() + .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava) + .endResources() + .withNewSecurityContext() + .withRunAsNonRoot(true) + .endSecurityContext() + .withStdin(true) + .withTerminationMessagePath("termination-message-path") + .withTerminationMessagePolicy("termination-message-policy") + .addToVolumeMounts( + new VolumeMountBuilder() + .withName("test-volume") + .withMountPath("/test") + .build()) + .endContainer() + .endSpec() + .build() + +} http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala index d840938..3995b2a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala @@ -46,9 +46,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte private var podOperations: PODS = _ @Mock - private var executorBuilder: KubernetesExecutorBuilder = _ - - @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ @@ -64,7 +61,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer()) eventHandlerUnderTest = new ExecutorPodsLifecycleManager( new SparkConf(), - executorBuilder, kubernetesClient, snapshotsStore, removedExecutorsCache) http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index b572dac..fb2509f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -16,12 +16,15 @@ */ package org.apache.spark.scheduler.cluster.k8s 
-import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.api.model.{Config => _, _} +import io.fabric8.kubernetes.client.KubernetesClient +import org.mockito.Mockito.{mock, never, verify} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ +import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val BASIC_STEP_TYPE = "basic" @@ -193,4 +196,40 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType) } } + + test("Starts with empty executor pod if template is not specified") { + val kubernetesClient = mock(classOf[KubernetesClient]) + val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf()) + verify(kubernetesClient, never()).pods() + } + + test("Starts with executor template if specified") { + val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient() + val sparkConf = new SparkConf(false) + .set("spark.driver.host", "https://driver.host.com") + .set(Config.CONTAINER_IMAGE, "spark-executor:latest") + .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml") + val kubernetesConf = KubernetesConf( + sparkConf, + KubernetesExecutorSpecificConf( + "executor-id", Some(new PodBuilder() + .withNewMetadata() + .withName("driver") + .endMetadata() + .build())), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + val sparkPod = KubernetesExecutorBuilder + .apply(kubernetesClient, sparkConf) + .buildFromFeatures(kubernetesConf) + PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod) + } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml new file mode 100644 index 0000000..0c185be --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +Kind: Pod +metadata: + labels: + template-label-key: driver-template-label-value +spec: + containers: + - name: test-driver-container + image: will-be-overwritten + http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml new file mode 100644 index 0000000..0282e23 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +Kind: Pod +metadata: + labels: + template-label-key: executor-template-label-value +spec: + containers: + - name: test-executor-container + image: will-be-overwritten http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index c99a907..e2e5880 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite + with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -288,21 +288,21 @@ private[spark] class KubernetesSuite extends SparkFunSuite protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === image) - assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount === baseMemory) } protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { 
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage) - assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount === standardNonJVMMemory) } protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === rImage) - assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount === standardNonJVMMemory) } http://git-wip-us.apache.org/repos/asf/spark/blob/f6cc354d/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala new file mode 100644 index 0000000..e5a847e --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.File + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag + +private[spark] trait PodTemplateSuite { k8sSuite: KubernetesSuite => + + import PodTemplateSuite._ + + test("Start pod creation from template", k8sTestTag) { + sparkAppConf + .set("spark.kubernetes.driver.podTemplateFile", DRIVER_TEMPLATE_FILE.getAbsolutePath) + .set("spark.kubernetes.executor.podTemplateFile", EXECUTOR_TEMPLATE_FILE.getAbsolutePath) + runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getSpec.getContainers.get(0).getImage === image) + assert(driverPod.getSpec.getContainers.get(0).getName === "test-driver-container") + assert(driverPod.getMetadata.getLabels.containsKey(LABEL_KEY)) + assert(driverPod.getMetadata.getLabels.get(LABEL_KEY) === "driver-template-label-value") + }, + executorPodChecker = (executorPod: Pod) => { + assert(executorPod.getSpec.getContainers.get(0).getImage === image) + assert(executorPod.getSpec.getContainers.get(0).getName === "test-executor-container") + assert(executorPod.getMetadata.getLabels.containsKey(LABEL_KEY)) + assert(executorPod.getMetadata.getLabels.get(LABEL_KEY) === "executor-template-label-value") + } + ) + } +} + +private[spark] object PodTemplateSuite { + val LABEL_KEY = "template-label-key" + val DRIVER_TEMPLATE_FILE = new File(getClass.getResource("/driver-template.yml").getFile) + val 
EXECUTOR_TEMPLATE_FILE = new File(getClass.getResource("/executor-template.yml").getFile) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org