zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mina...@apache.org
Subject zeppelin git commit: ZEPPELIN-1197. Should print output directly without invoking function print in pyspark interpreter
Date Wed, 10 Aug 2016 00:45:12 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 901960fe4 -> ae6751baf


ZEPPELIN-1197. Should print output directly without invoking function print in pyspark interpreter

### What is this PR for?
Currently, users need to invoke print to display output in the notebook. This behavior
is neither natural nor consistent with other notebooks. This PR makes the pyspark interpreter
in Zeppelin behave the same as other notebooks. There are 2 main changes:
* Use single mode to compile the last statement, so that the evaluation result of the last
statement is printed to stdout; this is consistent with other notebooks (like Jupyter).
* Make SparkOutputStream extend LogOutputStream so that we can see the output of the inner process
(Python/R), which is helpful for diagnosing problems.

### What type of PR is it?
[Bug Fix]

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1197

### How should this be tested?
Tested it manually. Input the following text in pyspark paragraph,
```
1+1
sc.version
```
And get the following output
```
u'1.6.1'
```

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? Users no longer need to call print explicitly.
* Does this need documentation? Yes

Author: Jeff Zhang <zjffdu@apache.org>

Closes #1232 from zjffdu/ZEPPELIN-1197 and squashes the following commits:

3771245 [Jeff Zhang] fix and add test
10182e6 [Jeff Zhang] ZEPPELIN-1197. Should print output directly without invoking function print in pyspark interpreter

(cherry picked from commit b885f43e4c63a4fdd7f591f8286b788d6ed2d719)
Signed-off-by: Mina Lee <minalee@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ae6751ba
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ae6751ba
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ae6751ba

Branch: refs/heads/branch-0.6
Commit: ae6751bafbd06446081b1fbc289ce15d6ab022d8
Parents: 901960f
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Jul 28 17:36:37 2016 +0800
Committer: Mina Lee <minalee@apache.org>
Committed: Wed Aug 10 09:44:39 2016 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/spark/LogOutputStream.java  | 116 +++++++++++++++++++
 .../zeppelin/spark/PySparkInterpreter.java      |   2 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |   2 +-
 .../zeppelin/spark/SparkOutputStream.java       |  19 ++-
 .../org/apache/zeppelin/spark/ZeppelinR.java    |   2 +-
 .../main/resources/python/zeppelin_pyspark.py   |  31 +++--
 .../zeppelin/integration/SparkParagraphIT.java  |  19 +++
 7 files changed, 176 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
new file mode 100644
index 0000000..d941cd7
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+/**
+ * Minor modification of LogOutputStream of apache commons exec.
+ * LogOutputStream of apache commons exec has one issue that method flush doesn't throw IOException,
+ * so that SparkOutputStream can not extend it correctly.
+ */
+public abstract class LogOutputStream extends OutputStream {
+  private static final int INTIAL_SIZE = 132;
+  private static final int CR = 13;
+  private static final int LF = 10;
+  private final ByteArrayOutputStream buffer;
+  private boolean skip;
+  private final int level;
+
+  public LogOutputStream() {
+    this(999);
+  }
+
+  public LogOutputStream(int level) {
+    this.buffer = new ByteArrayOutputStream(132);
+    this.skip = false;
+    this.level = level;
+  }
+
+  @Override
+  public void write(int cc) throws IOException {
+    byte c = (byte) cc;
+    if (c != 10 && c != 13) {
+      this.buffer.write(cc);
+    } else if (!this.skip) {
+      this.processBuffer();
+    }
+
+    this.skip = c == 13;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (this.buffer.size() > 0) {
+      this.processBuffer();
+    }
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.buffer.size() > 0) {
+      this.processBuffer();
+    }
+
+    super.close();
+  }
+
+  public int getMessageLevel() {
+    return this.level;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int offset = off;
+    int blockStartOffset = off;
+
+    for (int remaining = len; remaining > 0; blockStartOffset = offset) {
+      while (remaining > 0 && b[offset] != 10 && b[offset] != 13) {
+        ++offset;
+        --remaining;
+      }
+
+      int blockLength = offset - blockStartOffset;
+      if (blockLength > 0) {
+        this.buffer.write(b, blockStartOffset, blockLength);
+      }
+
+      while (remaining > 0 && (b[offset] == 10 || b[offset] == 13)) {
+        this.write(b[offset]);
+        ++offset;
+        --remaining;
+      }
+    }
+
+  }
+
+  protected void processBuffer() {
+    this.processLine(this.buffer.toString());
+    this.buffer.reset();
+  }
+
+  protected void processLine(String line) {
+    this.processLine(line, this.level);
+  }
+
+  protected abstract void processLine(String var1, int var2);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index c93c55a..6b585dd 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -179,7 +179,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     cmd.addArgument(Integer.toString(port), false);
     cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
     executor = new DefaultExecutor();
-    outputStream = new SparkOutputStream();
+    outputStream = new SparkOutputStream(logger);
     PipedOutputStream ps = new PipedOutputStream();
     in = null;
     try {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 8aec939..2322ca1 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -120,7 +120,7 @@ public class SparkInterpreter extends Interpreter {
 
   public SparkInterpreter(Properties property) {
     super(property);
-    out = new SparkOutputStream();
+    out = new SparkOutputStream(logger);
   }
 
   public SparkInterpreter(Properties property, SparkContext sc) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
index 98a4090..e454994 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -17,17 +17,20 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 /**
  * InterpreterOutput can be attached / detached.
  */
-public class SparkOutputStream extends OutputStream {
+public class SparkOutputStream extends LogOutputStream {
+
+  public static Logger logger;
   InterpreterOutput interpreterOutput;
 
-  public SparkOutputStream() {
+  public SparkOutputStream(Logger logger) {
+    this.logger = logger;
   }
 
   public InterpreterOutput getInterpreterOutput() {
@@ -40,6 +43,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(int b) throws IOException {
+    super.write(b);
     if (interpreterOutput != null) {
       interpreterOutput.write(b);
     }
@@ -47,6 +51,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(byte [] b) throws IOException {
+    super.write(b);
     if (interpreterOutput != null) {
       interpreterOutput.write(b);
     }
@@ -54,13 +59,20 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(byte [] b, int offset, int len) throws IOException {
+    super.write(b, offset, len);
     if (interpreterOutput != null) {
       interpreterOutput.write(b, offset, len);
     }
   }
 
   @Override
+  protected void processLine(String s, int i) {
+    logger.debug("Interpreter output:" + s);
+  }
+
+  @Override
   public void close() throws IOException {
+    super.close();
     if (interpreterOutput != null) {
       interpreterOutput.close();
     }
@@ -68,6 +80,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void flush() throws IOException {
+    super.flush();
     if (interpreterOutput != null) {
       interpreterOutput.flush();
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index e0a47b7..2648833 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -143,7 +143,7 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
 
     executor = new DefaultExecutor();
-    outputStream = new SparkOutputStream();
+    outputStream = new SparkOutputStream(logger);
 
     input = new PipedOutputStream();
     PipedInputStream in = new PipedInputStream(input);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 0380afa..2e95c85 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -27,19 +27,20 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
+import ast
 
 # for back compatibility
 from pyspark.sql import SQLContext, HiveContext, Row
 
 class Logger(object):
   def __init__(self):
-    self.out = ""
+    pass
 
   def write(self, message):
     intp.appendOutput(message)
 
   def reset(self):
-    self.out = ""
+    pass
 
   def flush(self):
     pass
@@ -230,7 +231,7 @@ while True :
   try:
     stmts = req.statements().split("\n")
     jobGroup = req.jobGroup()
-    final_code = None
+    final_code = []
 
     for s in stmts:
       if s == None:
@@ -241,15 +242,27 @@ while True :
       if len(s_stripped) == 0 or s_stripped.startswith("#"):
         continue
 
-      if final_code:
-        final_code += "\n" + s
-      else:
-        final_code = s
+      final_code.append(s)
 
     if final_code:
-      compiledCode = compile(final_code, "<string>", "exec")
+      # use exec mode to compile the statements except the last statement,
+      # so that the last statement's evaluation will be printed to stdout
       sc.setJobGroup(jobGroup, "Zeppelin")
-      eval(compiledCode)
+      code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
+      to_run_exec, to_run_single = code.body[:-1], code.body[-1:]
+
+      try:
+        for node in to_run_exec:
+          mod = ast.Module([node])
+          code = compile(mod, '<stdin>', 'exec')
+          exec(code)
+
+        for node in to_run_single:
+          mod = ast.Interactive([node])
+          code = compile(mod, '<stdin>', 'single')
+          exec(code)
+      except:
+        raise Execution(sys.exc_info())
 
     intp.setStatementsFinished("", False)
   except Py4JJavaError:

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
index 81c7190..8c160c4 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
@@ -140,6 +140,25 @@ public class SparkParagraphIT extends AbstractZeppelinIT {
           paragraph1Result.getText().toString(), CoreMatchers.equalTo("test loop 0\ntest loop 1\ntest loop 2")
       );
 
+      // the last statement's evaluation result is printed
+      setTextOfParagraph(2, "%pyspark\\n" +
+              "sc.version\\n" +
+              "1+1");
+      runParagraph(2);
+      try {
+        waitForParagraph(2, "FINISHED");
+      } catch (TimeoutException e) {
+        waitForParagraph(2, "ERROR");
+        collector.checkThat("Paragraph from SparkParagraphIT of testPySpark status: ",
+                "ERROR", CoreMatchers.equalTo("FINISHED")
+        );
+      }
+      WebElement paragraph2Result = driver.findElement(By.xpath(
+              getParagraphXPath(2) + "//div[@class=\"tableDisplay\"]"));
+      collector.checkThat("Paragraph from SparkParagraphIT of testPySpark result: ",
+              paragraph2Result.getText().toString(), CoreMatchers.equalTo("2")
+      );
+
     } catch (Exception e) {
       handleException("Exception in SparkParagraphIT while testPySpark", e);
     }


Mime
View raw message