From: minalee@apache.org
To: commits@zeppelin.apache.org
Reply-To: dev@zeppelin.apache.org
Subject: zeppelin git commit: ZEPPELIN-1197. Should print output directly without invoking function print in pyspark interpreter
Date: Wed, 10 Aug 2016 00:45:12 +0000 (UTC)

Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 901960fe4 -> ae6751baf


ZEPPELIN-1197. Should print output directly without invoking function print in pyspark interpreter

### What is this PR for?
Currently, users need to invoke print explicitly to make output appear in the notebook.
This behavior is neither natural nor consistent with other notebooks. This PR makes the pyspark interpreter in Zeppelin behave the same way as other notebooks. Two main changes:

* Use single mode to compile the last statement, so that the evaluation result of the last statement is printed to stdout; this is consistent with other notebooks (like Jupyter).
* Make SparkOutputStream extend LogOutputStream so that the output of the inner process (Python/R) is visible, which is helpful for diagnosis.

### What type of PR is it?
[Bug Fix]

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1197

### How should this be tested?
Tested manually. Input the following text in a pyspark paragraph:
```
1+1
sc.version
```
and get the following output:
```
u'1.6.1'
```

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? Users no longer need to call print explicitly.
* Does this need documentation? Yes

Author: Jeff Zhang

Closes #1232 from zjffdu/ZEPPELIN-1197 and squashes the following commits:

3771245 [Jeff Zhang] fix and add test
10182e6 [Jeff Zhang] ZEPPELIN-1197.
Should print output directly without invoking function print in pyspark interpreter

(cherry picked from commit b885f43e4c63a4fdd7f591f8286b788d6ed2d719)
Signed-off-by: Mina Lee

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ae6751ba
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ae6751ba
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ae6751ba

Branch: refs/heads/branch-0.6
Commit: ae6751bafbd06446081b1fbc289ce15d6ab022d8
Parents: 901960f
Author: Jeff Zhang
Authored: Thu Jul 28 17:36:37 2016 +0800
Committer: Mina Lee
Committed: Wed Aug 10 09:44:39 2016 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/spark/LogOutputStream.java  | 116 +++++++++++++++++++
 .../zeppelin/spark/PySparkInterpreter.java      |   2 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |   2 +-
 .../zeppelin/spark/SparkOutputStream.java       |  19 ++-
 .../org/apache/zeppelin/spark/ZeppelinR.java    |   2 +-
 .../main/resources/python/zeppelin_pyspark.py   |  31 +++--
 .../zeppelin/integration/SparkParagraphIT.java  |  19 +++
 7 files changed, 176 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
new file mode 100644
index 0000000..d941cd7
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
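The "single" compile mode described in the PR summary can be sketched outside Zeppelin as follows. This is an illustrative sketch, not code from the patch: `run_paragraph` is a hypothetical helper name, and on Python 3.8+ `ast.Module` requires a `type_ignores` argument that the committed (Python 2 era) code omits.

```python
import ast

def run_paragraph(source, env=None):
    # Parse the whole paragraph once, then split off the last statement.
    tree = ast.parse(source, "<paragraph>", "exec")
    to_run_exec, to_run_single = tree.body[:-1], tree.body[-1:]
    env = {} if env is None else env
    # Everything but the last statement runs in plain "exec" mode.
    for node in to_run_exec:
        mod = ast.Module(body=[node], type_ignores=[])
        exec(compile(mod, "<paragraph>", "exec"), env)
    # The last statement runs in "single" mode: if it is an expression,
    # sys.displayhook prints its repr, exactly like a REPL or Jupyter.
    for node in to_run_single:
        mod = ast.Interactive(body=[node])
        exec(compile(mod, "<paragraph>", "single"), env)

run_paragraph("a = 1\na + 1")  # prints 2 without an explicit print()
```

A bare assignment as the last statement prints nothing, which matches the PR's intended REPL-like behavior.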
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+/**
+ * Minor modification of LogOutputStream of apache commons exec.
+ * LogOutputStream of apache commons exec has one issue that method flush doesn't throw IOException,
+ * so that SparkOutputStream can not extend it correctly.
+ */
+public abstract class LogOutputStream extends OutputStream {
+  private static final int INTIAL_SIZE = 132;
+  private static final int CR = 13;
+  private static final int LF = 10;
+  private final ByteArrayOutputStream buffer;
+  private boolean skip;
+  private final int level;
+
+  public LogOutputStream() {
+    this(999);
+  }
+
+  public LogOutputStream(int level) {
+    this.buffer = new ByteArrayOutputStream(132);
+    this.skip = false;
+    this.level = level;
+  }
+
+  @Override
+  public void write(int cc) throws IOException {
+    byte c = (byte) cc;
+    if (c != 10 && c != 13) {
+      this.buffer.write(cc);
+    } else if (!this.skip) {
+      this.processBuffer();
+    }
+
+    this.skip = c == 13;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (this.buffer.size() > 0) {
+      this.processBuffer();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.buffer.size() > 0) {
+      this.processBuffer();
+    }
+    super.close();
+  }
+
+  public int getMessageLevel() {
+    return this.level;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int offset = off;
+    int blockStartOffset = off;
+
+    for (int remaining = len; remaining > 0; blockStartOffset = offset) {
+      while (remaining > 0 && b[offset] != 10 && b[offset] != 13) {
+        ++offset;
+        --remaining;
+      }
+
+      int blockLength = offset - blockStartOffset;
+      if (blockLength > 0) {
+        this.buffer.write(b, blockStartOffset, blockLength);
+      }
+
+      while (remaining > 0 && (b[offset] == 10 || b[offset] == 13)) {
+        this.write(b[offset]);
+        ++offset;
+        --remaining;
+      }
+    }
+  }
+
+  protected void processBuffer() {
+    this.processLine(this.buffer.toString());
+    this.buffer.reset();
+  }
+
+  protected void processLine(String line) {
+    this.processLine(line, this.level);
+  }
+
+  protected abstract void processLine(String var1, int var2);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index c93c55a..6b585dd 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -179,7 +179,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     cmd.addArgument(Integer.toString(port), false);
     cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
     executor = new DefaultExecutor();
-    outputStream = new SparkOutputStream();
+    outputStream = new SparkOutputStream(logger);
     PipedOutputStream ps = new PipedOutputStream();
     in = null;
     try {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 8aec939..2322ca1 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -120,7 +120,7 @@ public class SparkInterpreter extends Interpreter {
 
   public SparkInterpreter(Properties property) {
     super(property);
-    out = new SparkOutputStream();
+    out = new SparkOutputStream(logger);
   }
 
   public SparkInterpreter(Properties property, SparkContext sc) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
index 98a4090..e454994 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -17,17 +17,20 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 /**
  * InterpreterOutput can be attached / detached.
  */
-public class SparkOutputStream extends OutputStream {
+public class SparkOutputStream extends LogOutputStream {
+
+  public static Logger logger;
 
   InterpreterOutput interpreterOutput;
 
-  public SparkOutputStream() {
+  public SparkOutputStream(Logger logger) {
+    this.logger = logger;
   }
 
   public InterpreterOutput getInterpreterOutput() {
@@ -40,6 +43,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(int b) throws IOException {
+    super.write(b);
     if (interpreterOutput != null) {
       interpreterOutput.write(b);
     }
@@ -47,6 +51,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(byte [] b) throws IOException {
+    super.write(b);
     if (interpreterOutput != null) {
       interpreterOutput.write(b);
     }
@@ -54,13 +59,20 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(byte [] b, int offset, int len) throws IOException {
+    super.write(b, offset, len);
     if (interpreterOutput != null) {
       interpreterOutput.write(b, offset, len);
     }
   }
 
   @Override
+  protected void processLine(String s, int i) {
+    logger.debug("Interpreter output:" + s);
+  }
+
+  @Override
   public void close() throws IOException {
+    super.close();
     if (interpreterOutput != null) {
       interpreterOutput.close();
     }
@@ -68,6 +80,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void flush() throws IOException {
+    super.flush();
     if (interpreterOutput != null) {
       interpreterOutput.flush();
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index e0a47b7..2648833 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -143,7 +143,7 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
 
     executor = new DefaultExecutor();
-    outputStream = new SparkOutputStream();
+    outputStream = new SparkOutputStream(logger);
 
     input = new PipedOutputStream();
     PipedInputStream in = new PipedInputStream(input);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 0380afa..2e95c85 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -27,19 +27,20 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
+import ast
 
 # for back compatibility
 from pyspark.sql import SQLContext, HiveContext, Row
 
 class Logger(object):
   def __init__(self):
-    self.out = ""
+    pass
 
   def write(self, message):
     intp.appendOutput(message)
 
   def reset(self):
-    self.out = ""
+    pass
 
   def flush(self):
     pass
 
@@ -230,7 +231,7 @@ while True :
   try:
     stmts = req.statements().split("\n")
     jobGroup = req.jobGroup()
-    final_code = None
+    final_code = []
 
     for s in stmts:
       if s == None:
@@ -241,15 +242,27 @@ while True :
       if len(s_stripped) == 0 or s_stripped.startswith("#"):
         continue
 
-      if final_code:
-        final_code += "\n" + s
-      else:
-        final_code = s
+      final_code.append(s)
 
     if final_code:
-      compiledCode = compile(final_code, "", "exec")
+      # use exec mode to compile the statements except the last statement,
+      # so that the last statement's evaluation will be printed to stdout
       sc.setJobGroup(jobGroup, "Zeppelin")
-      eval(compiledCode)
+      code = compile('\n'.join(final_code), '', 'exec', ast.PyCF_ONLY_AST, 1)
+      to_run_exec, to_run_single = code.body[:-1], code.body[-1:]
+
+      try:
+        for node in to_run_exec:
+          mod = ast.Module([node])
+          code = compile(mod, '', 'exec')
+          exec(code)
+
+        for node in to_run_single:
+          mod = ast.Interactive([node])
+          code = compile(mod, '', 'single')
+          exec(code)
+      except:
+        raise Execution(sys.exc_info())
 
     intp.setStatementsFinished("", False)
   except Py4JJavaError:

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae6751ba/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
index 81c7190..8c160c4 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
@@ -140,6 +140,25 @@ public class SparkParagraphIT extends AbstractZeppelinIT {
           paragraph1Result.getText().toString(), CoreMatchers.equalTo("test loop 0\ntest loop 1\ntest loop 2")
       );
 
+      // the last statement's evaluation result is printed
+      setTextOfParagraph(2, "%pyspark\\n" +
+          "sc.version\\n" +
+          "1+1");
+      runParagraph(2);
+      try {
+        waitForParagraph(2, "FINISHED");
+      } catch (TimeoutException e) {
+        waitForParagraph(2, "ERROR");
+        collector.checkThat("Paragraph from SparkParagraphIT of testPySpark status: ",
+            "ERROR", CoreMatchers.equalTo("FINISHED")
+        );
+      }
+      WebElement paragraph2Result = driver.findElement(By.xpath(
+          getParagraphXPath(2) + "//div[@class=\"tableDisplay\"]"));
+      collector.checkThat("Paragraph from SparkParagraphIT of testPySpark result: ",
+          paragraph2Result.getText().toString(), CoreMatchers.equalTo("2")
+      );
+
     } catch (Exception e) {
       handleException("Exception in SparkParagraphIT while testPySpark", e);
     }
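The LogOutputStream added by this commit splits a raw byte stream into lines on CR/LF, using a skip flag so that a CRLF pair emits a single line. A rough Python rendering of that buffering logic (`LineBufferedOutput` is a hypothetical name for illustration, not part of the patch):

```python
class LineBufferedOutput:
    """Sketch of LogOutputStream's byte-level line splitting."""
    CR, LF = 13, 10

    def __init__(self):
        self.buffer = bytearray()
        self.skip = False   # True right after a CR, so CRLF emits one line
        self.lines = []

    def write(self, cc):
        c = cc & 0xFF
        if c not in (self.CR, self.LF):
            self.buffer.append(c)
        elif not self.skip:
            # Line terminator seen (and not the LF of a CRLF pair):
            # emit the buffered line, even if it is empty.
            self.process_buffer()
        self.skip = (c == self.CR)

    def process_buffer(self):
        # Counterpart of processBuffer(): hand one complete line to the consumer
        # (SparkOutputStream overrides processLine to log it at debug level).
        self.lines.append(self.buffer.decode("utf-8", "replace"))
        self.buffer.clear()

    def flush(self):
        if self.buffer:
            self.process_buffer()

out = LineBufferedOutput()
for b in b"first\r\nsecond\nthird":
    out.write(b)
out.flush()
# out.lines == ["first", "second", "third"]
```

This is why SparkOutputStream's overridden `write`/`flush`/`close` all call `super` first: the parent accumulates bytes into log lines while the interpreter output path still receives the raw bytes unchanged.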