zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: ZEPPELIN-273 Spark 1.5 support
Date Fri, 04 Sep 2015 15:13:02 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 48f875d42 -> 754c55ec4


ZEPPELIN-273 Spark 1.5 support

https://issues.apache.org/jira/browse/ZEPPELIN-273

"spark-1.5" profile is added

Author: Lee moon soo <moon@apache.org>

Closes #269 from Leemoonsoo/spark_1.5 and squashes the following commits:

6ba2dce [Lee moon soo] Add missing import after rebase
5279d26 [Lee moon soo] improve
8c19b09 [Lee moon soo] Add SparkVersion enum and test
699b05b [Lee moon soo] Add spark-1.5 profile
67023fa [Lee moon soo] allow spark 1.5


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/754c55ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/754c55ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/754c55ec

Branch: refs/heads/master
Commit: 754c55ec4aa5475014b7ae24dc866fe072728182
Parents: 48f875d
Author: Lee moon soo <moon@apache.org>
Authored: Thu Sep 3 19:37:38 2015 -0700
Committer: Lee moon soo <moon@apache.org>
Committed: Fri Sep 4 08:12:55 2015 -0700

----------------------------------------------------------------------
 README.md                                       |  4 +
 spark-dependencies/pom.xml                      | 13 +++
 .../zeppelin/spark/PySparkInterpreter.java      |  6 +-
 .../apache/zeppelin/spark/SparkInterpreter.java | 37 +++-----
 .../zeppelin/spark/SparkSqlInterpreter.java     |  1 -
 .../org/apache/zeppelin/spark/SparkVersion.java | 94 ++++++++++++++++++++
 .../main/resources/python/zeppelin_pyspark.py   | 28 ++++--
 .../apache/zeppelin/spark/SparkVersionTest.java | 52 +++++++++++
 8 files changed, 197 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index a3bc0ad..e7761f0 100644
--- a/README.md
+++ b/README.md
@@ -59,6 +59,10 @@ Spark 1.1.x
 ```
 mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests 
 ```
+Spark 1.5.x
+```
+mvn clean package -Pspark-1.5 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests
+```
 CDH 5.X
 ```
 mvn clean package -Pspark-1.2 -Dhadoop.version=2.5.0-cdh5.3.0 -Phadoop-2.4 -DskipTests

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml
index 9d10cf8..eac301f 100644
--- a/spark-dependencies/pom.xml
+++ b/spark-dependencies/pom.xml
@@ -456,6 +456,19 @@
     </profile>
 
     <profile>
+      <id>spark-1.5</id>
+      <properties>
+        <spark.version>1.5.0</spark.version>
+        <akka.group>com.typesafe.akka</akka.group>
+        <akka.version>2.3.11</akka.version>
+        <protobuf.version>2.5.0</protobuf.version>
+      </properties>
+
+      <dependencies>
+      </dependencies>
+    </profile>
+    
+    <profile>
       <id>hadoop-0.23</id>
       <!-- SPARK-1121: Adds an explicit dependency on Avro to work around a
         Hadoop 0.23.X issue -->

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index c579d21..0e58729 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -137,7 +137,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python"));
     cmd.addArgument(scriptPath, false);
     cmd.addArgument(Integer.toString(port), false);
-    cmd.addArgument(getJavaSparkContext().version(), false);
+    cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()),
false);
     executor = new DefaultExecutor();
     outputStream = new ByteArrayOutputStream();
     PipedOutputStream ps = new PipedOutputStream();
@@ -286,9 +286,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
 
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    if (!sparkInterpreter.getSparkContext().version().startsWith("1.2") &&
-        !sparkInterpreter.getSparkContext().version().startsWith("1.3") &&
-        !sparkInterpreter.getSparkContext().version().startsWith("1.4")) {
+    if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
       return new InterpreterResult(Code.ERROR, "pyspark "
           + sparkInterpreter.getSparkContext().version() + " is not supported");
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index e684c52..82f8556 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -128,6 +128,7 @@ public class SparkInterpreter extends Interpreter {
 
   private Map<String, Object> binder;
   private SparkEnv env;
+  private SparkVersion sparkVersion;
 
 
   public SparkInterpreter(Properties property) {
@@ -438,6 +439,8 @@ public class SparkInterpreter extends Interpreter {
       sc.taskScheduler().rootPool().addSchedulable(pool);
     }
 
+    sparkVersion = SparkVersion.fromVersionString(sc.version());
+
     sqlc = getSQLContext();
 
     dep = getDependencyResolver();
@@ -462,15 +465,9 @@ public class SparkInterpreter extends Interpreter {
                  + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
     intp.interpret("import org.apache.spark.SparkContext._");
 
-    if (sc.version().startsWith("1.1")) {
-      intp.interpret("import sqlContext._");
-    } else if (sc.version().startsWith("1.2")) {
+    if (sparkVersion.oldSqlContextImplicits()) {
       intp.interpret("import sqlContext._");
-    } else if (sc.version().startsWith("1.3")) {
-      intp.interpret("import sqlContext.implicits._");
-      intp.interpret("import sqlContext.sql");
-      intp.interpret("import org.apache.spark.sql.functions._");
-    } else if (sc.version().startsWith("1.4")) {
+    } else {
       intp.interpret("import sqlContext.implicits._");
       intp.interpret("import sqlContext.sql");
       intp.interpret("import org.apache.spark.sql.functions._");
@@ -488,14 +485,10 @@ public class SparkInterpreter extends Interpreter {
      */
 
     try {
-      if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
+      if (sparkVersion.oldLoadFilesMethodName()) {
         Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
         loadFiles.invoke(this.interpreter, settings);
-      } else if (sc.version().startsWith("1.3")) {
-        Method loadFiles = this.interpreter.getClass().getMethod(
-                "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
-        loadFiles.invoke(this.interpreter, settings);
-      } else if (sc.version().startsWith("1.4")) {
+      } else {
         Method loadFiles = this.interpreter.getClass().getMethod(
                 "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
         loadFiles.invoke(this.interpreter, settings);
@@ -682,18 +675,10 @@ public class SparkInterpreter extends Interpreter {
         int[] progressInfo = null;
         try {
           Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
-          if (sc.version().startsWith("1.0")) {
+          if (sparkVersion.getProgress1_0()) {          
             progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
-          } else if (sc.version().startsWith("1.1")) {
-            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
-          } else if (sc.version().startsWith("1.2")) {
-            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
-          } else if (sc.version().startsWith("1.3")) {
-            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
-          } else if (sc.version().startsWith("1.4")) {
-            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
           } else {
-            continue;
+            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
           }
         } catch (IllegalAccessException | IllegalArgumentException
             | InvocationTargetException | NoSuchMethodException
@@ -818,4 +803,8 @@ public class SparkInterpreter extends Interpreter {
   public ZeppelinContext getZeppelinContext() {
     return z;
   }
+
+  public SparkVersion getSparkVersion() {
+    return this.sparkVersion;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 7c1ba11..7b26e34 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -162,7 +162,6 @@ public class SparkSqlInterpreter extends Interpreter {
     return sparkInterpreter.getProgress(context);
   }
 
-
   @Override
   public Scheduler getScheduler() {
     if (concurrentSQL()) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
new file mode 100644
index 0000000..0a52fe2
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+/**
+ * Provide reading comparing capability of spark version returned from SparkContext.version()
+ */
+public enum SparkVersion {
+  SPARK_1_0_0,
+  SPARK_1_0_1,
+  SPARK_1_1_0,
+  SPARK_1_1_1,
+  SPARK_1_2_0,
+  SPARK_1_2_1,
+  SPARK_1_2_2,
+  SPARK_1_3_0,
+  SPARK_1_3_1,
+  SPARK_1_4_0,
+  SPARK_1_4_1,
+  SPARK_1_5_0;
+
+  private int version;
+
+  SparkVersion() {
+    version = Integer.parseInt(name().substring("SPARK_".length()).replaceAll("_", ""));
+  }
+
+  public int toNumber() {
+    return version;
+  }
+
+  public String toString() {
+    return name().substring("SPARK_".length()).replaceAll("_", ".");
+  }
+
+  public static SparkVersion fromVersionString(String versionString) {
+    for (SparkVersion v : values()) {
+      if (v.toString().equals(versionString)) {
+        return v;
+      }
+    }
+    throw new IllegalArgumentException();
+  }
+
+  public boolean isPysparkSupported() {
+    return this.newerThanEquals(SPARK_1_2_0);
+  }
+
+  public boolean hasDataFrame() {
+    return this.newerThanEquals(SPARK_1_4_0);
+  }
+
+  public boolean getProgress1_0() {
+    return this.olderThan(SPARK_1_1_0);
+  }
+
+  public boolean oldLoadFilesMethodName() {
+    return this.olderThan(SPARK_1_3_0);
+  }
+
+  public boolean oldSqlContextImplicits() {
+    return this.olderThan(SPARK_1_3_0);
+  }
+
+  public boolean newerThan(SparkVersion versionToCompare) {
+    return version > versionToCompare.version;
+  }
+
+  public boolean newerThanEquals(SparkVersion versionToCompare) {
+    return version >= versionToCompare.version;
+  }
+
+  public boolean olderThan(SparkVersion versionToCompare) {
+    return version < versionToCompare.version;
+  }
+
+  public boolean olderThanEquals(SparkVersion versionToCompare) {
+    return version <= versionToCompare.version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 794fbc7..e57190e 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -78,15 +78,28 @@ class PyZeppelinContext(dict):
   def get(self, key):
     return self.__getitem__(key)
 
+class SparkVersion(object):
+  SPARK_1_4_0 = 140
+  SPARK_1_3_0 = 130
+
+  def __init__(self, versionNumber):
+    self.version = versionNumber
+
+  def isAutoConvertEnabled(self):
+    return self.version >= self.SPARK_1_4_0
+
+  def isImportAllPackageUnderSparkSql(self):
+    return self.version >= self.SPARK_1_3_0
+
 
 output = Logger()
 sys.stdout = output
 sys.stderr = output
 
 client = GatewayClient(port=int(sys.argv[1]))
-sparkVersion = sys.argv[2]
+sparkVersion = SparkVersion(int(sys.argv[2]))
 
-if sparkVersion.startswith("1.4"):
+if sparkVersion.isAutoConvertEnabled():
   gateway = JavaGateway(client, auto_convert = True)
 else:
   gateway = JavaGateway(client)
@@ -102,17 +115,14 @@ intp.onPythonScriptInitialized()
 
 jsc = intp.getJavaSparkContext()
 
-if sparkVersion.startswith("1.2"):
+if sparkVersion.isImportAllPackageUnderSparkSql():
+  java_import(gateway.jvm, "org.apache.spark.sql.*")
+  java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+else:
   java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
-elif sparkVersion.startswith("1.3"):
-  java_import(gateway.jvm, "org.apache.spark.sql.*")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-elif sparkVersion.startswith("1.4"):
-  java_import(gateway.jvm, "org.apache.spark.sql.*")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
 
 
 java_import(gateway.jvm, "scala.Tuple2")

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
new file mode 100644
index 0000000..dd00f73
--- /dev/null
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class SparkVersionTest {
+
+  @Test
+  public void testSparkVersion() {
+    // test equals
+    assertTrue(SparkVersion.SPARK_1_2_0 == SparkVersion.fromVersionString("1.2.0"));
+
+    // test newer than
+    assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_3_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_1_0));
+
+    assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_3_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_1_0));
+
+    // test older than
+    assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_1_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_3_0));
+
+    assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_1_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0));
+
+    // conversion
+    assertEquals(120, SparkVersion.SPARK_1_2_0.toNumber());
+    assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString());
+  }
+}


Mime
View raw message