zeppelin-commits mailing list archives

From mina...@apache.org
Subject zeppelin git commit: [ZEPPELIN-1260] R interpreter doesn't work with Spark 2.0
Date Wed, 03 Aug 2016 02:54:13 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master c1935e1e8 -> 52b3cbff7


[ZEPPELIN-1260] R interpreter doesn't work with Spark 2.0

### What is this PR for?
This PR fixes the R interpreter so that it works with Spark 2.0.

### What type of PR is it?
Bug Fix

### Todos
* [x] - Create and inject SparkSession into SparkRInterpreter

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1260

### How should this be tested?
Run Zeppelin with Spark 2.0, run the following code, and check that it returns `[1] 3`:
```
%r
localDF <- data.frame(name=c("a", "b", "c"), age=c(19, 23, 18))
df <- createDataFrame(spark, localDF)
count(df)
```
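
For comparison, the updated `ZeppelinSparkClusterTest` (see the diff below) runs the same paragraph against the 1.x-style SQL context when the Spark version is older than 2.0, so on Spark 1.x the equivalent check would be (a sketch based on that test, not part of this PR's description):
```
%r
localDF <- data.frame(name=c("a", "b", "c"), age=c(19, 23, 18))
df <- createDataFrame(sqlContext, localDF)
count(df)
```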

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/1540981/17324196/fbf3df6c-586a-11e6-896f-68fd86e630e8.png)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Lee moon soo <moon@apache.org>

Closes #1259 from Leemoonsoo/ZEPPELIN-1260 and squashes the following commits:

b3df11f [Lee moon soo] inject sqlContext as well
02822ac [Lee moon soo] Change indent
6bd1128 [Lee moon soo] Create and inject spark session into sparkr interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/52b3cbff
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/52b3cbff
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/52b3cbff

Branch: refs/heads/master
Commit: 52b3cbff76e9e1d684443e442673d4468f327108
Parents: c1935e1
Author: Lee moon soo <moon@apache.org>
Authored: Tue Aug 2 07:59:36 2016 -0500
Committer: Mina Lee <minalee@apache.org>
Committed: Wed Aug 3 11:54:02 2016 +0900

----------------------------------------------------------------------
 .../java/org/apache/zeppelin/spark/SparkRInterpreter.java | 10 ++++++++--
 .../main/java/org/apache/zeppelin/spark/ZeppelinR.java    |  6 +++++-
 .../java/org/apache/zeppelin/spark/ZeppelinRContext.java  |  8 ++++++++
 spark/src/main/resources/R/zeppelin_sparkr.R              |  5 +++++
 .../apache/zeppelin/rest/ZeppelinSparkClusterTest.java    |  7 +++++--
 5 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 8329641..5598f09 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -21,6 +21,7 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.spark.SparkContext;
 import org.apache.spark.SparkRBackend;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -70,11 +71,16 @@ public class SparkRInterpreter extends Interpreter {
     int port = SparkRBackend.port();
 
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext());
+    SparkContext sc = sparkInterpreter.getSparkContext();
+    SparkVersion sparkVersion = new SparkVersion(sc.version());
+    ZeppelinRContext.setSparkContext(sc);
+    if (Utils.isSpark2()) {
+      ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
+    }
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
     ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext());
 
-    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port);
+    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion);
     try {
       zeppelinR.open();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 0ff0740..e0a47b7 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -36,6 +36,7 @@ import java.util.Map;
 public class ZeppelinR implements ExecuteResultHandler {
   Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
   private final String rCmdPath;
+  private final SparkVersion sparkVersion;
   private DefaultExecutor executor;
   private SparkOutputStream outputStream;
   private PipedOutputStream input;
@@ -107,9 +108,11 @@ public class ZeppelinR implements ExecuteResultHandler {
    * @param rCmdPath R repl commandline path
    * @param libPath sparkr library path
    */
-  public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) {
+  public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort,
+      SparkVersion sparkVersion) {
     this.rCmdPath = rCmdPath;
     this.libPath = libPath;
+    this.sparkVersion = sparkVersion;
     this.port = sparkRBackendPort;
     try {
       File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R");
@@ -137,6 +140,7 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(Integer.toString(hashCode()));
     cmd.addArgument(Integer.toString(port));
     cmd.addArgument(libPath);
+    cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
 
     executor = new DefaultExecutor();
     outputStream = new SparkOutputStream();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index 82c320d..9ad156e 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -27,6 +27,7 @@ public class ZeppelinRContext {
   private static SparkContext sparkContext;
   private static SQLContext sqlContext;
   private static ZeppelinContext zeppelinContext;
+  private static Object sparkSession;
 
   public static void setSparkContext(SparkContext sparkContext) {
     ZeppelinRContext.sparkContext = sparkContext;
@@ -40,6 +41,10 @@ public class ZeppelinRContext {
     ZeppelinRContext.sqlContext = sqlContext;
   }
 
+  public static void setSparkSession(Object sparkSession) {
+    ZeppelinRContext.sparkSession = sparkSession;
+  }
+
   public static SparkContext getSparkContext() {
     return sparkContext;
   }
@@ -52,4 +57,7 @@ public class ZeppelinRContext {
     return zeppelinContext;
   }
 
+  public static Object getSparkSession() {
+    return sparkSession;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R
index fe2a16b..d951774 100644
--- a/spark/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/src/main/resources/R/zeppelin_sparkr.R
@@ -21,6 +21,7 @@ args <- commandArgs(trailingOnly = TRUE)
 hashCode <- as.integer(args[1])
 port <- as.integer(args[2])
 libPath <- args[3]
+version <- as.integer(args[4])
 rm(args)
 
 print(paste("Port ", toString(port)))
@@ -41,6 +42,10 @@ assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
 # setup spark env
 assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"),
envir = SparkR:::.sparkREnv)
 assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
+if (version >= 200) {
+  assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext",
"getSparkSession"), envir = SparkR:::.sparkREnv)
+  assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
+}
 assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"),
envir = SparkR:::.sparkREnv)
 assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
 assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext",
"getZeppelinContext"), envir = .GlobalEnv)

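The `zeppelin_sparkr.R` change above binds the Spark 2.x session as `spark` alongside the existing `sc` and `sqlContext` globals. A quick, informal sanity check from a `%r` paragraph (a sketch, not part of this commit) could be:
```
%r
# Globals bound by zeppelin_sparkr.R; "spark" is only assigned when the
# Spark version passed to the script is >= 200 (i.e. Spark 2.0+).
exists("sc")         # SparkContext handle
exists("sqlContext") # SQLContext handle
exists("spark")      # SparkSession handle, Spark 2.0+ only
```
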
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index a65ccbc..61dc6d1 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -100,13 +100,16 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
           }
         }
 
-        // run markdown paragraph, again
+        String sqlContextName = "sqlContext";
+        if (sparkVersion >= 20) {
+          sqlContextName = "spark";
+        }
         Paragraph p = note.addParagraph();
         Map config = p.getConfig();
         config.put("enabled", true);
         p.setConfig(config);
         p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23,
18))\n" +
-            "df <- createDataFrame(sqlContext, localDF)\n" +
+            "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
             "count(df)"
         );
         note.run(p.getId());

