zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: [ZEPPELIN-97][ZEPPELIN-134] pyspark issue with mllib api
Date Tue, 30 Jun 2015 17:41:09 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/branch-0.5 bfa53a222 -> 647df1098


[ZEPPELIN-97][ZEPPELIN-134] pyspark issue with mllib api

There was an issue, [ZEPPELIN-97](https://issues.apache.org/jira/browse/ZEPPELIN-97), with pyspark
1.4. The reason is that, starting with pyspark 1.4, the Java gateway is created with the `auto_convert = True` option.
This PR fixes the problem.

This PR also handles [ZEPPELIN-134](https://issues.apache.org/jira/browse/ZEPPELIN-134) by injecting
`sqlContext` into the pyspark interpreter.

Finally, it prints a more verbose stack trace message on errors. For example, the output changes

from

```
(<type 'exceptions.AttributeError'>, AttributeError("'list' object has no attribute
'_get_object_id'",), <traceback object at 0x392b638>)
```

to

```
Traceback (most recent call last):
  File "/var/folders/zt/nd4j13y14jjg7_5pc4xgy7t80000gn/T//zeppelin_pyspark.py", line 110,
in <module>
    eval(compiledCode)
  File "<string>", line 3, in <module>
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py",
line 1200, in withColumn
    return self.select('*', col.alias(colName))
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py",
line 738, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py",
line 630, in _jcols
    return self._jseq(cols, _to_java_column)
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py",
line 617, in _jseq
    return _to_seq(self.sql_ctx._sc, cols, converter)
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/column.py",
line 60, in _to_seq
    return sc._jvm.PythonUtils.toSeq(cols)
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 529, in __call__
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 265, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'list' object has no attribute '_get_object_id'
```

Author: Lee moon soo <moon@apache.org>

Closes #129 from Leemoonsoo/ZEPPELIN-97 and squashes the following commits:

1fa4bf6 [Lee moon soo] apply auto_convert for spark 1.4
bce3c1d [Lee moon soo] Print more stacktrace

(cherry picked from commit 6a894b09fbc599286df4db49993056b77b6bb6f6)
Signed-off-by: Lee moon soo <moon@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/647df109
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/647df109
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/647df109

Branch: refs/heads/branch-0.5
Commit: 647df109824237883094f2323fa99b9b6019dd05
Parents: bfa53a2
Author: Lee moon soo <moon@apache.org>
Authored: Mon Jun 29 14:07:37 2015 -0700
Committer: Lee moon soo <moon@apache.org>
Committed: Tue Jun 30 10:41:04 2015 -0700

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java       |  1 +
 .../main/resources/python/zeppelin_pyspark.py    | 16 +++++++++++-----
 .../zeppelin/rest/ZeppelinSparkClusterTest.java  | 19 +++++++++++++++++++
 3 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/647df109/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 95eefd8..092b077 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -137,6 +137,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python"));
     cmd.addArgument(scriptPath, false);
     cmd.addArgument(Integer.toString(port), false);
+    cmd.addArgument(getJavaSparkContext().version(), false);
     executor = new DefaultExecutor();
     outputStream = new ByteArrayOutputStream();
     PipedOutputStream ps = new PipedOutputStream();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/647df109/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index e29544e..802015d 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -32,7 +32,12 @@ from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
 
 client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client)
+sparkVersion = sys.argv[2]
+
+if sparkVersion.startswith("1.4"):
+  gateway = JavaGateway(client, auto_convert = True)
+else:
+  gateway = JavaGateway(client)
 
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
@@ -45,15 +50,15 @@ intp.onPythonScriptInitialized()
 
 jsc = intp.getJavaSparkContext()
 
-if jsc.version().startswith("1.2"):
+if sparkVersion.startswith("1.2"):
   java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
-elif jsc.version().startswith("1.3"):
+elif sparkVersion.startswith("1.3"):
   java_import(gateway.jvm, "org.apache.spark.sql.*")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-elif jsc.version().startswith("1.4"):
+elif sparkVersion.startswith("1.4"):
   java_import(gateway.jvm, "org.apache.spark.sql.*")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
 
@@ -64,6 +69,7 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 sqlc = SQLContext(sc, intp.getSQLContext())
+sqlContext = sqlc
 
 z = intp.getZeppelinContext()
 
@@ -117,6 +123,6 @@ while True :
        excInnerError = excInnerError[innerErrorStart:]
     intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True)
   except:
-    intp.setStatementsFinished(str(sys.exc_info()), True)
+    intp.setStatementsFinished(traceback.format_exc(), True)
 
   output.reset()

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/647df109/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 758a1e4..fd4a8b3 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -88,6 +88,25 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
     ZeppelinServer.notebook.removeNote(note.id());
   }
+  
+  @Test
+  public void pySparkAutoConvertOptionTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote();
+
+    int sparkVersion = getSparkVersionNumber(note);
+
+    if (isPyspark() && sparkVersion >= 14) {   // auto_convert enabled from spark 1.4
+      // run markdown paragraph, again
+      Paragraph p = note.addParagraph();
+      p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+          + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals("10\n", p.getResult().message());
+    }
+    ZeppelinServer.notebook.removeNote(note.id());
+  }
 
   @Test
   public void zRunTest() throws IOException {


Mime
View raw message