spark-commits mailing list archives

From pwend...@apache.org
Subject spark git commit: [SPARK-5979][SPARK-6032] Smaller safer --packages fix
Date Sat, 28 Feb 2015 06:59:38 GMT
Repository: spark
Updated Branches:
  refs/heads/master dba08d1fc -> 6d8e5fbc0


[SPARK-5979][SPARK-6032] Smaller safer --packages fix

pwendell tdas
These are the safer parts of PR #4754:
 - SPARK-5979: All dependencies with the groupId `org.apache.spark` passed through `--packages`
were being excluded from the dependency tree on the assumption that they would be in the assembly
jar. This is not the case, so the exclusion rules had to be defined more explicitly; a standalone
sketch of the glob-based exclusion mechanism follows this list.
 - SPARK-6032: Ivy prints a large amount of log output while retrieving dependencies, and these
messages went to `System.out`. The logging has been moved to `System.err`; see the redirect
sketch after the `SparkSubmit.scala` diff below.
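
A minimal, self-contained sketch of the glob-based exclusion mechanism (assuming Ivy is on the
classpath; the object name, organisation/revision, and the two-element component list are
illustrative only, not the exact values SparkSubmit uses):

import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.settings.IvySettings

object ExclusionRulesSketch {
  def main(args: Array[String]): Unit = {
    val ivySettings = new IvySettings()
    val ivyConfName = "default"
    // newDefaultInstance creates a descriptor that already contains a "default" configuration
    val md = DefaultModuleDescriptor.newDefaultInstance(
      ModuleRevisionId.newInstance("org.example", "exclusion-demo", "working"))
    md.setDefaultConf(ivyConfName)

    // One rule per Spark component. The trailing underscore plus the glob matcher lets
    // "spark-streaming_*" match spark-streaming_2.10 without also excluding
    // spark-streaming-kafka-assembly_2.10.
    val components = Seq("core_", "streaming_")
    components.foreach { comp =>
      val sparkArtifacts =
        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
      val rule = new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
      rule.addConfiguration(ivyConfName)
      md.addExcludeRule(rule)
    }
    md.getAllExcludeRules.foreach(rule => println(rule.getId))
  }
}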

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #4802 from brkyvz/simple-streaming-fix and squashes the following commits:

e0f38cb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into simple-streaming-fix
bad921c [Burak Yavuz] [SPARK-5979][SPARK-6032] Smaller safer fix


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d8e5fbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d8e5fbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d8e5fbc

Branch: refs/heads/master
Commit: 6d8e5fbc0d83411174ffa59ff6a761a862eca32c
Parents: dba08d1
Author: Burak Yavuz <brkyvz@gmail.com>
Authored: Fri Feb 27 22:59:35 2015 -0800
Committer: Patrick Wendell <patrick@databricks.com>
Committed: Fri Feb 27 22:59:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 53 ++++++++++++++------
 .../spark/deploy/SparkSubmitUtilsSuite.scala    | 16 +++++-
 2 files changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6d8e5fbc/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4c41108..4a74641 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
 
 /**
  * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
- * simplicity for Spark Package users.
+ * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
  * @param coordinates Comma-delimited string of maven coordinates
  * @return Sequence of Maven coordinates
  */
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
       md.addDependency(dd)
     }
   }
+  
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
 
   /** A nice function to use in tests as well. Values are dummy strings. */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)
 
-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)
 
       // resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
         packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
-
+      System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }
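
A generic, self-contained sketch of the save/redirect/restore pattern applied around the Ivy
calls above (the helper name is made up, and the try/finally wrapper is an addition for the
example; the patch itself saves System.out before resolution and restores it inline after
retrieval):

import java.io.PrintStream

object StdoutRedirectSketch {
  // Run `body` with JVM-level stdout pointed at `target`, restoring the original stream after.
  def withStdoutRedirectedTo[T](target: PrintStream)(body: => T): T = {
    val saved = System.out
    System.setOut(target)
    try body
    finally System.setOut(saved)
  }

  def main(args: Array[String]): Unit = {
    val answer = withStdoutRedirectedTo(System.err) {
      // A Java library such as Ivy writes straight to System.out; while redirected,
      // this line lands on stderr instead of polluting stdout.
      System.out.println("chatty resolver output")
      42
    }
    System.out.println(s"answer = $answer") // back on the real stdout
  }
}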

http://git-wip-us.apache.org/repos/asf/spark/blob/6d8e5fbc/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index ad62b35..8bcca92 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -117,8 +117,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("neglects Spark and Spark's dependencies") {
-    val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
+
+    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
     assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+    // but the fact that it is checking means that it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, None, true)
+    }
   }
 }



