spark-commits mailing list archives

From: andrewo...@apache.org
Subject: [1/2] spark git commit: [SPARK-2996] Implement userClassPathFirst for driver, yarn.
Date: Tue, 10 Feb 2015 05:17:31 GMT
Repository: spark
Updated Branches:
  refs/heads/master 36c4e1d75 -> 20a601310


http://git-wip-us.apache.org/repos/asf/spark/blob/20a60131/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e39de82..0e37276 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -17,27 +17,34 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
 import com.google.common.io.Files
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils
 
+/**
+ * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
+ * applications, and require the Spark assembly to be built before they can be successfully
+ * run.
+ */
 class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
 
-  // log4j configuration for the Yarn containers, so that their output is collected
-  // by Yarn instead of trying to overwrite unit-tests.log.
+  // log4j configuration for the YARN containers, so that their output is collected
+  // by YARN instead of trying to overwrite unit-tests.log.
   private val LOG4J_CONF = """
     |log4j.rootCategory=DEBUG, console
     |log4j.appender.console=org.apache.log4j.ConsoleAppender
@@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     |
     |from pyspark import SparkConf , SparkContext
     |if __name__ == "__main__":
-    |    if len(sys.argv) != 3:
-    |        print >> sys.stderr, "Usage: test.py [master] [result file]"
+    |    if len(sys.argv) != 2:
+    |        print >> sys.stderr, "Usage: test.py [result file]"
     |        exit(-1)
-    |    conf = SparkConf()
-    |    conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
-    |    sc = SparkContext(conf=conf)
-    |    status = open(sys.argv[2],'w')
+    |    sc = SparkContext(conf=SparkConf())
+    |    status = open(sys.argv[1],'w')
     |    result = "failure"
     |    rdd = sc.parallelize(range(10))
     |    cnt = rdd.count()
@@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
-  private var oldConf: Map[String, String] = _
+  private var logConfDir: File = _
 
   override def beforeAll() {
     super.beforeAll()
 
     tempDir = Utils.createTempDir()
-
-    val logConfDir = new File(tempDir, "log4j")
+    logConfDir = new File(tempDir, "log4j")
     logConfDir.mkdir()
 
     val logConfFile = new File(logConfDir, "log4j.properties")
-    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
-
-    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
-      sys.props("java.class.path")
-
-    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+    Files.write(LOG4J_CONF, logConfFile, UTF_8)
 
     yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
     yarnCluster.init(new YarnConfiguration())
@@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     }
 
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
-    config.foreach { e =>
-      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
-    }
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" ->  sparkHome)
-    sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
-    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
-    sys.props += ("spark.executor.instances" -> "1")
-    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
-    sys.props += ("spark.executor.extraClassPath" -> childClasspath)
-    sys.props += ("spark.executor.extraJavaOptions" -> "-Dfoo=\"one two three\"")
-    sys.props += ("spark.driver.extraJavaOptions" -> "-Dfoo=\"one two three\"")
   }
 
   override def afterAll() {
     yarnCluster.stop()
-    sys.props.retain { case (k, v) => !k.startsWith("spark.") }
-    sys.props ++= oldConf
     super.afterAll()
   }
 
   test("run Spark in yarn-client mode") {
-    var result = File.createTempFile("result", null, tempDir)
-    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
-    checkResult(result)
-
-    // verify log urls are present
-    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
-      assert(info.logUrlMap.nonEmpty)
-    }
+    testBasicYarnApp(true)
   }
 
   test("run Spark in yarn-cluster mode") {
-    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-    var result = File.createTempFile("result", null, tempDir)
-
-    val args = Array("--class", main,
-      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
-      "--arg", "yarn-cluster",
-      "--arg", result.getAbsolutePath(),
-      "--num-executors", "1")
-    Client.main(args)
-    checkResult(result)
-
-    // verify log urls are present.
-    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
-      assert(info.logUrlMap.nonEmpty)
-    }
+    testBasicYarnApp(false)
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
-    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-
-    // Use only one argument so the driver will fail
-    val args = Array("--class", main,
-      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
-      "--arg", "yarn-cluster",
-      "--num-executors", "1")
+    // Don't provide arguments so the driver will fail.
     val exception = intercept[SparkException] {
-      Client.main(args)
+      runSpark(false, mainClassName(YarnClusterDriver.getClass))
+      fail("Spark application should have failed.")
     }
-    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
   }
 
   test("run Python application in yarn-cluster mode") {
     val primaryPyFile = new File(tempDir, "test.py")
-    Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+    Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
     val pyFile = new File(tempDir, "test2.py")
-    Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+    Files.write(TEST_PYFILE, pyFile, UTF_8)
     var result = File.createTempFile("result", null, tempDir)
 
-    val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
-      "--primary-py-file", primaryPyFile.getAbsolutePath(),
-      "--py-files", pyFile.getAbsolutePath(),
-      "--arg", "yarn-cluster",
-      "--arg", result.getAbsolutePath(),
-      "--name", "python test in yarn-cluster mode",
-      "--num-executors", "1")
-    Client.main(args)
+    // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
+    // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
+    val sparkHome = sys.props("spark.test.home")
+    val extraConf = Map(
+      "spark.executorEnv.SPARK_HOME" -> sparkHome,
+      "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+
+    runSpark(false, primaryPyFile.getAbsolutePath(),
+      sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
+      appArgs = Seq(result.getAbsolutePath()),
+      extraConf = extraConf)
     checkResult(result)
   }
 
+  test("user class path first in client mode") {
+    testUseClassPathFirst(true)
+  }
+
+  test("user class path first in cluster mode") {
+    testUseClassPathFirst(false)
+  }
+
+  private def testBasicYarnApp(clientMode: Boolean): Unit = {
+    var result = File.createTempFile("result", null, tempDir)
+    runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+      appArgs = Seq(result.getAbsolutePath()))
+    checkResult(result)
+  }
+
+  private def testUseClassPathFirst(clientMode: Boolean): Unit = {
+    // Create a jar file that contains a different version of "test.resource".
+    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+    val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
+    val driverResult = File.createTempFile("driver", null, tempDir)
+    val executorResult = File.createTempFile("executor", null, tempDir)
+    runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+      extraClassPath = Seq(originalJar.getPath()),
+      extraJars = Seq("local:" + userJar.getPath()),
+      extraConf = Map(
+        "spark.driver.userClassPathFirst" -> "true",
+        "spark.executor.userClassPathFirst" -> "true"))
+    checkResult(driverResult, "OVERRIDDEN")
+    checkResult(executorResult, "OVERRIDDEN")
+  }
+
+  private def runSpark(
+      clientMode: Boolean,
+      klass: String,
+      appArgs: Seq[String] = Nil,
+      sparkArgs: Seq[String] = Nil,
+      extraClassPath: Seq[String] = Nil,
+      extraJars: Seq[String] = Nil,
+      extraConf: Map[String, String] = Map()): Unit = {
+    val master = if (clientMode) "yarn-client" else "yarn-cluster"
+    val props = new Properties()
+
+    props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+
+    val childClasspath = logConfDir.getAbsolutePath() +
+      File.pathSeparator +
+      sys.props("java.class.path") +
+      File.pathSeparator +
+      extraClassPath.mkString(File.pathSeparator)
+    props.setProperty("spark.driver.extraClassPath", childClasspath)
+    props.setProperty("spark.executor.extraClassPath", childClasspath)
+
+    // SPARK-4267: make sure java options are propagated correctly.
+    props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
+    props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
+
+    yarnCluster.getConfig().foreach { e =>
+      props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
+    }
+
+    sys.props.foreach { case (k, v) =>
+      if (k.startsWith("spark.")) {
+        props.setProperty(k, v)
+      }
+    }
+
+    extraConf.foreach { case (k, v) => props.setProperty(k, v) }
+
+    val propsFile = File.createTempFile("spark", ".properties", tempDir)
+    val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
+    props.store(writer, "Spark properties.")
+    writer.close()
+
+    val extraJarArgs = if (!extraJars.isEmpty()) Seq("--jars", extraJars.mkString(",")) else Nil
+    val mainArgs =
+      if (klass.endsWith(".py")) {
+        Seq(klass)
+      } else {
+        Seq("--class", klass, fakeSparkJar.getAbsolutePath())
+      }
+    val argv =
+      Seq(
+        new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
+        "--master", master,
+        "--num-executors", "1",
+        "--properties-file", propsFile.getAbsolutePath()) ++
+      extraJarArgs ++
+      sparkArgs ++
+      mainArgs ++
+      appArgs
+
+    Utils.executeAndGetOutput(argv,
+      extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+  }
+
   /**
   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
   * any sort of error when the job process finishes successfully, but the job itself fails. So
    * the tests enforce that something is written to a file after everything is ok to indicate
    * that the job succeeded.
    */
-  private def checkResult(result: File) = {
-    var resultString = Files.toString(result, Charsets.UTF_8)
-    resultString should be ("success")
+  private def checkResult(result: File): Unit = {
+    checkResult(result, "success")
+  }
+
+  private def checkResult(result: File, expected: String): Unit = {
+    var resultString = Files.toString(result, UTF_8)
+    resultString should be (expected)
+  }
+
+  private def mainClassName(klass: Class[_]): String = {
+    klass.getName().stripSuffix("$")
   }
 
 }
@@ -229,22 +294,22 @@ private object YarnClusterDriver extends Logging with Matchers {
   val WAIT_TIMEOUT_MILLIS = 10000
   var listener: SaveExecutorInfo = null
 
-  def main(args: Array[String]) = {
-    if (args.length != 2) {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
       System.err.println(
         s"""
         |Invalid command line: ${args.mkString(" ")}
         |
-        |Usage: YarnClusterDriver [master] [result file]
+        |Usage: YarnClusterDriver [result file]
         """.stripMargin)
       System.exit(1)
     }
 
     listener = new SaveExecutorInfo
-    val sc = new SparkContext(new SparkConf().setMaster(args(0))
+    val sc = new SparkContext(new SparkConf()
       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
     sc.addSparkListener(listener)
-    val status = new File(args(1))
+    val status = new File(args(0))
     var result = "failure"
     try {
       val data = sc.parallelize(1 to 4, 4).collect().toSet
@@ -253,7 +318,48 @@ private object YarnClusterDriver extends Logging with Matchers {
       result = "success"
     } finally {
       sc.stop()
-      Files.write(result, status, Charsets.UTF_8)
+      Files.write(result, status, UTF_8)
+    }
+
+    // verify log urls are present
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
+  }
+
+}
+
+private object YarnClasspathTest {
+
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClasspathTest [driver result file] [executor result file]
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    readResource(args(0))
+    val sc = new SparkContext(new SparkConf())
+    try {
+      sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) }
+    } finally {
+      sc.stop()
+    }
+  }
+
+  private def readResource(resultPath: String): Unit = {
+    var result = "failure"
+    try {
+      val ccl = Thread.currentThread().getContextClassLoader()
+      val resource = ccl.getResourceAsStream("test.resource")
+      val bytes = ByteStreams.toByteArray(resource)
+      result = new String(bytes, 0, bytes.length, UTF_8)
+    } finally {
+      Files.write(result, new File(resultPath), UTF_8)
     }
   }
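
Note on the feature under test: the two "user class path first" tests above exercise the new
spark.driver.userClassPathFirst and spark.executor.userClassPathFirst settings introduced by
SPARK-2996, checking that a resource bundled in a user jar shadows the copy placed on Spark's
extra class path. As a rough sketch of how an application might opt into the same behaviour
(the object name and the "test.resource" name below are illustrative, mirroring the
YarnClasspathTest helper in this suite rather than any shipped example):

    import org.apache.spark.{SparkConf, SparkContext}

    import scala.io.Source

    // Hypothetical standalone example; mirrors what YarnClasspathTest verifies.
    object UserClassPathFirstExample {
      def main(args: Array[String]): Unit = {
        // Ask Spark to place user-supplied jars ahead of its own classes,
        // on both the driver and the executors.
        val conf = new SparkConf()
          .setAppName("user class path first example")
          .set("spark.driver.userClassPathFirst", "true")
          .set("spark.executor.userClassPathFirst", "true")
        val sc = new SparkContext(conf)
        try {
          // With the settings above, a "test.resource" packaged in a --jars jar
          // should be the copy visible through the context class loader.
          val in = Thread.currentThread().getContextClassLoader()
            .getResourceAsStream("test.resource")
          println(Source.fromInputStream(in).mkString)
        } finally {
          sc.stop()
        }
      }
    }

Such an application would typically be submitted with spark-submit --master yarn-cluster
--jars <override jar> ..., which is essentially the command line the runSpark helper above
assembles, passing its Spark configuration through a generated --properties-file.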
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

