spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject [02/18] spark git commit: [SPARK-18662] Move resource managers to separate directory
Date Wed, 07 Dec 2016 00:23:39 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
deleted file mode 100644
index 7deaf0a..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.{File, FileInputStream, FileOutputStream}
-import java.net.URI
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.reflect.ClassTag
-import scala.util.Try
-
-import org.apache.commons.lang3.SerializationUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.YarnClientApplication
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.Records
-import org.mockito.Matchers.{eq => meq, _}
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, Matchers}
-
-import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
-
-class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
-  with ResetSystemProperties {
-
-  import Client._
-
-  var oldSystemProperties: Properties = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    oldSystemProperties = SerializationUtils.clone(System.getProperties)
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      System.setProperties(oldSystemProperties)
-      oldSystemProperties = null
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("default Yarn application classpath") {
-    getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
-  }
-
-  test("default MR application classpath") {
-    getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
-  }
-
-  test("resultant classpath for an application that defines a classpath for YARN") {
-    withAppConf(Fixtures.mapYARNAppConf) { conf =>
-      val env = newEnv
-      populateHadoopClasspath(conf, env)
-      classpath(env) should be(
-        flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
-    }
-  }
-
-  test("resultant classpath for an application that defines a classpath for MR") {
-    withAppConf(Fixtures.mapMRAppConf) { conf =>
-      val env = newEnv
-      populateHadoopClasspath(conf, env)
-      classpath(env) should be(
-        flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
-    }
-  }
-
-  test("resultant classpath for an application that defines both classpaths, YARN and MR") {
-    withAppConf(Fixtures.mapAppConf) { conf =>
-      val env = newEnv
-      populateHadoopClasspath(conf, env)
-      classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
-    }
-  }
-
-  private val SPARK = "local:/sparkJar"
-  private val USER = "local:/userJar"
-  private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
-
-  private val PWD =
-    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-      "{{PWD}}"
-    } else if (Utils.isWindows) {
-      "%PWD%"
-    } else {
-      Environment.PWD.$()
-    }
-
-  test("Local jar URIs") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(SPARK_JARS, Seq(SPARK))
-      .set(USER_CLASS_PATH_FIRST, true)
-      .set("spark.yarn.dist.jars", ADDED)
-    val env = new MutableHashMap[String, String]()
-    val args = new ClientArguments(Array("--jar", USER))
-
-    populateClasspath(args, conf, sparkConf, env)
-
-    val cp = env("CLASSPATH").split(":|;|<CPS>")
-    s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
-      val uri = new URI(entry)
-      if (LOCAL_SCHEME.equals(uri.getScheme())) {
-        cp should contain (uri.getPath())
-      } else {
-        cp should not contain (uri.getPath())
-      }
-    })
-    cp should contain(PWD)
-    cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}")
-    cp should not contain (APP_JAR)
-  }
-
-  test("Jar path propagation through SparkConf") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(SPARK_JARS, Seq(SPARK))
-      .set("spark.yarn.dist.jars", ADDED)
-    val client = createClient(sparkConf, args = Array("--jar", USER))
-    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort(), anyBoolean(), any())
-
-    val tempDir = Utils.createTempDir()
-    try {
-      // Because we mocked "copyFileToRemote" above to avoid having to create fake local files,
-      // we need to create a fake config archive in the temp dir to avoid having
-      // prepareLocalResources throw an exception.
-      new FileOutputStream(new File(tempDir, LOCALIZED_CONF_ARCHIVE)).close()
-
-      client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
-      sparkConf.get(APP_JAR) should be (Some(USER))
-
-      // The non-local path should be propagated by name only, since it will end up in the app's
-      // staging dir.
-      val expected = ADDED.split(",")
-        .map(p => {
-          val uri = new URI(p)
-          if (LOCAL_SCHEME == uri.getScheme()) {
-            p
-          } else {
-            Option(uri.getFragment()).getOrElse(new File(p).getName())
-          }
-        })
-        .mkString(",")
-
-      sparkConf.get(SECONDARY_JARS) should be (Some(expected.split(",").toSeq))
-    } finally {
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-
-  test("Cluster path translation") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(SPARK_JARS, Seq("local:/localPath/spark.jar"))
-      .set(GATEWAY_ROOT_PATH, "/localPath")
-      .set(REPLACEMENT_ROOT_PATH, "/remotePath")
-
-    getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
-    getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
-      "/remotePath/1:/remotePath/2")
-
-    val env = new MutableHashMap[String, String]()
-    populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
-    val cp = classpath(env)
-    cp should contain ("/remotePath/spark.jar")
-    cp should contain ("/remotePath/my1.jar")
-  }
-
-  test("configuration and args propagate through createApplicationSubmissionContext") {
-    val conf = new Configuration()
-    // When parsing tags, duplicates and leading/trailing whitespace should be removed.
-    // Spaces between non-comma strings should be preserved as single tags. Empty strings may or
-    // may not be removed depending on the version of Hadoop being used.
-    val sparkConf = new SparkConf()
-      .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
-      .set(MAX_APP_ATTEMPTS, 42)
-      .set("spark.app.name", "foo-test-app")
-      .set(QUEUE_NAME, "staging-queue")
-    val args = new ClientArguments(Array())
-
-    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
-    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
-
-    val client = new Client(args, conf, sparkConf)
-    client.createApplicationSubmissionContext(
-      new YarnClientApplication(getNewApplicationResponse, appContext),
-      containerLaunchContext)
-
-    appContext.getApplicationName should be ("foo-test-app")
-    appContext.getQueue should be ("staging-queue")
-    appContext.getAMContainerSpec should be (containerLaunchContext)
-    appContext.getApplicationType should be ("SPARK")
-    appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
-      val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
-      tags should contain allOf ("tag1", "dup", "tag2", "multi word")
-      tags.asScala.count(_.nonEmpty) should be (4)
-    }
-    appContext.getMaxAppAttempts should be (42)
-  }
-
-  test("spark.yarn.jars with multiple paths and globs") {
-    val libs = Utils.createTempDir()
-    val single = Utils.createTempDir()
-    val jar1 = TestUtils.createJarWithFiles(Map(), libs)
-    val jar2 = TestUtils.createJarWithFiles(Map(), libs)
-    val jar3 = TestUtils.createJarWithFiles(Map(), single)
-    val jar4 = TestUtils.createJarWithFiles(Map(), single)
-
-    val jarsConf = Seq(
-      s"${libs.getAbsolutePath()}/*",
-      jar3.getPath(),
-      s"local:${jar4.getPath()}",
-      s"local:${single.getAbsolutePath()}/*")
-
-    val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf)
-    val client = createClient(sparkConf)
-
-    val tempDir = Utils.createTempDir()
-    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
-
-    assert(sparkConf.get(SPARK_JARS) ===
-      Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
-
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
-      anyBoolean(), any())
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
-      anyBoolean(), any())
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
-      anyBoolean(), any())
-
-    val cp = classpath(client)
-    cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
-    cp should not contain (jar3.getPath())
-    cp should contain (jar4.getPath())
-    cp should contain (buildPath(single.getAbsolutePath(), "*"))
-  }
-
-  test("distribute jars archive") {
-    val temp = Utils.createTempDir()
-    val archive = TestUtils.createJarWithFiles(Map(), temp)
-
-    val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
-    val client = createClient(sparkConf)
-    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
-
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
-      anyBoolean(), any())
-    classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
-
-    sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
-    intercept[IllegalArgumentException] {
-      client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
-    }
-  }
-
-  test("distribute archive multiple times") {
-    val libs = Utils.createTempDir()
-    // Create jars dir and RELEASE file to avoid IllegalStateException.
-    val jarsDir = new File(libs, "jars")
-    assert(jarsDir.mkdir())
-    new FileOutputStream(new File(libs, "RELEASE")).close()
-
-    val userLib1 = Utils.createTempDir()
-    val testJar = TestUtils.createJarWithFiles(Map(), userLib1)
-
-    // Case 1:  FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files
-    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
-      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
-      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))
-
-    val client = createClient(sparkConf)
-    val tempDir = Utils.createTempDir()
-    intercept[IllegalArgumentException] {
-      client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
-    }
-
-    // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files.
-    val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
-      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
-
-    val clientFiles = createClient(sparkConfFiles)
-    val tempDirForFiles = Utils.createTempDir()
-    intercept[IllegalArgumentException] {
-      clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil)
-    }
-
-    // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
-    val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
-      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
-
-    val clientArchives = createClient(sparkConfArchives)
-    val tempDirForArchives = Utils.createTempDir()
-    intercept[IllegalArgumentException] {
-      clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil)
-    }
-
-    // Case 4: FILES_TO_DISTRIBUTE can have unique file.
-    val sparkConfFilesUniq = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
-      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
-
-    val clientFilesUniq = createClient(sparkConfFilesUniq)
-    val tempDirForFilesUniq = Utils.createTempDir()
-    clientFilesUniq.prepareLocalResources(new Path(tempDirForFilesUniq.getAbsolutePath()), Nil)
-
-    // Case 5: ARCHIVES_TO_DISTRIBUTE can have unique file.
-    val sparkConfArchivesUniq = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
-      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))
-
-    val clientArchivesUniq = createClient(sparkConfArchivesUniq)
-    val tempDirArchivesUniq = Utils.createTempDir()
-    clientArchivesUniq.prepareLocalResources(new Path(tempDirArchivesUniq.getAbsolutePath()), Nil)
-
-  }
-
-  test("distribute local spark jars") {
-    val temp = Utils.createTempDir()
-    val jarsDir = new File(temp, "jars")
-    assert(jarsDir.mkdir())
-    val jar = TestUtils.createJarWithFiles(Map(), jarsDir)
-    new FileOutputStream(new File(temp, "RELEASE")).close()
-
-    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
-    val client = createClient(sparkConf)
-    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
-    classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
-  }
-
-  test("ignore same name jars") {
-    val libs = Utils.createTempDir()
-    val jarsDir = new File(libs, "jars")
-    assert(jarsDir.mkdir())
-    new FileOutputStream(new File(libs, "RELEASE")).close()
-    val userLib1 = Utils.createTempDir()
-    val userLib2 = Utils.createTempDir()
-
-    val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
-    val jar2 = TestUtils.createJarWithFiles(Map(), userLib1)
-    // Copy jar2 to jar3 with same name
-    val jar3 = {
-      val target = new File(userLib2, new File(jar2.toURI).getName)
-      val input = new FileInputStream(jar2.getPath)
-      val output = new FileOutputStream(target)
-      Utils.copyStream(input, output, closeStreams = true)
-      target.toURI.toURL
-    }
-
-    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
-      .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath))
-
-    val client = createClient(sparkConf)
-    val tempDir = Utils.createTempDir()
-    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
-
-    // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be
-    // ignored.
-    sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
-  }
-
-  object Fixtures {
-
-    val knownDefYarnAppCP: Seq[String] =
-      getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
-                                                "DEFAULT_YARN_APPLICATION_CLASSPATH",
-                                                Seq[String]())(a => a.toSeq)
-
-
-    val knownDefMRAppCP: Seq[String] =
-      getFieldValue2[String, Array[String], Seq[String]](
-        classOf[MRJobConfig],
-        "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
-        Seq[String]())(a => a.split(","))(a => a.toSeq)
-
-    val knownYARNAppCP = Some(Seq("/known/yarn/path"))
-
-    val knownMRAppCP = Some(Seq("/known/mr/path"))
-
-    val mapMRAppConf =
-      Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
-
-    val mapYARNAppConf =
-      Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
-
-    val mapAppConf = mapYARNAppConf ++ mapMRAppConf
-  }
-
-  def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
-    val conf = new Configuration
-    m.foreach { case (k, v) => conf.set(k, v, "ClientSpec") }
-    testCode(conf)
-  }
-
-  def newEnv: MutableHashMap[String, String] = MutableHashMap[String, String]()
-
-  def classpath(env: MutableHashMap[String, String]): Array[String] =
-    env(Environment.CLASSPATH.name).split(":|;|<CPS>")
-
-  def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] =
-    (a ++ b).flatten.toArray
-
-  def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = {
-    Try(clazz.getField(field))
-      .map(_.get(null).asInstanceOf[A])
-      .toOption
-      .map(mapTo)
-      .getOrElse(defaults)
-  }
-
-  def getFieldValue2[A: ClassTag, A1: ClassTag, B](
-        clazz: Class[_],
-        field: String,
-        defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = {
-    Try(clazz.getField(field)).map(_.get(null)).map {
-      case v: A => mapTo(v)
-      case v1: A1 => mapTo1(v1)
-      case _ => defaults
-    }.toOption.getOrElse(defaults)
-  }
-
-  private def createClient(
-      sparkConf: SparkConf,
-      conf: Configuration = new Configuration(),
-      args: Array[String] = Array()): Client = {
-    val clientArgs = new ClientArguments(args)
-    spy(new Client(clientArgs, conf, sparkConf))
-  }
-
-  private def classpath(client: Client): Array[String] = {
-    val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
-    classpath(env)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
deleted file mode 100644
index afb4b69..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
-import org.apache.spark.SparkFunSuite
-
-class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-
-  private val yarnAllocatorSuite = new YarnAllocatorSuite
-  import yarnAllocatorSuite._
-
-  def createContainerRequest(nodes: Array[String]): ContainerRequest =
-    new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-
-  override def beforeEach() {
-    yarnAllocatorSuite.beforeEach()
-  }
-
-  override def afterEach() {
-    yarnAllocatorSuite.afterEach()
-  }
-
-  test("allocate locality preferred containers with enough resource and no matched existed " +
-    "containers") {
-    // 1. All the locations of current containers cannot satisfy the new requirements
-    // 2. Current requested container number can fully satisfy the pending tasks.
-
-    val handler = createAllocator(2)
-    handler.updateResourceRequests()
-    handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
-
-    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
-
-    assert(localities.map(_.nodes) === Array(
-      Array("host3", "host4", "host5"),
-      Array("host3", "host4", "host5"),
-      Array("host3", "host4")))
-  }
-
-  test("allocate locality preferred containers with enough resource and partially matched " +
-    "containers") {
-    // 1. Parts of current containers' locations can satisfy the new requirements
-    // 2. Current requested container number can fully satisfy the pending tasks.
-
-    val handler = createAllocator(3)
-    handler.updateResourceRequests()
-    handler.handleAllocatedContainers(Array(
-      createContainer("host1"),
-      createContainer("host1"),
-      createContainer("host2")
-    ))
-
-    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
-
-    assert(localities.map(_.nodes) ===
-      Array(null, Array("host2", "host3"), Array("host2", "host3")))
-  }
-
-  test("allocate locality preferred containers with limited resource and partially matched " +
-    "containers") {
-    // 1. Parts of current containers' locations can satisfy the new requirements
-    // 2. Current requested container number cannot fully satisfy the pending tasks.
-
-    val handler = createAllocator(3)
-    handler.updateResourceRequests()
-    handler.handleAllocatedContainers(Array(
-      createContainer("host1"),
-      createContainer("host1"),
-      createContainer("host2")
-    ))
-
-    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
-
-    assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
-  }
-
-  test("allocate locality preferred containers with fully matched containers") {
-    // Current containers' locations can fully satisfy the new requirements
-
-    val handler = createAllocator(5)
-    handler.updateResourceRequests()
-    handler.handleAllocatedContainers(Array(
-      createContainer("host1"),
-      createContainer("host1"),
-      createContainer("host2"),
-      createContainer("host2"),
-      createContainer("host3")
-    ))
-
-    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, Seq.empty)
-
-    assert(localities.map(_.nodes) === Array(null, null, null))
-  }
-
-  test("allocate containers with no locality preference") {
-    // Request new container without locality preference
-
-    val handler = createAllocator(2)
-    handler.updateResourceRequests()
-    handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
-
-    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
-
-    assert(localities.map(_.nodes) === Array(null))
-  }
-
-  test("allocate locality preferred containers by considering the localities of pending requests") {
-    val handler = createAllocator(3)
-    handler.updateResourceRequests()
-    handler.handleAllocatedContainers(Array(
-      createContainer("host1"),
-      createContainer("host1"),
-      createContainer("host2")
-    ))
-
-    val pendingAllocationRequests = Seq(
-      createContainerRequest(Array("host2", "host3")),
-      createContainerRequest(Array("host1", "host4")))
-
-    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
-      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
-        handler.allocatedHostToContainersMap, pendingAllocationRequests)
-
-    assert(localities.map(_.nodes) === Array(Array("host3")))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
deleted file mode 100644
index 994dc75..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.{Arrays, List => JList}
-
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic
-import org.apache.hadoop.net.DNSToSwitchMapping
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.util.ManualClock
-
-class MockResolver extends DNSToSwitchMapping {
-
-  override def resolve(names: JList[String]): JList[String] = {
-    if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
-    else Arrays.asList("/rack1")
-  }
-
-  override def reloadCachedMappings() {}
-
-  def reloadCachedMappings(names: JList[String]) {}
-}
-
-class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-  val conf = new YarnConfiguration()
-  conf.setClass(
-    CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-    classOf[MockResolver], classOf[DNSToSwitchMapping])
-
-  val sparkConf = new SparkConf()
-  sparkConf.set("spark.driver.host", "localhost")
-  sparkConf.set("spark.driver.port", "4040")
-  sparkConf.set(SPARK_JARS, Seq("notarealjar.jar"))
-  sparkConf.set("spark.yarn.launchContainers", "false")
-
-  val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
-
-  // Resource returned by YARN.  YARN can give larger containers than requested, so give 6 cores
-  // instead of the 5 requested and 3 GB instead of the 2 requested.
-  val containerResource = Resource.newInstance(3072, 6)
-
-  var rmClient: AMRMClient[ContainerRequest] = _
-
-  var containerNum = 0
-
-  override def beforeEach() {
-    super.beforeEach()
-    rmClient = AMRMClient.createAMRMClient()
-    rmClient.init(conf)
-    rmClient.start()
-  }
-
-  override def afterEach() {
-    try {
-      rmClient.stop()
-    } finally {
-      super.afterEach()
-    }
-  }
-
-  class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
-    override def hashCode(): Int = 0
-    override def equals(other: Any): Boolean = false
-  }
-
-  def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
-    val args = Array(
-      "--jar", "somejar.jar",
-      "--class", "SomeClass")
-    val sparkConfClone = sparkConf.clone()
-    sparkConfClone
-      .set("spark.executor.instances", maxExecutors.toString)
-      .set("spark.executor.cores", "5")
-      .set("spark.executor.memory", "2048")
-    new YarnAllocator(
-      "not used",
-      mock(classOf[RpcEndpointRef]),
-      conf,
-      sparkConfClone,
-      rmClient,
-      appAttemptId,
-      new SecurityManager(sparkConf),
-      Map())
-  }
-
-  def createContainer(host: String): Container = {
-    // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
-    val containerId = ContainerId.newInstance(appAttemptId, containerNum)
-    containerNum += 1
-    val nodeId = NodeId.newInstance(host, 1000)
-    Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
-  }
-
-  test("single container allocated") {
-    // request a single container and receive it
-    val handler = createAllocator(1)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
-
-    val container = createContainer("host1")
-    handler.handleAllocatedContainers(Array(container))
-
-    handler.getNumExecutorsRunning should be (1)
-    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
-
-    val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
-    size should be (0)
-  }
-
-  test("container should not be created if requested number if met") {
-    // request a single container and receive it
-    val handler = createAllocator(1)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
-
-    val container = createContainer("host1")
-    handler.handleAllocatedContainers(Array(container))
-
-    handler.getNumExecutorsRunning should be (1)
-    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
-
-    val container2 = createContainer("host2")
-    handler.handleAllocatedContainers(Array(container2))
-    handler.getNumExecutorsRunning should be (1)
-  }
-
-  test("some containers allocated") {
-    // request a few containers and receive some of them
-    val handler = createAllocator(4)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
-
-    val container1 = createContainer("host1")
-    val container2 = createContainer("host1")
-    val container3 = createContainer("host2")
-    handler.handleAllocatedContainers(Array(container1, container2, container3))
-
-    handler.getNumExecutorsRunning should be (3)
-    handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
-    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
-    handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
-    handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
-  }
-
-  test("receive more containers than requested") {
-    val handler = createAllocator(2)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
-
-    val container1 = createContainer("host1")
-    val container2 = createContainer("host2")
-    val container3 = createContainer("host4")
-    handler.handleAllocatedContainers(Array(container1, container2, container3))
-
-    handler.getNumExecutorsRunning should be (2)
-    handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
-    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
-    handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
-    handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
-    handler.allocatedHostToContainersMap.contains("host4") should be (false)
-  }
-
-  test("decrease total requested executors") {
-    val handler = createAllocator(4)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
-
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
-    handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (3)
-
-    val container = createContainer("host1")
-    handler.handleAllocatedContainers(Array(container))
-
-    handler.getNumExecutorsRunning should be (1)
-    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
-
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
-    handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (1)
-  }
-
-  test("decrease total requested executors to less than currently running") {
-    val handler = createAllocator(4)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
-
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
-    handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (3)
-
-    val container1 = createContainer("host1")
-    val container2 = createContainer("host2")
-    handler.handleAllocatedContainers(Array(container1, container2))
-
-    handler.getNumExecutorsRunning should be (2)
-
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
-    handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (0)
-    handler.getNumExecutorsRunning should be (2)
-  }
-
-  test("kill executors") {
-    val handler = createAllocator(4)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
-
-    val container1 = createContainer("host1")
-    val container2 = createContainer("host2")
-    handler.handleAllocatedContainers(Array(container1, container2))
-
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
-    handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
-
-    val statuses = Seq(container1, container2).map { c =>
-      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
-    }
-    handler.updateResourceRequests()
-    handler.processCompletedContainers(statuses.toSeq)
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
-  }
-
-  test("lost executor removed from backend") {
-    val handler = createAllocator(4)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
-
-    val container1 = createContainer("host1")
-    val container2 = createContainer("host2")
-    handler.handleAllocatedContainers(Array(container1, container2))
-
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())
-
-    val statuses = Seq(container1, container2).map { c =>
-      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
-    }
-    handler.updateResourceRequests()
-    handler.processCompletedContainers(statuses.toSeq)
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
-    handler.getNumExecutorsFailed should be (2)
-    handler.getNumUnexpectedContainerRelease should be (2)
-  }
-
-  test("memory exceeded diagnostic regexes") {
-    val diagnostics =
-      "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
-        "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
-        "5.8 GB of 4.2 GB virtual memory used. Killing container."
-    val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
-    val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
-    assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
-    assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
-  }
-
-  test("window based failure executor counting") {
-    sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
-    val handler = createAllocator(4)
-    val clock = new ManualClock(0L)
-    handler.setClock(clock)
-
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
-
-    val containers = Seq(
-      createContainer("host1"),
-      createContainer("host2"),
-      createContainer("host3"),
-      createContainer("host4")
-    )
-    handler.handleAllocatedContainers(containers)
-
-    val failedStatuses = containers.map { c =>
-      ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
-    }
-
-    handler.getNumExecutorsFailed should be (0)
-
-    clock.advance(100 * 1000L)
-    handler.processCompletedContainers(failedStatuses.slice(0, 1))
-    handler.getNumExecutorsFailed should be (1)
-
-    clock.advance(101 * 1000L)
-    handler.getNumExecutorsFailed should be (0)
-
-    handler.processCompletedContainers(failedStatuses.slice(1, 3))
-    handler.getNumExecutorsFailed should be (2)
-
-    clock.advance(50 * 1000L)
-    handler.processCompletedContainers(failedStatuses.slice(3, 4))
-    handler.getNumExecutorsFailed should be (3)
-
-    clock.advance(51 * 1000L)
-    handler.getNumExecutorsFailed should be (1)
-
-    clock.advance(50 * 1000L)
-    handler.getNumExecutorsFailed should be (0)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
deleted file mode 100644
index 99fb58a..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ /dev/null
@@ -1,493 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.File
-import java.net.URL
-import java.nio.charset.StandardCharsets
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.scalatest.Matchers
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.launcher._
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
-  SparkListenerExecutorAdded}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.tags.ExtendedYarnTest
-import org.apache.spark.util.Utils
-
-/**
- * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
- * applications, and require the Spark assembly to be built before they can be successfully
- * run.
- */
-@ExtendedYarnTest
-class YarnClusterSuite extends BaseYarnClusterSuite {
-
-  override def newYarnConfig(): YarnConfiguration = new YarnConfiguration()
-
-  private val TEST_PYFILE = """
-    |import mod1, mod2
-    |import sys
-    |from operator import add
-    |
-    |from pyspark import SparkConf , SparkContext
-    |if __name__ == "__main__":
-    |    if len(sys.argv) != 2:
-    |        print >> sys.stderr, "Usage: test.py [result file]"
-    |        exit(-1)
-    |    sc = SparkContext(conf=SparkConf())
-    |    status = open(sys.argv[1],'w')
-    |    result = "failure"
-    |    rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
-    |    cnt = rdd.count()
-    |    if cnt == 10:
-    |        result = "success"
-    |    status.write(result)
-    |    status.close()
-    |    sc.stop()
-    """.stripMargin
-
-  private val TEST_PYMODULE = """
-    |def func():
-    |    return 42
-    """.stripMargin
-
-  test("run Spark in yarn-client mode") {
-    testBasicYarnApp(true)
-  }
-
-  test("run Spark in yarn-cluster mode") {
-    testBasicYarnApp(false)
-  }
-
-  test("run Spark in yarn-client mode with different configurations") {
-    testBasicYarnApp(true,
-      Map(
-        "spark.driver.memory" -> "512m",
-        "spark.executor.cores" -> "1",
-        "spark.executor.memory" -> "512m",
-        "spark.executor.instances" -> "2"
-      ))
-  }
-
-  test("run Spark in yarn-cluster mode with different configurations") {
-    testBasicYarnApp(false,
-      Map(
-        "spark.driver.memory" -> "512m",
-        "spark.driver.cores" -> "1",
-        "spark.executor.cores" -> "1",
-        "spark.executor.memory" -> "512m",
-        "spark.executor.instances" -> "2"
-      ))
-  }
-
-  test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") {
-    testYarnAppUseSparkHadoopUtilConf()
-  }
-
-  test("run Spark in yarn-client mode with additional jar") {
-    testWithAddJar(true)
-  }
-
-  test("run Spark in yarn-cluster mode with additional jar") {
-    testWithAddJar(false)
-  }
-
-  test("run Spark in yarn-cluster mode unsuccessfully") {
-    // Don't provide arguments so the driver will fail.
-    val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
-    finalState should be (SparkAppHandle.State.FAILED)
-  }
-
-  test("run Spark in yarn-cluster mode failure after sc initialized") {
-    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass))
-    finalState should be (SparkAppHandle.State.FAILED)
-  }
-
-  test("run Python application in yarn-client mode") {
-    testPySpark(true)
-  }
-
-  test("run Python application in yarn-cluster mode") {
-    testPySpark(false)
-  }
-
-  test("run Python application in yarn-cluster mode using " +
-    " spark.yarn.appMasterEnv to override local envvar") {
-    testPySpark(
-      clientMode = false,
-      extraConf = Map(
-        "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON"
-          -> sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python"),
-        "spark.yarn.appMasterEnv.PYSPARK_PYTHON"
-          -> sys.env.getOrElse("PYSPARK_PYTHON", "python")),
-      extraEnv = Map(
-        "PYSPARK_DRIVER_PYTHON" -> "not python",
-        "PYSPARK_PYTHON" -> "not python"))
-  }
-
-  test("user class path first in client mode") {
-    testUseClassPathFirst(true)
-  }
-
-  test("user class path first in cluster mode") {
-    testUseClassPathFirst(false)
-  }
-
-  test("monitor app using launcher library") {
-    val env = new JHashMap[String, String]()
-    env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath())
-
-    val propsFile = createConfFile()
-    val handle = new SparkLauncher(env)
-      .setSparkHome(sys.props("spark.test.home"))
-      .setConf("spark.ui.enabled", "false")
-      .setPropertiesFile(propsFile)
-      .setMaster("yarn")
-      .setDeployMode("client")
-      .setAppResource(SparkLauncher.NO_RESOURCE)
-      .setMainClass(mainClassName(YarnLauncherTestApp.getClass))
-      .startApplication()
-
-    try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
-        handle.getState() should be (SparkAppHandle.State.RUNNING)
-      }
-
-      handle.getAppId() should not be (null)
-      handle.getAppId() should startWith ("application_")
-      handle.stop()
-
-      eventually(timeout(30 seconds), interval(100 millis)) {
-        handle.getState() should be (SparkAppHandle.State.KILLED)
-      }
-    } finally {
-      handle.kill()
-    }
-  }
-
-  test("timeout to get SparkContext in cluster mode triggers failure") {
-    val timeout = 2000
-    val finalState = runSpark(false, mainClassName(SparkContextTimeoutApp.getClass),
-      appArgs = Seq((timeout * 4).toString),
-      extraConf = Map(AM_MAX_WAIT_TIME.key -> timeout.toString))
-    finalState should be (SparkAppHandle.State.FAILED)
-  }
-
-  private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
-    val result = File.createTempFile("result", null, tempDir)
-    val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath()),
-      extraConf = conf)
-    checkResult(finalState, result)
-  }
-
-  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
-    val result = File.createTempFile("result", null, tempDir)
-    val finalState = runSpark(false,
-      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
-      appArgs = Seq("key=value", result.getAbsolutePath()),
-      extraConf = Map("spark.hadoop.key" -> "value"))
-    checkResult(finalState, result)
-  }
-
-  private def testWithAddJar(clientMode: Boolean): Unit = {
-    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
-    val driverResult = File.createTempFile("driver", null, tempDir)
-    val executorResult = File.createTempFile("executor", null, tempDir)
-    val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
-      extraClassPath = Seq(originalJar.getPath()),
-      extraJars = Seq("local:" + originalJar.getPath()))
-    checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, "ORIGINAL")
-  }
-
-  private def testPySpark(
-      clientMode: Boolean,
-      extraConf: Map[String, String] = Map(),
-      extraEnv: Map[String, String] = Map()): Unit = {
-    val primaryPyFile = new File(tempDir, "test.py")
-    Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
-
-    // When running tests, let's not assume the user has built the assembly module, which also
-    // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
-    // needed locations.
-    val sparkHome = sys.props("spark.test.home")
-    val pythonPath = Seq(
-        s"$sparkHome/python/lib/py4j-0.10.4-src.zip",
-        s"$sparkHome/python")
-    val extraEnvVars = Map(
-      "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
-      "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv
-
-    val moduleDir =
-      if (clientMode) {
-        // In client-mode, .py files added with --py-files are not visible in the driver.
-        // This is something that the launcher library would have to handle.
-        tempDir
-      } else {
-        val subdir = new File(tempDir, "pyModules")
-        subdir.mkdir()
-        subdir
-      }
-    val pyModule = new File(moduleDir, "mod1.py")
-    Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8)
-
-    val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
-    val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
-    val result = File.createTempFile("result", null, tempDir)
-
-    val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
-      sparkArgs = Seq("--py-files" -> pyFiles),
-      appArgs = Seq(result.getAbsolutePath()),
-      extraEnv = extraEnvVars,
-      extraConf = extraConf)
-    checkResult(finalState, result)
-  }
-
-  private def testUseClassPathFirst(clientMode: Boolean): Unit = {
-    // Create a jar file that contains a different version of "test.resource".
-    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
-    val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
-    val driverResult = File.createTempFile("driver", null, tempDir)
-    val executorResult = File.createTempFile("executor", null, tempDir)
-    val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
-      extraClassPath = Seq(originalJar.getPath()),
-      extraJars = Seq("local:" + userJar.getPath()),
-      extraConf = Map(
-        "spark.driver.userClassPathFirst" -> "true",
-        "spark.executor.userClassPathFirst" -> "true"))
-    checkResult(finalState, driverResult, "OVERRIDDEN")
-    checkResult(finalState, executorResult, "OVERRIDDEN")
-  }
-
-}
-
-private[spark] class SaveExecutorInfo extends SparkListener {
-  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
-  var driverLogs: Option[collection.Map[String, String]] = None
-
-  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
-    addedExecutorInfos(executor.executorId) = executor.executorInfo
-  }
-
-  override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
-    driverLogs = appStart.driverLogs
-  }
-}
-
-private object YarnClusterDriverWithFailure extends Logging with Matchers {
-  def main(args: Array[String]): Unit = {
-    val sc = new SparkContext(new SparkConf()
-      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
-      .setAppName("yarn test with failure"))
-
-    throw new Exception("exception after sc initialized")
-  }
-}
-
-private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      // scalastyle:off println
-      System.err.println(
-        s"""
-        |Invalid command line: ${args.mkString(" ")}
-        |
-        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
-        """.stripMargin)
-      // scalastyle:on println
-      System.exit(1)
-    }
-
-    val sc = new SparkContext(new SparkConf()
-      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
-      .setAppName("yarn test using SparkHadoopUtil's conf"))
-
-    val kv = args(0).split("=")
-    val status = new File(args(1))
-    var result = "failure"
-    try {
-      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
-      result = "success"
-    } finally {
-      Files.write(result, status, StandardCharsets.UTF_8)
-      sc.stop()
-    }
-  }
-}
-
-private object YarnClusterDriver extends Logging with Matchers {
-
-  val WAIT_TIMEOUT_MILLIS = 10000
-
-  def main(args: Array[String]): Unit = {
-    if (args.length != 1) {
-      // scalastyle:off println
-      System.err.println(
-        s"""
-        |Invalid command line: ${args.mkString(" ")}
-        |
-        |Usage: YarnClusterDriver [result file]
-        """.stripMargin)
-      // scalastyle:on println
-      System.exit(1)
-    }
-
-    val sc = new SparkContext(new SparkConf()
-      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
-      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
-    val conf = sc.getConf
-    val status = new File(args(0))
-    var result = "failure"
-    try {
-      val data = sc.parallelize(1 to 4, 4).collect().toSet
-      sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-      data should be (Set(1, 2, 3, 4))
-      result = "success"
-
-      // Verify that the config archive is correctly placed in the classpath of all containers.
-      val confFile = "/" + Client.SPARK_CONF_FILE
-      assert(getClass().getResource(confFile) != null)
-      val configFromExecutors = sc.parallelize(1 to 4, 4)
-        .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
-        .collect()
-      assert(configFromExecutors.find(_ == null) === None)
-    } finally {
-      Files.write(result, status, StandardCharsets.UTF_8)
-      sc.stop()
-    }
-
-    // verify log urls are present
-    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
-    assert(listeners.size === 1)
-    val listener = listeners(0)
-    val executorInfos = listener.addedExecutorInfos.values
-    assert(executorInfos.nonEmpty)
-    executorInfos.foreach { info =>
-      assert(info.logUrlMap.nonEmpty)
-    }
-
-    // If we are running in yarn-cluster mode, verify that driver logs links and present and are
-    // in the expected format.
-    if (conf.get("spark.submit.deployMode") == "cluster") {
-      assert(listener.driverLogs.nonEmpty)
-      val driverLogs = listener.driverLogs.get
-      assert(driverLogs.size === 2)
-      assert(driverLogs.contains("stderr"))
-      assert(driverLogs.contains("stdout"))
-      val urlStr = driverLogs("stderr")
-      // Ensure that this is a valid URL, else this will throw an exception
-      new URL(urlStr)
-      val containerId = YarnSparkHadoopUtil.get.getContainerId
-      val user = Utils.getCurrentUserName()
-      assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
-    }
-  }
-
-}
-
-private object YarnClasspathTest extends Logging {
-  def error(m: String, ex: Throwable = null): Unit = {
-    logError(m, ex)
-    // scalastyle:off println
-    System.out.println(m)
-    if (ex != null) {
-      ex.printStackTrace(System.out)
-    }
-    // scalastyle:on println
-  }
-
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      error(
-        s"""
-        |Invalid command line: ${args.mkString(" ")}
-        |
-        |Usage: YarnClasspathTest [driver result file] [executor result file]
-        """.stripMargin)
-      // scalastyle:on println
-    }
-
-    readResource(args(0))
-    val sc = new SparkContext(new SparkConf())
-    try {
-      sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) }
-    } finally {
-      sc.stop()
-    }
-  }
-
-  private def readResource(resultPath: String): Unit = {
-    var result = "failure"
-    try {
-      val ccl = Thread.currentThread().getContextClassLoader()
-      val resource = ccl.getResourceAsStream("test.resource")
-      val bytes = ByteStreams.toByteArray(resource)
-      result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
-    } catch {
-      case t: Throwable =>
-        error(s"loading test.resource to $resultPath", t)
-    } finally {
-      Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
-    }
-  }
-
-}
-
-private object YarnLauncherTestApp {
-
-  def main(args: Array[String]): Unit = {
-    // Do not stop the application; the test will stop it using the launcher lib. Just run a task
-    // that will prevent the process from exiting.
-    val sc = new SparkContext(new SparkConf())
-    sc.parallelize(Seq(1)).foreach { i =>
-      this.synchronized {
-        wait()
-      }
-    }
-  }
-
-}
-
-/**
- * Used to test code in the AM that detects the SparkContext instance. Expects a single argument
- * with the duration to sleep for, in ms.
- */
-private object SparkContextTimeoutApp {
-
-  def main(args: Array[String]): Unit = {
-    val Array(sleepTime) = args
-    Thread.sleep(java.lang.Long.parseLong(sleepTime))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
deleted file mode 100644
index 950ebd9..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.deploy.yarn
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.scalatest.Matchers
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.shuffle.ShuffleTestAccessor
-import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
-import org.apache.spark.tags.ExtendedYarnTest
-
-/**
- * Integration test for the external shuffle service with a yarn mini-cluster
- */
-@ExtendedYarnTest
-class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
-
-  override def newYarnConfig(): YarnConfiguration = {
-    val yarnConfig = new YarnConfiguration()
-    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
-    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
-      classOf[YarnShuffleService].getCanonicalName)
-    yarnConfig.set("spark.shuffle.service.port", "0")
-    yarnConfig
-  }
-
-  test("external shuffle service") {
-    val shuffleServicePort = YarnTestAccessor.getShuffleServicePort
-    val shuffleService = YarnTestAccessor.getShuffleServiceInstance
-
-    val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
-
-    logInfo("Shuffle service port = " + shuffleServicePort)
-    val result = File.createTempFile("result", null, tempDir)
-    val finalState = runSpark(
-      false,
-      mainClassName(YarnExternalShuffleDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
-      extraConf = Map(
-        "spark.shuffle.service.enabled" -> "true",
-        "spark.shuffle.service.port" -> shuffleServicePort.toString
-      )
-    )
-    checkResult(finalState, result)
-    assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
-  }
-}
-
-private object YarnExternalShuffleDriver extends Logging with Matchers {
-
-  val WAIT_TIMEOUT_MILLIS = 10000
-
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      // scalastyle:off println
-      System.err.println(
-        s"""
-        |Invalid command line: ${args.mkString(" ")}
-        |
-        |Usage: ExternalShuffleDriver [result file] [registered exec file]
-        """.stripMargin)
-      // scalastyle:on println
-      System.exit(1)
-    }
-
-    val sc = new SparkContext(new SparkConf()
-      .setAppName("External Shuffle Test"))
-    val conf = sc.getConf
-    val status = new File(args(0))
-    val registeredExecFile = new File(args(1))
-    logInfo("shuffle service executor file = " + registeredExecFile)
-    var result = "failure"
-    val execStateCopy = new File(registeredExecFile.getAbsolutePath + "_dup")
-    try {
-      val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }.
-        collect().toSet
-      sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-      data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet)
-      result = "success"
-      // only one process can open a leveldb file at a time, so we copy the files
-      FileUtils.copyDirectory(registeredExecFile, execStateCopy)
-      assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty)
-    } finally {
-      sc.stop()
-      FileUtils.deleteDirectory(execStateCopy)
-      Files.write(result, status, StandardCharsets.UTF_8)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
deleted file mode 100644
index 7fbbe12..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.{File, IOException}
-import java.nio.charset.StandardCharsets
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.scalatest.Matchers
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.{ResetSystemProperties, Utils}
-
-class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
-  with ResetSystemProperties {
-
-  val hasBash =
-    try {
-      val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor()
-      exitCode == 0
-    } catch {
-      case e: IOException =>
-        false
-    }
-
-  if (!hasBash) {
-    logWarning("Cannot execute bash, skipping bash tests.")
-  }
-
-  def bashTest(name: String)(fn: => Unit): Unit =
-    if (hasBash) test(name)(fn) else ignore(name)(fn)
-
-  bashTest("shell script escaping") {
-    val scriptFile = File.createTempFile("script.", ".sh", Utils.createTempDir())
-    val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
-    try {
-      val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")
-      Files.write(("bash -c \"echo " + argLine + "\"").getBytes(StandardCharsets.UTF_8), scriptFile)
-      scriptFile.setExecutable(true)
-
-      val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
-      val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim()
-      val err = new String(ByteStreams.toByteArray(proc.getErrorStream()))
-      val exitCode = proc.waitFor()
-      exitCode should be (0)
-      out should be (args.mkString(" "))
-    } finally {
-      scriptFile.delete()
-    }
-  }
-
-  test("Yarn configuration override") {
-    val key = "yarn.nodemanager.hostname"
-    val default = new YarnConfiguration()
-
-    val sparkConf = new SparkConf()
-      .set("spark.hadoop." + key, "someHostName")
-    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
-
-    yarnConf.getClass() should be (classOf[YarnConfiguration])
-    yarnConf.get(key) should not be default.get(key)
-  }
-
-
-  test("test getApplicationAclsForYarn acls on") {
-
-    // spark acls on, just pick up default user
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.acls.enable", "true")
-
-    val securityMgr = new SecurityManager(sparkConf)
-    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
-
-    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
-    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
-
-    viewAcls match {
-      case Some(vacls) =>
-        val aclSet = vacls.split(',').map(_.trim).toSet
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      case None =>
-        fail()
-    }
-    modifyAcls match {
-      case Some(macls) =>
-        val aclSet = macls.split(',').map(_.trim).toSet
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      case None =>
-        fail()
-    }
-  }
-
-  test("test getApplicationAclsForYarn acls on and specify users") {
-
-    // default spark acls are on and specify acls
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.acls.enable", "true")
-    sparkConf.set("spark.ui.view.acls", "user1,user2")
-    sparkConf.set("spark.modify.acls", "user3,user4")
-
-    val securityMgr = new SecurityManager(sparkConf)
-    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
-
-    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
-    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
-
-    viewAcls match {
-      case Some(vacls) =>
-        val aclSet = vacls.split(',').map(_.trim).toSet
-        assert(aclSet.contains("user1"))
-        assert(aclSet.contains("user2"))
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      case None =>
-        fail()
-    }
-    modifyAcls match {
-      case Some(macls) =>
-        val aclSet = macls.split(',').map(_.trim).toSet
-        assert(aclSet.contains("user3"))
-        assert(aclSet.contains("user4"))
-        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      case None =>
-        fail()
-    }
-
-  }
-
-  test("test expandEnvironment result") {
-    val target = Environment.PWD
-    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-      YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
-    } else if (Utils.isWindows) {
-      YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
-    } else {
-      YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
-    }
-
-  }
-
-  test("test getClassPathSeparator result") {
-    if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
-      YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
-    } else if (Utils.isWindows) {
-      YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
-    } else {
-      YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
-    }
-  }
-
-  test("check different hadoop utils based on env variable") {
-    try {
-      System.setProperty("SPARK_YARN_MODE", "true")
-      assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil])
-      System.setProperty("SPARK_YARN_MODE", "false")
-      assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil])
-    } finally {
-      System.clearProperty("SPARK_YARN_MODE")
-    }
-  }
-
-
-
-  // This test needs to live here because it depends on isYarnMode returning true, which can only
-  // happen in the YARN module.
-  test("security manager token generation") {
-    try {
-      System.setProperty("SPARK_YARN_MODE", "true")
-      val initial = SparkHadoopUtil.get
-        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
-      assert(initial === null || initial.length === 0)
-
-      val conf = new SparkConf()
-        .set(SecurityManager.SPARK_AUTH_CONF, "true")
-        .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-      val sm = new SecurityManager(conf)
-
-      val generated = SparkHadoopUtil.get
-        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
-      assert(generated != null)
-      val genString = new Text(generated).toString()
-      assert(genString != "unused")
-      assert(sm.getSecretKey() === genString)
-    } finally {
-      // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret
-      // to an empty string.
-      SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "")
-      System.clearProperty("SPARK_YARN_MODE")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
deleted file mode 100644
index db4619e..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.Token
-import org.scalatest.{BeforeAndAfter, Matchers}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-
-class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
-  private var credentialManager: ConfigurableCredentialManager = null
-  private var sparkConf: SparkConf = null
-  private var hadoopConf: Configuration = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    sparkConf = new SparkConf()
-    hadoopConf = new Configuration()
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  override def afterAll(): Unit = {
-    System.clearProperty("SPARK_YARN_MODE")
-
-    super.afterAll()
-  }
-
-  test("Correctly load default credential providers") {
-    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
-    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
-    credentialManager.getServiceCredentialProvider("hive") should not be (None)
-  }
-
-  test("disable hive credential provider") {
-    sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
-    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-
-    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
-    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
-    credentialManager.getServiceCredentialProvider("hive") should be (None)
-  }
-
-  test("using deprecated configurations") {
-    sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
-    sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
-    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-
-    credentialManager.getServiceCredentialProvider("hdfs") should be (None)
-    credentialManager.getServiceCredentialProvider("hive") should be (None)
-    credentialManager.getServiceCredentialProvider("test") should not be (None)
-    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
-  }
-
-  test("verify obtaining credentials from provider") {
-    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-    val creds = new Credentials()
-
-    // Tokens can only be obtained from TestTokenProvider, for hdfs, hbase and hive tokens cannot
-    // be obtained.
-    credentialManager.obtainCredentials(hadoopConf, creds)
-    val tokens = creds.getAllTokens
-    tokens.size() should be (1)
-    tokens.iterator().next().getService should be (new Text("test"))
-  }
-
-  test("verify getting credential renewal info") {
-    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-    val creds = new Credentials()
-
-    val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
-      .asInstanceOf[TestCredentialProvider]
-    // Only TestTokenProvider can get the time of next token renewal
-    val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
-    nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
-  }
-
-  test("obtain tokens For HiveMetastore") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
-    // thrift picks up on port 0 and bails out, without trying to talk to endpoint
-    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
-
-    val hiveCredentialProvider = new HiveCredentialProvider()
-    val credentials = new Credentials()
-    hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
-
-    credentials.getAllTokens.size() should be (0)
-  }
-
-  test("Obtain tokens For HBase") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("hbase.security.authentication", "kerberos")
-
-    val hbaseTokenProvider = new HBaseCredentialProvider()
-    val creds = new Credentials()
-    hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
-
-    creds.getAllTokens.size should be (0)
-  }
-}
-
-class TestCredentialProvider extends ServiceCredentialProvider {
-  val tokenRenewalInterval = 86400 * 1000L
-  var timeOfNextTokenRenewal = 0L
-
-  override def serviceName: String = "test"
-
-  override def credentialsRequired(conf: Configuration): Boolean = true
-
-  override def obtainCredentials(
-      hadoopConf: Configuration,
-      sparkConf: SparkConf,
-      creds: Credentials): Option[Long] = {
-    if (creds == null) {
-      // Guard out other unit test failures.
-      return None
-    }
-
-    val emptyToken = new Token()
-    emptyToken.setService(new Text("test"))
-    creds.addToken(emptyToken.getService, emptyToken)
-
-    val currTime = System.currentTimeMillis()
-    timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
-
-    Some(timeOfNextTokenRenewal)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
deleted file mode 100644
index 7b2da3f..0000000
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.scalatest.{Matchers, PrivateMethodTester}
-
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
-
-class HDFSCredentialProviderSuite
-    extends SparkFunSuite
-    with PrivateMethodTester
-    with Matchers {
-  private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
-
-  private def getTokenRenewer(
-      hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
-    hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
-  }
-
-  private var hdfsCredentialProvider: HDFSCredentialProvider = null
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    if (hdfsCredentialProvider == null) {
-      hdfsCredentialProvider = new HDFSCredentialProvider()
-    }
-  }
-
-  override def afterAll() {
-    if (hdfsCredentialProvider != null) {
-      hdfsCredentialProvider = null
-    }
-
-    super.afterAll()
-  }
-
-  test("check token renewer") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
-    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
-    val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
-    renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
-  }
-
-  test("check token renewer default") {
-    val hadoopConf = new Configuration()
-    val caught =
-      intercept[SparkException] {
-        getTokenRenewer(hdfsCredentialProvider, hadoopConf)
-      }
-    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala b/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
deleted file mode 100644
index da9e8e2..0000000
--- a/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.launcher
-
-import java.util.{List => JList, Map => JMap}
-
-/**
- * Exposes AbstractCommandBuilder to the YARN tests, so that they can build classpaths the same
- * way other cluster managers do.
- */
-private[spark] class TestClasspathBuilder extends AbstractCommandBuilder {
-
-  childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sys.props("spark.test.home"))
-
-  override def buildClassPath(extraCp: String): JList[String] = super.buildClassPath(extraCp)
-
-  /** Not used by the YARN tests. */
-  override def buildCommand(env: JMap[String, String]): JList[String] =
-    throw new UnsupportedOperationException()
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
deleted file mode 100644
index 1fed256..0000000
--- a/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.network.shuffle
-
-import java.io.File
-import java.util.concurrent.ConcurrentMap
-
-import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.fusesource.leveldbjni.JniDBFactory
-import org.iq80.leveldb.{DB, Options}
-
-import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-
-/**
- * just a cheat to get package-visible members in tests
- */
-object ShuffleTestAccessor {
-
-  def getBlockResolver(handler: ExternalShuffleBlockHandler): ExternalShuffleBlockResolver = {
-    handler.blockManager
-  }
-
-  def getExecutorInfo(
-      appId: ApplicationId,
-      execId: String,
-      resolver: ExternalShuffleBlockResolver
-  ): Option[ExecutorShuffleInfo] = {
-    val id = new AppExecId(appId.toString, execId)
-    Option(resolver.executors.get(id))
-  }
-
-  def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
-    resolver.registeredExecutorFile
-  }
-
-  def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
-    resolver.db
-  }
-
-  def reloadRegisteredExecutors(
-    file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
-    val options: Options = new Options
-    options.createIfMissing(true)
-    val factory = new JniDBFactory
-    val db = factory.open(file, options)
-    val result = ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
-    db.close()
-    result
-  }
-
-  def reloadRegisteredExecutors(
-      db: DB): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
-    ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message