From: rxin@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-18232][MESOS] Support CNI
Date: Tue, 15 Nov 2016 07:46:58 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 86430cc4e -> d89bfc923


[SPARK-18232][MESOS] Support CNI

## What changes were proposed in this pull request?

Adds support for CNI-isolated containers

## How was this patch tested?

I launched SparkPi both with and without `spark.mesos.network.name`, and verified the job completed successfully.

Author: Michael Gummelt

Closes #15740 from mgummelt/spark-342-cni.
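For illustration, a minimal SparkPi-style submission exercising the new option might look like the following (a sketch; the master URL and network name are placeholders, not part of this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Attach executor containers -- and, in cluster mode, the driver --
    // to the CNI network named below.
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk1:2181/mesos")  // placeholder master URL
      .setAppName("SparkPi")
      .set("spark.mesos.network.name", "my-cni-network")
    val sc = new SparkContext(conf)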
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d89bfc92
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d89bfc92
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d89bfc92

Branch: refs/heads/master
Commit: d89bfc92302424406847ac7a9cfca714e6b742fc
Parents: 86430cc
Author: Michael Gummelt
Authored: Mon Nov 14 23:46:54 2016 -0800
Committer: Reynold Xin
Committed: Mon Nov 14 23:46:54 2016 -0800

----------------------------------------------------------------------
 docs/running-on-mesos.md                        |  27 ++--
 .../cluster/mesos/MesosClusterScheduler.scala   |   8 +-
 .../MesosCoarseGrainedSchedulerBackend.scala    |  23 ++--
 .../MesosFineGrainedSchedulerBackend.scala      |   9 +-
 .../mesos/MesosSchedulerBackendUtil.scala       | 120 +++++++++----------
 .../mesos/MesosClusterSchedulerSuite.scala      |  26 ++++
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  19 ++-
 7 files changed, 131 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 923d8db..8d5ad12 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -368,17 +368,6 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
-  <td><code>spark.mesos.executor.docker.portmaps</code></td>
-  <td>(none)</td>
-  <td>
-    Set the list of incoming ports exposed by the Docker image, which was set using
-    <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
-    mappings which take the form:
-
-    <pre>host_port:container_port[:tcp|:udp]</pre>
-  </td>
-</tr>
-<tr>
   <td><code>spark.mesos.executor.home</code></td>
   <td>driver side <code>SPARK_HOME</code></td>
   <td>
@@ -505,12 +494,26 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the maximum number GPU resources to acquire for this job. Note that executors will still launch
     when no GPU resources are found since this configuration is just a upper limit and not a guaranteed amount.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.network.name</code></td>
+  <td>(none)</td>
+  <td>
+    Attach containers to the given named network. If this job is
+    launched in cluster mode, also launch the driver in the given named
+    network. See
+    the Mesos CNI docs
+    for more details.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.fetcherCache.enable</code></td>
   <td><code>false</code></td>
   <td>
-    If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/)
+    If set to `true`, all URIs (example: `spark.executor.uri`,
+    `spark.mesos.uris`) will be cached by the Mesos
+    Fetcher Cache
   </td>
 </tr>


http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8db1d12..f384290 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -531,13 +531,7 @@ private[spark] class MesosClusterScheduler(
       .setCommand(buildDriverCommand(desc))
       .addAllResources(cpuResourcesToUse.asJava)
       .addAllResources(memResourcesToUse.asJava)
-
-    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
-        desc.conf,
-        taskInfo.getContainerBuilder)
-    }
-
+    taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
     taskInfo.build
   }
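The driver-side change above funnels all container construction through the new MesosSchedulerBackendUtil.containerInfo (diffed further below). For a CNI job, the protobuf wiring it performs reduces to roughly this sketch (illustrative only; "my-cni-network" is a placeholder):

    import org.apache.mesos.Protos.{ContainerInfo, NetworkInfo}

    // A MESOS-type container joined to a named network; the Mesos agent
    // passes the name to its configured CNI plugins at launch.
    val container = ContainerInfo.newBuilder()
      .setType(ContainerInfo.Type.MESOS)
      .addNetworkInfos(NetworkInfo.newBuilder().setName("my-cni-network"))
      .build()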
http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 842c05e..3258b09 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -213,7 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .format(prefixEnv, runScript) +
         s" --driver-url $driverURL" +
         s" --executor-id $taskId" +
-        s" --hostname ${offer.getHostname}" +
+        s" --hostname ${executorHostname(offer)}" +
         s" --cores $numCores" +
         s" --app-id $appId")
     } else {
@@ -225,7 +225,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
         s" --driver-url $driverURL" +
         s" --executor-id $taskId" +
-        s" --hostname ${offer.getHostname}" +
+        s" --hostname ${executorHostname(offer)}" +
         s" --cores $numCores" +
         s" --app-id $appId")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
@@ -418,16 +418,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
             .setName("Task " + taskId)
-
           taskBuilder.addAllResources(resourcesToUse.asJava)
-
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-              image,
-              sc.conf,
-              taskBuilder.getContainerBuilder
-            )
-          }
+          taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

           tasks(offer.getId) ::= taskBuilder.build()
           remainingResources(offerId) = resourcesLeft.asJava
@@ -658,6 +650,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private def numExecutors(): Int = {
     slaves.values.map(_.taskIDs.size).sum
   }
+
+  private def executorHostname(offer: Offer): String = {
+    if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+      // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
+      "0.0.0.0"
+    } else {
+      offer.getHostname
+    }
+  }
 }

 private class Slave(val hostname: String) {


http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index c1aa001..779ffb5 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -155,14 +155,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))

-    sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-        image,
-        sc.conf,
-        executorInfo.getContainerBuilder()
-      )
-    }
-
+    executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
     (executorInfo.build(), resourcesAfterMem.asJava)
   }
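Both the coarse- and fine-grained backends now delegate to containerInfo, and the coarse-grained backend additionally rewrites the executor's --hostname via the executorHostname helper above. That decision can be summarized in this standalone sketch (names here are illustrative, not from the patch):

    import org.apache.spark.SparkConf

    // Mirrors executorHostname above: inside a CNI container the agent's
    // IP is not visible, so the executor binds to all interfaces.
    def bindAddress(conf: SparkConf, agentHostname: String): String =
      if (conf.getOption("spark.mesos.network.name").isDefined) "0.0.0.0"
      else agentHostname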
http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 3fe0674..a2adb22 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,8 +17,8 @@

 package org.apache.spark.scheduler.cluster.mesos

-import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
-import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
+import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging

@@ -99,67 +99,67 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .toList
   }

-  /**
-   * Construct a DockerInfo structure and insert it into a ContainerInfo
-   */
-  def addDockerInfo(
-      container: ContainerInfo.Builder,
-      image: String,
-      containerizer: String,
-      forcePullImage: Boolean = false,
-      volumes: Option[List[Volume]] = None,
-      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
-
-    containerizer match {
-      case "docker" =>
-        container.setType(ContainerInfo.Type.DOCKER)
-        val docker = ContainerInfo.DockerInfo.newBuilder()
-          .setImage(image)
-          .setForcePullImage(forcePullImage)
-        // TODO (mgummelt): Remove this. Portmaps have no effect,
-        // as we don't support bridge networking.
-        portmaps.foreach(_.foreach(docker.addPortMappings))
-        container.setDocker(docker)
-      case "mesos" =>
-        container.setType(ContainerInfo.Type.MESOS)
-        val imageProto = Image.newBuilder()
-          .setType(Image.Type.DOCKER)
-          .setDocker(Image.Docker.newBuilder().setName(image))
-          .setCached(!forcePullImage)
-        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
-      case _ =>
-        throw new SparkException(
-          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+  def containerInfo(conf: SparkConf): ContainerInfo = {
+    val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
+      conf.get("spark.mesos.containerizer", "docker") == "docker") {
+      ContainerInfo.Type.DOCKER
+    } else {
+      ContainerInfo.Type.MESOS
     }

-    volumes.foreach(_.foreach(container.addVolumes))
+    val containerInfo = ContainerInfo.newBuilder()
+      .setType(containerType)
+
+    conf.getOption("spark.mesos.executor.docker.image").map { image =>
+      val forcePullImage = conf
+        .getOption("spark.mesos.executor.docker.forcePullImage")
+        .exists(_.equals("true"))
+
+      val portMaps = conf
+        .getOption("spark.mesos.executor.docker.portmaps")
+        .map(parsePortMappingsSpec)
+        .getOrElse(List.empty)
+
+      if (containerType == ContainerInfo.Type.DOCKER) {
+        containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
+      } else {
+        containerInfo.setMesos(mesosInfo(image, forcePullImage))
+      }
+
+      val volumes = conf
+        .getOption("spark.mesos.executor.docker.volumes")
+        .map(parseVolumesSpec)
+
+      volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
+    }
+
+    conf.getOption("spark.mesos.network.name").map { name =>
+      val info = NetworkInfo.newBuilder().setName(name).build()
+      containerInfo.addNetworkInfos(info)
+    }
+
+    containerInfo.build()
   }

-  /**
-   * Setup a docker containerizer from MesosDriverDescription scheduler properties
-   */
-  def setupContainerBuilderDockerInfo(
-      imageName: String,
-      conf: SparkConf,
-      builder: ContainerInfo.Builder): Unit = {
-    val forcePullImage = conf
-      .getOption("spark.mesos.executor.docker.forcePullImage")
-      .exists(_.equals("true"))
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
-      .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
-      .map(parsePortMappingsSpec)
-
-    val containerizer = conf.get("spark.mesos.containerizer", "docker")
-    addDockerInfo(
-      builder,
-      imageName,
-      containerizer,
-      forcePullImage = forcePullImage,
-      volumes = volumes,
-      portmaps = portmaps)
-    logDebug("setupContainerDockerInfo: using docker image: " + imageName)
+  private def dockerInfo(
+      image: String,
+      forcePullImage: Boolean,
+      portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
+    val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
+      .setImage(image)
+      .setForcePullImage(forcePullImage)
+    portMaps.foreach(dockerBuilder.addPortMappings(_))
+
+    dockerBuilder.build
+  }
+
+  private def mesosInfo(image: String, forcePullImage: Boolean): MesosInfo = {
+    val imageProto = Image.newBuilder()
+      .setType(Image.Type.DOCKER)
+      .setDocker(Image.Docker.newBuilder().setName(image))
+      .setCached(!forcePullImage)
+    ContainerInfo.MesosInfo.newBuilder()
+      .setImage(imageProto)
+      .build
   }
 }
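Note how containerInfo picks the container type: DOCKER only when a Docker image is configured and spark.mesos.containerizer is "docker" (the default); any other combination yields a MESOS container. A summary sketch of that decision table (a hypothetical helper, for illustration only):

    import org.apache.spark.SparkConf
    import org.apache.mesos.Protos.ContainerInfo

    def typeFor(image: Option[String], containerizer: String): ContainerInfo.Type = {
      val conf = new SparkConf(loadDefaults = false)
        .set("spark.mesos.containerizer", containerizer)
      image.foreach(conf.set("spark.mesos.executor.docker.image", _))
      if (conf.contains("spark.mesos.executor.docker.image") &&
          conf.get("spark.mesos.containerizer", "docker") == "docker") {
        ContainerInfo.Type.DOCKER
      } else {
        ContainerInfo.Type.MESOS
      }
    }

    // typeFor(Some("img"), "docker") => DOCKER
    // typeFor(Some("img"), "mesos")  => MESOS  (Docker image, Mesos containerizer)
    // typeFor(None, "docker")        => MESOS  (no image: plain Mesos container)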
http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 87d9080..74e5ce2 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -210,4 +210,30 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       (v.getName, v.getValue)).toMap
     assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
   }
+
+  test("supports spark.mesos.network.name") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test",
+          "spark.mesos.network.name" -> "test-network-name"),
+        "s1",
+        new Date()))
+
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
+    assert(networkInfos.size == 1)
+    assert(networkInfos.get(0).getName == "test-network-name")
+  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638f..a674da4 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -388,9 +388,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

     val dockerInfo = containerInfo.getDocker

-    assert(dockerInfo.getImage == "some_image")
-    assert(dockerInfo.getForcePullImage)
-
     val portMappings = dockerInfo.getPortMappingsList.asScala
     assert(portMappings.size == 1)

@@ -491,6 +488,22 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!uris.asScala.head.getCache)
   }

+  test("mesos supports spark.mesos.network.name") {
+    setBackend(Map(
+      "spark.mesos.network.name" -> "test-network-name"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
+    assert(networkInfos.size == 1)
+    assert(networkInfos.get(0).getName == "test-network-name")
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

   private def verifyDeclinedOffer(driver: SchedulerDriver,
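Putting the pieces together, the behavior these tests pin down can be exercised directly. A sketch, assuming it runs inside the org.apache.spark.scheduler.cluster.mesos package (MesosSchedulerBackendUtil is private[mesos]); the image and network names are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.mesos.Protos.ContainerInfo

    val conf = new SparkConf(loadDefaults = false)
      .set("spark.mesos.executor.docker.image", "someorg/spark:latest")
      .set("spark.mesos.network.name", "test-network-name")

    val info = MesosSchedulerBackendUtil.containerInfo(conf)
    assert(info.getType == ContainerInfo.Type.DOCKER)  // containerizer defaults to "docker"
    assert(info.getNetworkInfos(0).getName == "test-network-name")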