zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject zeppelin git commit: ZEPPELIN-1769. Support cancel job in SparkRInterpereter
Date Thu, 15 Dec 2016 02:57:01 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master c2e1a5a37 -> 76d77c5b3


ZEPPELIN-1769. Support cancel job in SparkRInterpereter

### What is this PR for?
Cancel is not supported for SparkR now, This PR is would construct a setJobGroup statement
before each statement. So that we can implement the cancel feature.

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1769

### How should this be tested?
Manually tested for spark 1.6 & spark 2.0.2

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #1737 from zjffdu/ZEPPELIN-1769 and squashes the following commits:

cd10c2a [Jeff Zhang] remove NULL in output
2578e61 [Jeff Zhang] ZEPPELIN-1769. Support cancel job in SparkRInterpereter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/76d77c5b
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/76d77c5b
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/76d77c5b

Branch: refs/heads/master
Commit: 76d77c5b354bb05013a3191e100f062f4de65ce4
Parents: c2e1a5a
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Dec 8 20:02:24 2016 +0800
Committer: Lee moon soo <moon@apache.org>
Committed: Wed Dec 14 18:56:58 2016 -0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkRInterpreter.java       | 27 +++++++++++++++++---
 spark/src/main/resources/R/zeppelin_sparkr.R    |  1 +
 2 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/76d77c5b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 15ce658..f4152a5 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -43,6 +43,7 @@ public class SparkRInterpreter extends Interpreter {
 
   private static String renderOptions;
   private ZeppelinR zeppelinR;
+  private SparkContext sc;
 
   public SparkRInterpreter(Properties property) {
     super(property);
@@ -60,7 +61,6 @@ public class SparkRInterpreter extends Interpreter {
       // workaround to make sparkr work without SPARK_HOME
       System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark");
     }
-
     synchronized (SparkRBackend.backend()) {
       if (!SparkRBackend.isStarted()) {
         SparkRBackend.init();
@@ -71,7 +71,7 @@ public class SparkRInterpreter extends Interpreter {
     int port = SparkRBackend.port();
 
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    SparkContext sc = sparkInterpreter.getSparkContext();
+    this.sc = sparkInterpreter.getSparkContext();
     SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
     if (Utils.isSpark2()) {
@@ -94,6 +94,10 @@ public class SparkRInterpreter extends Interpreter {
     renderOptions = getProperty("zeppelin.R.render.options");
   }
 
+  String getJobGroup(InterpreterContext context){
+    return "zeppelin-" + context.getParagraphId();
+  }
+
   @Override
   public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
{
 
@@ -117,6 +121,19 @@ public class SparkRInterpreter extends Interpreter {
       }
     }
 
+    String jobGroup = getJobGroup(interpreterContext);
+    String setJobGroup = "";
+    // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
+    if (Utils.isSpark2()) {
+      setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
+          "\", \"zeppelin sparkR job group description\", TRUE)";
+    } else if (getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0))
{
+      setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
+          "\", \"zeppelin sparkR job group description\", TRUE)";
+    }
+    logger.debug("set JobGroup:" + setJobGroup);
+    lines = setJobGroup + "\n" + lines;
+
     try {
       // render output with knitr
       if (useKnitr()) {
@@ -155,7 +172,11 @@ public class SparkRInterpreter extends Interpreter {
   }
 
   @Override
-  public void cancel(InterpreterContext context) {}
+  public void cancel(InterpreterContext context) {
+    if (this.sc != null) {
+      sc.cancelJobGroup(getJobGroup(context));
+    }
+  }
 
   @Override
   public FormType getFormType() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/76d77c5b/spark/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R
index 8e66421..0d7b64d 100644
--- a/spark/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/src/main/resources/R/zeppelin_sparkr.R
@@ -45,6 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
 if (version >= 20000) {
   assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext",
"getSparkSession"), envir = SparkR:::.sparkREnv)
   assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
+  assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv)
 }
 assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"),
envir = SparkR:::.sparkREnv)
 assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)


Mime
View raw message