zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: [ZEPPELIN-287] Adds proper standard output capturing to FlinkInterpreter
Date Wed, 09 Sep 2015 14:37:48 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master d9aaf3570 -> 223f8720b


[ZEPPELIN-287] Adds proper standard output capturing to FlinkInterpreter

The `FlinkInterpreter` captures only output written to Scala's `Console` and therefore misses
all output that goes to `System.out`. Some of Flink's functions, such as `DataSet[T].print()`,
print their results to `System.out`; consequently, Zeppelin's interpreter misses this output.
This PR fixes this behaviour by redirecting `System.out` to the interpreter's output stream
and by setting `System.out` as the `PrintStream` for `Console` while each statement is interpreted.

Author: Till Rohrmann <trohrmann@apache.org>

Closes #288 from tillrohrmann/fixSysoutCapturing and squashes the following commits:

9e019b8 [Till Rohrmann] [ZEPPELIN-287] Adds proper standard output capturing to FlinkInterpreter


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/223f8720
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/223f8720
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/223f8720

Branch: refs/heads/master
Commit: 223f8720bb07471cb6560e20212fb4134235ce92
Parents: d9aaf35
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Sep 7 15:03:52 2015 +0200
Committer: Lee moon soo <moon@apache.org>
Committed: Wed Sep 9 07:37:43 2015 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/flink/FlinkInterpreter.java | 28 +++++++++++++++-----
 .../zeppelin/flink/FlinkInterpreterTest.java    | 17 ++++++++++++
 2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/223f8720/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index cb861ca..b106c7d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -20,6 +20,9 @@ package org.apache.zeppelin.flink;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -45,8 +48,10 @@ import scala.Console;
 import scala.None;
 import scala.Option;
 import scala.Some;
+import scala.runtime.AbstractFunction0;
 import scala.tools.nsc.Settings;
 import scala.tools.nsc.interpreter.IMain;
+import scala.tools.nsc.interpreter.Results;
 import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
 import scala.tools.nsc.settings.MutableSettings.PathSetting;
 
@@ -100,6 +105,9 @@ public class FlinkInterpreter extends Interpreter {
     
     imain = flinkIloop.intp();
 
+    org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv();
+    env.getConfig().disableSysoutLogging();
+
     // prepare bindings
     imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
     binder = (Map<String, Object>) getValue("_binder");    
@@ -111,7 +119,7 @@ public class FlinkInterpreter extends Interpreter {
     
     imain.interpret("import org.apache.flink.api.scala._");
     imain.interpret("import org.apache.flink.api.common.functions._");
-    imain.bindValue("env", flinkIloop.scalaEnv());
+    imain.bindValue("env", env);
   }
 
   private boolean localMode() {
@@ -232,7 +240,7 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   public InterpreterResult interpret(String[] lines, InterpreterContext context) {
-    IMain imain = flinkIloop.intp();
+    final IMain imain = flinkIloop.intp();
     
     String[] linesToRun = new String[lines.length + 1];
     for (int i = 0; i < lines.length; i++) {
@@ -240,13 +248,13 @@ public class FlinkInterpreter extends Interpreter {
     }
     linesToRun[lines.length] = "print(\"\")";
 
-    Console.setOut(out);
+    System.setOut(new PrintStream(out));
     out.reset();
     Code r = null;
 
     String incomplete = "";
     for (int l = 0; l < linesToRun.length; l++) {
-      String s = linesToRun[l];      
+      final String s = linesToRun[l];
       // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
       if (l + 1 < linesToRun.length) {
         String nextLine = linesToRun[l + 1].trim();
@@ -256,9 +264,18 @@ public class FlinkInterpreter extends Interpreter {
         }
       }
 
+      final String currentCommand = incomplete;
+
       scala.tools.nsc.interpreter.Results.Result res = null;
       try {
-        res = imain.interpret(incomplete + s);
+        res = Console.withOut(
+          System.out,
+          new AbstractFunction0<Results.Result>() {
+            @Override
+            public Results.Result apply() {
+              return imain.interpret(currentCommand + s);
+            }
+          });
       } catch (Exception e) {
         logger.info("Interpreter exception", e);
         return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
@@ -328,5 +345,4 @@ public class FlinkInterpreter extends Interpreter {
   static final String toString(Object o) {
     return (o instanceof String) ? (String) o : "";
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/223f8720/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index d0eda26..3168f04 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.zeppelin.flink;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
 import java.util.Properties;
 
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -55,6 +57,13 @@ public class FlinkInterpreterTest {
   }
 
   @Test
+  public void testSimpleStatementWithSystemOutput() {
+    InterpreterResult result = flink.interpret("val a=1", context);
+    result = flink.interpret("System.out.print(a)", context);
+    assertEquals("1", result.message());
+  }
+
+  @Test
   public void testNextlineInvoke() {
     InterpreterResult result = flink.interpret("\"123\"\n  .toInt", context);
     assertEquals("res0: Int = 123\n", result.message());    
@@ -66,5 +75,13 @@ public class FlinkInterpreterTest {
     flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
     InterpreterResult result = flink.interpret("counts.print()", context);
     assertEquals(Code.SUCCESS, result.code());
+
+    String[] expectedCounts = {"(to,2)", "(be,2)", "(or,1)", "(not,1)"};
+    Arrays.sort(expectedCounts);
+
+    String[] counts = result.message().split("\n");
+    Arrays.sort(counts);
+
+    assertArrayEquals(expectedCounts, counts);
   }
 }


Mime
View raw message