zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [6/7] zeppelin git commit: [ZEPPELIN-212] Multiple paragraph results
Date Thu, 01 Dec 2016 01:25:18 GMT
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
index 03868c4..75604dc 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.zeppelin.python;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.util.*;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -38,6 +34,8 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 /**
  * In order for this test to work, test env must have installed:
  * <ol>
@@ -79,22 +77,18 @@ public class PythonInterpreterMatplotlibTest {
     context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
         new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null), null,
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(
-            new InterpreterOutputListener() {
-              @Override public void onAppend(InterpreterOutput out, byte[] line) {}
-              @Override public void onUpdate(InterpreterOutput out, byte[] output) {}
-            }));
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
   }
 
   @Test
   public void dependenciesAreInstalled() {
     // matplotlib
     InterpreterResult ret = python.interpret("import matplotlib", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
     
     // inline backend
     ret = python.interpret("import backend_zinline", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
   }
 
   @Test
@@ -106,10 +100,10 @@ public class PythonInterpreterMatplotlibTest {
     ret = python.interpret("plt.plot([1, 2, 3])", context);
     ret = python.interpret("plt.show()", context);
 
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.HTML, ret.type());
-    assertTrue(ret.message().contains("data:image/png;base64"));
-    assertTrue(ret.message().contains("<div>"));
+    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
+    assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
+    assertTrue(ret.message().get(0).getData().contains("<div>"));
   }
 
   @Test
@@ -128,15 +122,14 @@ public class PythonInterpreterMatplotlibTest {
     // of FigureManager, causing show() to return before setting the output
     // type to HTML.
     ret = python.interpret("plt.show()", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.TEXT, ret.type());
-    assertTrue(ret.message().equals(""));
+    assertEquals(0, ret.message().size());
     
     // Now test that new plot is drawn. It should be identical to the
     // previous one.
     ret = python.interpret("plt.plot([1, 2, 3])", context);
     ret2 = python.interpret("plt.show()", context);
-    assertTrue(ret1.message().equals(ret2.message()));
+    assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
+    assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
   }
   
   @Test
@@ -155,15 +148,13 @@ public class PythonInterpreterMatplotlibTest {
     // of FigureManager, causing show() to set the output
     // type to HTML even though the figure is inactive.
     ret = python.interpret("plt.show()", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.HTML, ret.type());
-    assertTrue(ret.message().equals(""));
+    assertEquals("", ret.message().get(0).getData());
     
     // Now test that plot can be reshown if it is updated. It should be
     // different from the previous one because it will plot the same line
     // again but in a different color.
     ret = python.interpret("plt.plot([1, 2, 3])", context);
     ret2 = python.interpret("plt.show()", context);
-    assertTrue(!ret1.message().equals(ret2.message()));
+    assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
index 7fbe1d7..86fb22b 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
@@ -81,11 +81,7 @@ public class PythonInterpreterPandasSqlTest {
     context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
         new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null), null,
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(
-            new InterpreterOutputListener() {
-              @Override public void onAppend(InterpreterOutput out, byte[] line) {}
-              @Override public void onUpdate(InterpreterOutput out, byte[] output) {}
-            }));
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
 
     //important to be last step
     sql.open();
@@ -95,7 +91,7 @@ public class PythonInterpreterPandasSqlTest {
   @Test
   public void dependenciesAreInstalled() {
     InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
   }
 
   @Test
@@ -105,15 +101,15 @@ public class PythonInterpreterPandasSqlTest {
     ret = python.interpret(
         "pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
          context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
     // when
     ret = sql.interpret("SELECT * from something", context);
 
     // then
     assertNotNull(ret);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertTrue(ret.message().contains("dependency is not installed"));
+    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertTrue(ret.message().get(0).getData().contains("dependency is not installed"));
   }
 
   @Test
@@ -126,17 +122,17 @@ public class PythonInterpreterPandasSqlTest {
     // DataFrame df2 \w test data
     ret = python.interpret("df2 = pd.DataFrame({ 'age'  : np.array([33, 51, 51, 34]), "+
                            "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
     //when
     ret = sql.interpret("select name, age from df2 where age < 40", context);
 
     //then
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.TABLE, ret.type());
+    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
     //assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
-    assertTrue(ret.message().indexOf("moon\t33") > 0);
-    assertTrue(ret.message().indexOf("park\t34") > 0);
+    assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0);
+    assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0);
 
     assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
   }
@@ -150,7 +146,7 @@ public class PythonInterpreterPandasSqlTest {
     assertNotNull("Interpreter returned 'null'", ret);
     //System.out.println("\nInterpreter response: \n" + ret.message());
     assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().length() > 0);
+    assertTrue(ret.message().get(0).getData().length() > 0);
   }
 
   @Test
@@ -163,17 +159,17 @@ public class PythonInterpreterPandasSqlTest {
     ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
     ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
     ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
     // when
     ret = python.interpret("z.show(df1, show_index=True)", context);
 
     // then
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.TABLE, ret.type());
-    assertTrue(ret.message().indexOf("index_name") == 0);
-    assertTrue(ret.message().indexOf("13") > 0);
-    assertTrue(ret.message().indexOf("nan") > 0);
-    assertTrue(ret.message().indexOf("6.7") > 0);
+    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
+    assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0);
+    assertTrue(ret.message().get(0).getData().indexOf("13") > 0);
+    assertTrue(ret.message().get(0).getData().indexOf("nan") > 0);
+    assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index 52a6914..e7204ef 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -169,7 +169,7 @@ public class PythonInterpreterTest {
     cmdHistory = "";
     InterpreterResult result = pythonInterpreter.interpret("print a", null);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertEquals("%text print a", result.toString());
+    assertEquals("%text print a", result.message().get(0).toString());
   }
 
   /**
@@ -250,13 +250,13 @@ public class PythonInterpreterTest {
 
     System.err.println("ret = '" + ret + "'");
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().length() > 0);
+    assertTrue(ret.message().get(0).getData().length() > 0);
 
     assertNotNull("Interpreter result for text is Null", ret);
     String codePrintText = "print (\"Exception(\\\"test exception\\\")\")";
     ret = pythonInterpreter.interpret(codePrintText, null);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertTrue(ret.message().length() > 0);
+    assertTrue(ret.message().get(0).getData().length() > 0);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java
index 9cd4d6f..7b889ad 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterWithPythonInstalledTest.java
@@ -64,7 +64,7 @@ public class PythonInterpreterWithPythonInstalledTest {
     assertNotNull("Interpreter returned 'null'", ret);
     //System.out.println("\nInterpreter response: \n" + ret.message());
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().length() > 0);
+    assertTrue(ret.message().get(0).getData().length() > 0);
 
     realPython.close();
   }
@@ -86,7 +86,7 @@ public class PythonInterpreterWithPythonInstalledTest {
     assertNotNull("Interpreter returned 'null'", ret);
     //System.out.println("\nInterpreter response: \n" + ret.message());
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertTrue(ret.message().length() > 0);
+    assertTrue(ret.message().get(0).getData().length() > 0);
 
     realPython.close();
   }
@@ -107,7 +107,7 @@ public class PythonInterpreterWithPythonInstalledTest {
     //then
     //System.out.println("\nInterpreter response: \n" + ret.message());
     assertEquals(InterpreterResult.Code.SUCCESS, ret1.code());
-    assertEquals("...\n", ret1.message());
+    assertEquals("...\n", ret1.message().get(0).getData());
 
 
     InterpreterResult ret2 = realPython.interpret("for i in range(5):", null);
@@ -117,7 +117,7 @@ public class PythonInterpreterWithPythonInstalledTest {
     assertEquals("   File \"<stdin>\", line 2\n" +
             "    \n" +
             "    ^\n" +
-            "IndentationError: expected an indented block\n", ret2.message());
+            "IndentationError: expected an indented block\n", ret2.message().get(0).getData());
 
     realPython.close();
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
----------------------------------------------------------------------
diff --git a/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala b/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
index 43fb0d2..a85cfe6 100644
--- a/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
+++ b/r/src/test/scala/org/apache/zeppelin/rinterpreter/WrapperTest.scala
@@ -77,10 +77,9 @@ class WrapperTest extends FlatSpec {
         |2 + 2
         |```
       """.stripMargin, null)
-    withClue(result.message()) {
+    withClue(result.message().get(0).getData()) {
       result should have (
-      'code (InterpreterResult.Code.SUCCESS),
-      'type (InterpreterResult.Type.HTML)
+      'code (InterpreterResult.Code.SUCCESS)
       )
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index eb57b14..8a23c42 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -108,7 +108,7 @@ public class ScaldingInterpreterTest {
     // when interpret incomplete expression
     InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
     assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
-    assertTrue(incomplete.message().length() > 0); // expecting some error
+    assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error
                                                    // message
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java b/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java
index 37733a1..ec17879 100644
--- a/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java
+++ b/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java
@@ -47,12 +47,7 @@ public class ScioInterpreterTest {
         new AngularObjectRegistry(intpGroup.getId(), null),
         new LocalResourcePool("id"),
         new LinkedList<InterpreterContextRunner>(),
-        new InterpreterOutput(new InterpreterOutputListener() {
-          @Override
-          public void onAppend(InterpreterOutput out, byte[] line) {}
-          @Override
-          public void onUpdate(InterpreterOutput out, byte[] output) {}
-        }));
+        new InterpreterOutput(null));
   }
 
   @Before
@@ -79,14 +74,14 @@ public class ScioInterpreterTest {
   public void testBasicSyntaxError() {
     InterpreterResult error = repl.interpret("val a:Int = 'ds'", context);
     assertEquals(InterpreterResult.Code.ERROR, error.code());
-    assertEquals("Interpreter error", error.message());
+    assertEquals("Interpreter error", error.message().get(0).getData());
   }
 
   @Test
   public void testBasicIncomplete() {
     InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
     assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
-    assertEquals("Incomplete expression", incomplete.message());
+    assertEquals("Incomplete expression", incomplete.message().get(0).getData());
   }
 
   @Test
@@ -112,7 +107,7 @@ public class ScioInterpreterTest {
     InterpreterResult exception = repl.interpret("val (sc, _) = ContextAndArgs(argz)" + newline
         + "throw new Exception(\"test\")", context);
     assertEquals(InterpreterResult.Code.ERROR, exception.code());
-    assertTrue(exception.message().length() > 0);
+    assertTrue(exception.message().get(0).getData().length() > 0);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
----------------------------------------------------------------------
diff --git a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
index a796ac5..e52253f 100644
--- a/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
+++ b/shell/src/test/java/org/apache/zeppelin/shell/ShellInterpreterTest.java
@@ -72,7 +72,7 @@ public class ShellInterpreterTest {
       result = shell.interpret("invalid_command\nls",context);
     }
     assertEquals(InterpreterResult.Code.SUCCESS,result.code());
-    assertTrue(result.message().contains("invalid_command"));
+    assertTrue(result.message().get(0).getData().contains("invalid_command"));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 9701231..58f17e9 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -45,14 +45,8 @@ import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
@@ -342,10 +336,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       }
     }
 
-    String errorMessage = "";
+    List<InterpreterResultMessage> errorMessage;
     try {
       context.out.flush();
-      errorMessage = new String(context.out.toByteArray());
+      errorMessage = context.out.toInterpreterResultMessage();
     } catch (IOException e) {
       throw new InterpreterException(e);
     }
@@ -353,18 +347,22 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
     if (pythonscriptRunning == false) {
       // python script failed to initialize and terminated
-      return new InterpreterResult(Code.ERROR, "failed to start pyspark"
-          + errorMessage);
+      errorMessage.add(new InterpreterResultMessage(
+          InterpreterResult.Type.TEXT, "failed to start pyspark"));
+      return new InterpreterResult(Code.ERROR, errorMessage);
     }
     if (pythonScriptInitialized == false) {
       // timeout. didn't get initialized message
-      return new InterpreterResult(Code.ERROR, "pyspark is not responding "
-          + errorMessage);
+      errorMessage.add(new InterpreterResultMessage(
+          InterpreterResult.Type.TEXT, "pyspark is not responding"));
+      return new InterpreterResult(Code.ERROR, errorMessage);
     }
 
     if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
-      return new InterpreterResult(Code.ERROR, "pyspark "
-          + sparkInterpreter.getSparkContext().version() + " is not supported");
+      errorMessage.add(new InterpreterResultMessage(
+          InterpreterResult.Type.TEXT,
+          "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
+      return new InterpreterResult(Code.ERROR, errorMessage);
     }
     String jobGroup = sparkInterpreter.getJobGroup(context);
     ZeppelinContext z = sparkInterpreter.getZeppelinContext();
@@ -460,10 +458,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       if (statementError) {
         return new LinkedList<>();
       }
-      InterpreterResult completionResult;
-      completionResult = new InterpreterResult(Code.SUCCESS, statementOutput);
       Gson gson = new Gson();
-      completionList = gson.fromJson(completionResult.message(), String[].class);
+      completionList = gson.fromJson(statementOutput, String[].class);
     }
     //end code for completion
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 98c6de3..95bc587 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -158,16 +159,7 @@ public class ZeppelinR implements ExecuteResultHandler {
     Map env = EnvironmentUtils.getProcEnvironment();
 
 
-    initialOutput = new InterpreterOutput(new InterpreterOutputListener() {
-      @Override
-      public void onAppend(InterpreterOutput out, byte[] line) {
-        logger.debug(new String(line));
-      }
-
-      @Override
-      public void onUpdate(InterpreterOutput out, byte[] output) {
-      }
-    });
+    initialOutput = new InterpreterOutput(null);
     outputStream.setInterpreterOutput(initialOutput);
     executor.execute(cmd, env, this);
     rScriptRunning = true;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index 1818638..abad8e7 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -18,14 +18,7 @@
 package org.apache.zeppelin.spark;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -40,6 +33,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 
 import static org.junit.Assert.*;
@@ -70,35 +64,18 @@ public class PySparkInterpreterMatplotlibTest {
      */    
     @Override
     public InterpreterResult interpret(String st, InterpreterContext context) {
+      context.out.clear();
       InterpreterResult result = super.interpret(st, context);
-      String message = "";
-      Type outputType;
-      byte[] interpreterOutput;
+      List<InterpreterResultMessage> resultMessages = null;
       try {
         context.out.flush();
-        outputType = context.out.getType();
-        interpreterOutput = context.out.toByteArray();
+        resultMessages = context.out.toInterpreterResultMessage();
       } catch (IOException e) {
-        throw new InterpreterException(e);
-      }
-      
-
-      if (interpreterOutput != null && interpreterOutput.length > 0) {
-        message = new String(interpreterOutput);
+        e.printStackTrace();
       }
+      resultMessages.addAll(result.message());
 
-      String interpreterResultMessage = result.message();
-
-      InterpreterResult combinedResult;
-      if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
-        message += interpreterResultMessage;
-        combinedResult = new InterpreterResult(result.code(), result.type(), message);
-      } else {
-        combinedResult = new InterpreterResult(result.code(), outputType, message);
-      }
-      
-      context.out.clear();
-      return combinedResult;      
+      return new InterpreterResult(result.code(), resultMessages);
     }
   }
 
@@ -157,17 +134,7 @@ public class PySparkInterpreterMatplotlibTest {
       new AngularObjectRegistry(intpGroup.getId(), null),
       new LocalResourcePool("id"),
       new LinkedList<InterpreterContextRunner>(),
-      new InterpreterOutput(new InterpreterOutputListener() {
-        @Override
-        public void onAppend(InterpreterOutput out, byte[] line) {
-
-        }
-
-        @Override
-        public void onUpdate(InterpreterOutput out, byte[] output) {
-
-        }
-      }));
+      new InterpreterOutput(null));
   }
 
   @After
@@ -192,11 +159,11 @@ public class PySparkInterpreterMatplotlibTest {
   public void dependenciesAreInstalled() {
     // matplotlib
     InterpreterResult ret = pyspark.interpret("import matplotlib", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
     
     // inline backend
     ret = pyspark.interpret("import backend_zinline", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
   }
 
   @Test
@@ -209,10 +176,10 @@ public class PySparkInterpreterMatplotlibTest {
     ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
     ret = pyspark.interpret("plt.show()", context);
 
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.HTML, ret.type());
-    assertTrue(ret.message().contains("data:image/png;base64"));
-    assertTrue(ret.message().contains("<div>"));
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), Type.HTML, ret.message().get(0).getType());
+    assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
+    assertTrue(ret.message().get(0).getData().contains("<div>"));
   }
 
   @Test
@@ -232,15 +199,14 @@ public class PySparkInterpreterMatplotlibTest {
     // of FigureManager, causing show() to return before setting the output
     // type to HTML.
     ret = pyspark.interpret("plt.show()", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.TEXT, ret.type());
-    assertTrue(ret.message().equals(""));
+    assertEquals(0, ret.message().size());
     
     // Now test that new plot is drawn. It should be identical to the
     // previous one.
     ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
     ret2 = pyspark.interpret("plt.show()", context);
-    assertTrue(ret1.message().equals(ret2.message()));
+    assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
+    assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
   }
   
   @Test
@@ -260,16 +226,14 @@ public class PySparkInterpreterMatplotlibTest {
     // of FigureManager, causing show() to set the output
     // type to HTML even though the figure is inactive.
     ret = pyspark.interpret("plt.show()", context);
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.HTML, ret.type());
-    assertTrue(ret.message().equals(""));
-    
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
     // Now test that plot can be reshown if it is updated. It should be
     // different from the previous one because it will plot the same line
     // again but in a different color.
     ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
     ret2 = pyspark.interpret("plt.show()", context);
-    assertTrue(!ret1.message().equals(ret2.message()));
+    assertNotSame(ret1.message().get(1).getData(), ret2.message().get(1).getData());
   }
   
   @Test
@@ -281,8 +245,8 @@ public class PySparkInterpreterMatplotlibTest {
     ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context);
     ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
     ret = pyspark.interpret("plt.show()", context);    
-    assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(ret.message(), Type.ANGULAR, ret.type());
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(1).getType());
 
     // Check if the figure data is in the Angular Object Registry
     AngularObjectRegistry registry = context.getAngularObjectRegistry();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 85cc46e..64e4abd 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -18,13 +18,7 @@
 package org.apache.zeppelin.spark;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -107,17 +101,7 @@ public class PySparkInterpreterTest {
       new AngularObjectRegistry(intpGroup.getId(), null),
       new LocalResourcePool("id"),
       new LinkedList<InterpreterContextRunner>(),
-      new InterpreterOutput(new InterpreterOutputListener() {
-        @Override
-        public void onAppend(InterpreterOutput out, byte[] line) {
-
-        }
-
-        @Override
-        public void onUpdate(InterpreterOutput out, byte[] output) {
-
-        }
-      }));
+      new InterpreterOutput(null));
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index fe127a6..402962d 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -99,17 +99,7 @@ public class SparkInterpreterTest {
         new AngularObjectRegistry(intpGroup.getId(), null),
         new LocalResourcePool("id"),
         new LinkedList<InterpreterContextRunner>(),
-        new InterpreterOutput(new InterpreterOutputListener() {
-          @Override
-          public void onAppend(InterpreterOutput out, byte[] line) {
-
-          }
-
-          @Override
-          public void onUpdate(InterpreterOutput out, byte[] output) {
-
-          }
-        }));
+        new InterpreterOutput(null));
   }
 
   @After
@@ -138,7 +128,7 @@ public class SparkInterpreterTest {
     // when interpret incomplete expression
     InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
     assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
-    assertTrue(incomplete.message().length() > 0); // expecting some error
+    assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error
                                                    // message
 
     /*

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 303a54d..5e5d32c 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -79,17 +79,7 @@ public class SparkSqlInterpreterTest {
         new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
         new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
-      @Override
-      public void onAppend(InterpreterOutput out, byte[] line) {
-
-      }
-
-      @Override
-      public void onUpdate(InterpreterOutput out, byte[] output) {
-
-      }
-    }));
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
   }
 
   @After
@@ -112,12 +102,12 @@ public class SparkSqlInterpreterTest {
 
     InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(Type.TABLE, ret.type());
-    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
+    assertEquals(Type.TABLE, ret.message().get(0).getType());
+    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
 
     ret = sql.interpret("select wrong syntax", context);
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().length() > 0);
+    assertTrue(ret.message().get(0).getData().length() > 0);
 
     assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
   }
@@ -173,7 +163,7 @@ public class SparkSqlInterpreterTest {
         "select name, age from people where name = 'gates'", context);
     System.err.println("RET=" + ret.message());
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(Type.TABLE, ret.type());
-    assertEquals("name\tage\ngates\tnull\n", ret.message());
+    assertEquals(Type.TABLE, ret.message().get(0).getType());
+    assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
----------------------------------------------------------------------
diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
index 196e5cc..de8c6d0 100644
--- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
+++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
@@ -38,15 +38,7 @@ trait AbstractAngularElemTest
       new AngularObjectRegistry(intpGroup.getId(), null),
       null,
       new util.LinkedList[InterpreterContextRunner](),
-      new InterpreterOutput(new InterpreterOutputListener() {
-        override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
-          // nothing to do
-        }
-
-        override def onUpdate(out: InterpreterOutput, output: Array[Byte]): Unit = {
-          // nothing to do
-        }
-      }))
+      new InterpreterOutput(null));
 
     InterpreterContext.set(context)
     super.beforeEach() // To be stackable, must call super.beforeEach

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
----------------------------------------------------------------------
diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
index dba2a33..0ab52ec 100644
--- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
+++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
@@ -34,15 +34,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
         intpGroup.getId(), null),
       null,
       new java.util.LinkedList[InterpreterContextRunner](),
-      new InterpreterOutput(new InterpreterOutputListener() {
-        override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
-          // nothing to do
-        }
-
-        override def onUpdate(out: InterpreterOutput, output: Array[Byte]): Unit = {
-          // nothing to do
-        }
-      }))
+      new InterpreterOutput(null));
 
     InterpreterContext.set(context)
     super.beforeEach() // To be stackable, must call super.beforeEach

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
index d954294..6b2bfac 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
@@ -162,7 +162,7 @@ public class AngularObjectRegistry {
       Map<String, AngularObject> r = getRegistryForKey(noteId, paragraphId);
       AngularObject o = r.remove(name);
       if (listener != null && emit) {
-        listener.onRemove(interpreterId, name, noteId, paragraphId);;
+        listener.onRemove(interpreterId, name, noteId, paragraphId);
       }
       return o;
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/helium/ApplicationEventListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/helium/ApplicationEventListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/helium/ApplicationEventListener.java
index f89bb21..eda907a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/helium/ApplicationEventListener.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/helium/ApplicationEventListener.java
@@ -16,12 +16,17 @@
  */
 package org.apache.zeppelin.helium;
 
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
 /**
  * Event from HeliumApplication running on remote interpreter process
  */
 public interface ApplicationEventListener {
-  public void onOutputAppend(String noteId, String paragraphId, String appId, String output);
-  public void onOutputUpdated(String noteId, String paragraphId, String appId, String output);
+  public void onOutputAppend(
+      String noteId, String paragraphId, int index, String appId, String output);
+  public void onOutputUpdated(
+      String noteId, String paragraphId, int index, String appId,
+      InterpreterResult.Type type, String output);
   public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg);
   public void onStatusChange(String noteId, String paragraphId, String appId, String status);
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index 5a2e0ca..abdde8c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -35,87 +35,200 @@ import java.util.concurrent.ConcurrentHashMap;
 public class InterpreterOutput extends OutputStream {
   Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
   private final int NEW_LINE_CHAR = '\n';
+
+  private List<InterpreterResultMessageOutput> resultMessageOutputs = new LinkedList<>();
+  private InterpreterResultMessageOutput currentOut;
   private List<String> resourceSearchPaths = Collections.synchronizedList(new LinkedList<String>());
 
   ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
-  private final List<Object> outList = new LinkedList<>();
-  private InterpreterOutputChangeWatcher watcher;
   private final InterpreterOutputListener flushListener;
-  private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
-  private boolean firstWrite = true;
+  private final InterpreterOutputChangeListener changeListener;
 
   public InterpreterOutput(InterpreterOutputListener flushListener) {
     this.flushListener = flushListener;
+    changeListener = null;
     clear();
   }
 
   public InterpreterOutput(InterpreterOutputListener flushListener,
                            InterpreterOutputChangeListener listener) throws IOException {
     this.flushListener = flushListener;
+    this.changeListener = listener;
     clear();
-    watcher = new InterpreterOutputChangeWatcher(listener);
-    watcher.start();
   }
 
-  public InterpreterResult.Type getType() {
-    return type;
+  public void setType(InterpreterResult.Type type) throws IOException {
+    InterpreterResultMessageOutput out = null;
+
+    synchronized (resultMessageOutputs) {
+      int index = resultMessageOutputs.size();
+      InterpreterResultMessageOutputListener listener =
+          createInterpreterResultMessageOutputListener(index);
+
+      if (changeListener == null) {
+        out = new InterpreterResultMessageOutput(type, listener);
+      } else {
+        out = new InterpreterResultMessageOutput(type, listener, changeListener);
+      }
+      out.setResourceSearchPaths(resourceSearchPaths);
+
+      buffer.reset();
+
+      if (currentOut != null) {
+        currentOut.flush();
+      }
+
+      resultMessageOutputs.add(out);
+      currentOut = out;
+    }
+  }
+
+  public InterpreterResultMessageOutputListener createInterpreterResultMessageOutputListener(
+      final int index) {
+
+    return new InterpreterResultMessageOutputListener() {
+      final int idx = index;
+
+      @Override
+      public void onAppend(InterpreterResultMessageOutput out, byte[] line) {
+        if (flushListener != null) {
+          flushListener.onAppend(idx, out, line);
+        }
+      }
+
+      @Override
+      public void onUpdate(InterpreterResultMessageOutput out) {
+        if (flushListener != null) {
+          flushListener.onUpdate(idx, out);
+        }
+      }
+    };
+  }
+
+  public InterpreterResultMessageOutput getCurrentOutput() {
+    synchronized (resultMessageOutputs) {
+      return currentOut;
+    }
   }
 
-  public void setType(InterpreterResult.Type type) {
-    if (this.type != type) {
-      clear();
-      this.type = type;
+  public InterpreterResultMessageOutput getOutputAt(int index) {
+    synchronized (resultMessageOutputs) {
+      return resultMessageOutputs.get(index);
+    }
+  }
+
+  public int size() {
+    synchronized (resultMessageOutputs) {
+      return resultMessageOutputs.size();
     }
   }
 
   public void clear() {
-    synchronized (outList) {
-      type = InterpreterResult.Type.TEXT;
-      buffer.reset();
-      outList.clear();
-      if (watcher != null) {
-        watcher.clear();
+    buffer.reset();
+
+    synchronized (resultMessageOutputs) {
+      for (InterpreterResultMessageOutput out : resultMessageOutputs) {
+        out.clear();
+        try {
+          out.close();
+        } catch (IOException e) {
+          logger.error(e.getMessage(), e);
+        }
       }
 
-      flushListener.onUpdate(this, new byte[]{});
+      // clear all ResultMessages
+      resultMessageOutputs.clear();
+      currentOut = null;
+      startOfTheNewLine = true;
+      firstCharIsPercentSign = false;
+      updateAllResultMessages();
     }
   }
 
+  private void updateAllResultMessages() {
+    if (flushListener != null) {
+      flushListener.onUpdateAll(this);
+    }
+  }
+
+
+  int previousChar = 0;
+  boolean startOfTheNewLine = true;
+  boolean firstCharIsPercentSign = false;
+
   @Override
   public void write(int b) throws IOException {
-    synchronized (outList) {
-      buffer.write(b);
+    InterpreterResultMessageOutput out;
+
+    synchronized (resultMessageOutputs) {
+      if (startOfTheNewLine) {
+        if (b == '%') {
+          startOfTheNewLine = false;
+          firstCharIsPercentSign = true;
+          buffer.write(b);
+          previousChar = b;
+          return;
+        } else if (b != NEW_LINE_CHAR) {
+          startOfTheNewLine = false;
+        }
+      }
+
       if (b == NEW_LINE_CHAR) {
-        // first time use of this outputstream.
-        if (firstWrite) {
-          // clear the output on gui
-          flushListener.onUpdate(this, new byte[]{});
-          firstWrite = false;
+        currentOut = getCurrentOutput();
+        if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
+          if (previousChar == NEW_LINE_CHAR) {
+            startOfTheNewLine = true;
+            return;
+          }
+        } else {
+          startOfTheNewLine = true;
         }
+      }
+
+      boolean flushBuffer = false;
+      if (firstCharIsPercentSign) {
+        if (b == ' ' || b == NEW_LINE_CHAR || b == '\t') {
+          firstCharIsPercentSign = false;
+          String displaySystem = buffer.toString();
+          for (InterpreterResult.Type type : InterpreterResult.Type.values()) {
+            if (displaySystem.equals('%' + type.name().toLowerCase())) {
+              // new type detected
+              setType(type);
+              previousChar = b;
+              return;
+            }
+          }
+          // not a defined display system
+          flushBuffer = true;
+        } else {
+          buffer.write(b);
+          previousChar = b;
+          return;
+        }
+      }
+
+      out = getCurrentOutputForWriting();
 
-        flush();
+      if (flushBuffer) {
+        out.write(buffer.toByteArray());
+        buffer.reset();
       }
+      out.write(b);
+      previousChar = b;
     }
   }
 
-  private byte [] detectTypeFromLine(byte [] byteArray) {
-    // check output type directive
-    String line = new String(byteArray);
-    for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
-      String typeString = '%' + t.name().toLowerCase();
-      if ((typeString + "\n").equals(line)) {
-        setType(t);
-        byteArray = null;
-        break;
-      } else if (line.startsWith(typeString + " ")) {
-        setType(t);
-        byteArray = line.substring(typeString.length() + 1).getBytes();
-        break;
+  private InterpreterResultMessageOutput getCurrentOutputForWriting() throws IOException {
+    synchronized (resultMessageOutputs) {
+      InterpreterResultMessageOutput out = getCurrentOutput();
+      if (out == null) {
+        // add text type result message
+        setType(InterpreterResult.Type.TEXT);
+        out = getCurrentOutput();
       }
+      return out;
     }
-
-    return byteArray;
   }
 
   @Override
@@ -125,10 +238,8 @@ public class InterpreterOutput extends OutputStream {
 
   @Override
   public void write(byte [] b, int off, int len) throws IOException {
-    synchronized (outList) {
-      for (int i = off; i < len; i++) {
-        write(b[i]);
-      }
+    for (int i = off; i < len; i++) {
+      write(b[i]);
     }
   }
 
@@ -138,10 +249,8 @@ public class InterpreterOutput extends OutputStream {
    * @throws IOException
    */
   public void write(File file) throws IOException {
-    outList.add(file);
-    if (watcher != null) {
-      watcher.watch(file);
-    }
+    InterpreterResultMessageOutput out = getCurrentOutputForWriting();
+    out.write(file);
   }
 
   public void write(String string) throws IOException {
@@ -154,7 +263,8 @@ public class InterpreterOutput extends OutputStream {
    * @throws IOException
    */
   public void write(URL url) throws IOException {
-    outList.add(url);
+    InterpreterResultMessageOutput out = getCurrentOutputForWriting();
+    out.write(url);
   }
 
   public void addResourceSearchPath(String path) {
@@ -162,99 +272,44 @@ public class InterpreterOutput extends OutputStream {
   }
 
   public void writeResource(String resourceName) throws IOException {
-    // search file under provided paths first, for dev mode
-    for (String path : resourceSearchPaths) {
-      File res = new File(path + "/" + resourceName);
-      if (res.isFile()) {
-        write(res);
-        return;
-      }
-    }
-
-    // search from classpath
-    ClassLoader cl = Thread.currentThread().getContextClassLoader();
-    if (cl == null) {
-      cl = this.getClass().getClassLoader();
-    }
-    if (cl == null) {
-      cl = ClassLoader.getSystemClassLoader();
-    }
-
-    write(cl.getResource(resourceName));
+    InterpreterResultMessageOutput out = getCurrentOutputForWriting();
+    out.writeResource(resourceName);
   }
 
-  public byte[] toByteArray() throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    List<Object> all = new LinkedList<>();
-
-    synchronized (outList) {
-      all.addAll(outList);
-    }
-
-    for (Object o : all) {
-      if (o instanceof File) {
-        File f = (File) o;
-        FileInputStream fin = new FileInputStream(f);
-        copyStream(fin, out);
-        fin.close();
-      } else if (o instanceof byte[]) {
-        out.write((byte[]) o);
-      } else if (o instanceof Integer) {
-        out.write((int) o);
-      } else if (o instanceof URL) {
-        InputStream fin = ((URL) o).openStream();
-        copyStream(fin, out);
-        fin.close();
-      } else {
-        // can not handle the object
+  public List<InterpreterResultMessage> toInterpreterResultMessage() throws IOException {
+    List<InterpreterResultMessage> list = new LinkedList<>();
+    synchronized (resultMessageOutputs) {
+      for (InterpreterResultMessageOutput out : resultMessageOutputs) {
+        list.add(out.toInterpreterResultMessage());
       }
     }
-    out.close();
-    return out.toByteArray();
-  }
-
-  private boolean typeShouldBeDetected() {
-    return getType() == InterpreterResult.Type.TABLE ? false : true;
+    return list;
   }
 
   public void flush() throws IOException {
-    synchronized (outList) {
-      buffer.flush();
-      byte[] bytes = buffer.toByteArray();
-      if (typeShouldBeDetected()) {
-        bytes = detectTypeFromLine(bytes);
-      }
-      if (bytes != null) {
-        outList.add(bytes);
-        if (type == InterpreterResult.Type.TEXT) {
-          flushListener.onAppend(this, bytes);
-        }
-      }
-      buffer.reset();
+    InterpreterResultMessageOutput out = getCurrentOutput();
+    if (out != null) {
+      out.flush();
     }
   }
 
-  private void copyStream(InputStream in, OutputStream out) throws IOException {
-    int bufferSize = 8192;
-    byte[] buffer = new byte[bufferSize];
-
-    while (true) {
-      int bytesRead = in.read(buffer);
-      if (bytesRead == -1) {
-        break;
-      } else {
-        out.write(buffer, 0, bytesRead);
+  public byte[] toByteArray() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    synchronized (resultMessageOutputs) {
+      for (InterpreterResultMessageOutput m : resultMessageOutputs) {
+        out.write(m.toByteArray());
       }
     }
+
+    return out.toByteArray();
   }
 
   @Override
   public void close() throws IOException {
-    flush();
-
-    if (watcher != null) {
-      watcher.clear();
-      watcher.shutdown();
+    synchronized (resultMessageOutputs) {
+      for (InterpreterResultMessageOutput out : resultMessageOutputs) {
+        out.close();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
index bdb262a..42f6cfa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutputListener.java
@@ -21,14 +21,22 @@ package org.apache.zeppelin.interpreter;
  */
 public interface InterpreterOutputListener {
   /**
+   * update all message outputs
+   */
+  public void onUpdateAll(InterpreterOutput out);
+
+  /**
    * called when newline is detected
+   * @param index
+   * @param out
    * @param line
    */
-  public void onAppend(InterpreterOutput out, byte[] line);
+  public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line);
 
   /**
    * when entire output is updated. eg) after detecting new display system
-   * @param output
+   * @param index
+   * @param out
    */
-  public void onUpdate(InterpreterOutput out, byte[] output);
+  public void onUpdate(int index, InterpreterResultMessageOutput out);
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 499239b..5288f6f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -17,15 +17,19 @@
 
 package org.apache.zeppelin.interpreter;
 
+import java.io.IOException;
 import java.io.Serializable;
-import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.*;
 
 /**
  * Interpreter result template.
  */
 public class InterpreterResult implements Serializable {
-
+  transient Logger logger = LoggerFactory.getLogger(InterpreterResult.class);
   /**
    *  Type of result after code execution.
    */
@@ -50,104 +54,70 @@ public class InterpreterResult implements Serializable {
   }
 
   Code code;
-  Type type;
-  String msg;
+  List<InterpreterResultMessage> msg = new LinkedList<>();
 
   public InterpreterResult(Code code) {
     this.code = code;
-    this.msg = null;
-    this.type = Type.TEXT;
+  }
+
+  public InterpreterResult(Code code, List<InterpreterResultMessage> msgs) {
+    this.code = code;
+    msg.addAll(msgs);
   }
 
   public InterpreterResult(Code code, String msg) {
     this.code = code;
-    this.msg = getData(msg);
-    this.type = getType(msg);
+    add(msg);
   }
 
   public InterpreterResult(Code code, Type type, String msg) {
     this.code = code;
-    this.msg = msg;
-    this.type = type;
+    add(type, msg);
   }
 
   /**
-   * Magic is like %html %text.
-   *
+   * Automatically detect %[display_system] directives
    * @param msg
-   * @return
    */
-  private String getData(String msg) {
-    if (msg == null) {
-      return null;
-    }
-    Type[] types = type.values();
-    TreeMap<Integer, Type> typesLastIndexInMsg = buildIndexMap(msg);
-    if (typesLastIndexInMsg.size() == 0) {
-      return msg;
-    } else {
-      Map.Entry<Integer, Type> lastType = typesLastIndexInMsg.firstEntry();
-      //add 1 for the % char
-      int magicLength = lastType.getValue().name().length() + 1;
-      // 1 for the last \n or space after magic
-      int subStringPos = magicLength + lastType.getKey() + 1;
-      return msg.substring(subStringPos);
-    }
-  }
-
-  private Type getType(String msg) {
-    if (msg == null) {
-      return Type.TEXT;
-    }
-    Type[] types = type.values();
-    TreeMap<Integer, Type> typesLastIndexInMsg = buildIndexMap(msg);
-    if (typesLastIndexInMsg.size() == 0) {
-      return Type.TEXT;
-    } else {
-      Map.Entry<Integer, Type> lastType = typesLastIndexInMsg.firstEntry();
-      return lastType.getValue();
+  public void add(String msg) {
+    InterpreterOutput out = new InterpreterOutput(null);
+    try {
+      out.write(msg);
+      out.flush();
+      this.msg.addAll(out.toInterpreterResultMessage());
+      out.close();
+    } catch (IOException e) {
+      logger.error(e.getMessage(), e);
     }
-  }
 
-  private int getIndexOfType(String msg, Type t) {
-    if (msg == null) {
-      return 0;
-    }
-    String typeString = "%" + t.name().toLowerCase();
-    return StringUtils.indexOf(msg, typeString );
   }
 
-  private TreeMap<Integer, Type> buildIndexMap(String msg) {
-    int lastIndexOftypes = 0;
-    TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<>();
-    Type[] types = Type.values();
-    for (Type t : types) {
-      lastIndexOftypes = getIndexOfType(msg, t);
-      if (lastIndexOftypes >= 0) {
-        typesLastIndexInMsg.put(lastIndexOftypes, t);
-      }
-    }
-    return typesLastIndexInMsg;
+  public void add(Type type, String data) {
+    msg.add(new InterpreterResultMessage(type, data));
   }
 
   public Code code() {
     return code;
   }
 
-  public String message() {
+  public List<InterpreterResultMessage> message() {
     return msg;
   }
 
-  public Type type() {
-    return type;
-  }
-
-  public InterpreterResult type(Type type) {
-    this.type = type;
-    return this;
-  }
-
   public String toString() {
-    return "%" + type.name().toLowerCase() + " " + msg;
+    StringBuilder sb = new StringBuilder();
+    Type prevType = null;
+    for (InterpreterResultMessage m : msg) {
+      if (prevType != null) {
+        sb.append("\n");
+        if (prevType == Type.TABLE) {
+          sb.append("\n");
+        }
+      }
+      sb.append(m.toString());
+      prevType = m.getType();
+    }
+
+    return sb.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java
new file mode 100644
index 0000000..20d9951
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import java.util.Locale;
+
+/**
+ * A single typed message of an interpreter result: a display type paired with
+ * the data to be rendered as that type.
+ */
+public class InterpreterResultMessage {
+  InterpreterResult.Type type;
+  String data;
+
+  /**
+   * @param type display type of this message
+   * @param data payload to be displayed as {@code type}
+   */
+  public InterpreterResultMessage(InterpreterResult.Type type, String data) {
+    this.type = type;
+    this.data = data;
+  }
+
+  /**
+   * @return display type of this message
+   */
+  public InterpreterResult.Type getType() {
+    return type;
+  }
+
+  /**
+   * @return payload of this message
+   */
+  public String getData() {
+    return data;
+  }
+
+  /**
+   * Renders the message as {@code %<type> <data>}. The type name is lower-cased with
+   * {@link Locale#ROOT} so the result does not depend on the JVM default locale (under a
+   * Turkish locale, for example, an upper-case "I" would become a dotless "i").
+   */
+  @Override
+  public String toString() {
+    return "%" + type.name().toLowerCase(Locale.ROOT) + " " + data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
new file mode 100644
index 0000000..41e1fd0
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Output stream that collects the content of one typed interpreter result message.
+ * Written bytes go to a buffer and are moved to an internal list when flushed; files and
+ * URLs can be queued as well and are read when {@link #toByteArray()} is called.
+ */
+public class InterpreterResultMessageOutput extends OutputStream {
+  Logger logger = LoggerFactory.getLogger(InterpreterResultMessageOutput.class);
+  private static final int NEW_LINE_CHAR = '\n';
+  private List<String> resourceSearchPaths;
+
+  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final List<Object> outList = new LinkedList<>();
+  private InterpreterOutputChangeWatcher watcher;
+  private final InterpreterResultMessageOutputListener flushListener;
+  private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
+  private boolean firstWrite = true;
+
+  /**
+   * @param type display type of this output
+   * @param listener notified on append and update; may be null
+   */
+  public InterpreterResultMessageOutput(
+      InterpreterResult.Type type,
+      InterpreterResultMessageOutputListener listener) {
+    this.type = type;
+    this.flushListener = listener;
+  }
+
+  /**
+   * Creates an output that also starts a change watcher for files written to it.
+   *
+   * @param type display type of this output
+   * @param flushListener notified on append and update; may be null
+   * @param listener passed to the {@link InterpreterOutputChangeWatcher}
+   * @throws IOException if setting up the change watcher fails
+   */
+  public InterpreterResultMessageOutput(
+      InterpreterResult.Type type,
+      InterpreterResultMessageOutputListener flushListener,
+      InterpreterOutputChangeListener listener) throws IOException {
+    this.type = type;
+    this.flushListener = flushListener;
+    watcher = new InterpreterOutputChangeWatcher(listener);
+    watcher.start();
+  }
+
+  public InterpreterResult.Type getType() {
+    return type;
+  }
+
+  /**
+   * Sets the display type. Switching to a different type clears the collected output first.
+   */
+  public void setType(InterpreterResult.Type type) {
+    if (this.type != type) {
+      clear();
+      this.type = type;
+    }
+  }
+
+  /**
+   * Discards buffered and collected output, clears the watcher (if any) and reports the
+   * change to the listener through {@code onUpdate}.
+   */
+  public void clear() {
+    synchronized (outList) {
+      buffer.reset();
+      outList.clear();
+      if (watcher != null) {
+        watcher.clear();
+      }
+
+      if (flushListener != null) {
+        flushListener.onUpdate(this);
+      }
+    }
+  }
+
+  /**
+   * Buffers one byte. On a newline the buffered line is flushed in append mode when the
+   * type supports appending; the first newline additionally triggers {@code onUpdate}.
+   */
+  @Override
+  public void write(int b) throws IOException {
+    synchronized (outList) {
+      buffer.write(b);
+      if (b == NEW_LINE_CHAR) {
+        // first time use of this outputstream.
+        if (firstWrite) {
+          // clear the output on gui
+          if (flushListener != null) {
+            flushListener.onUpdate(this);
+          }
+          firstWrite = false;
+        }
+
+        if (isAppendSupported()) {
+          flush(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void write(byte [] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * Writes {@code len} bytes of {@code b}, starting at index {@code off}.
+   */
+  @Override
+  public void write(byte [] b, int off, int len) throws IOException {
+    // len is a byte count, not an end index, so the loop must stop at off + len
+    int end = off + len;
+    synchronized (outList) {
+      for (int i = off; i < end; i++) {
+        write(b[i]);
+      }
+    }
+  }
+
+  /**
+   * Queues a file whose content is read by {@link #toByteArray()}. When a watcher is
+   * configured (dev mode) the file is also registered with it.
+   *
+   * @param file file to include in the output
+   * @throws IOException if the watcher fails to register the file
+   */
+  public void write(File file) throws IOException {
+    // outList is guarded by its own monitor everywhere else in this class
+    synchronized (outList) {
+      outList.add(file);
+    }
+    if (watcher != null) {
+      watcher.watch(file);
+    }
+  }
+
+  public void write(String string) throws IOException {
+    write(string.getBytes());
+  }
+
+  /**
+   * Queues a URL (e.g. a resource found on the classpath) whose content is read by
+   * {@link #toByteArray()}.
+   *
+   * @param url location of the content
+   */
+  public void write(URL url) throws IOException {
+    synchronized (outList) {
+      outList.add(url);
+    }
+  }
+
+  public void setResourceSearchPaths(List<String> resourceSearchPaths) {
+    this.resourceSearchPaths = resourceSearchPaths;
+  }
+
+  /**
+   * Writes a resource, looked up first as a file under the configured search paths and
+   * then on the classpath.
+   *
+   * @param resourceName name of the resource, relative to a search path or the classpath
+   * @throws IOException propagated from {@link #write(File)}
+   */
+  public void writeResource(String resourceName) throws IOException {
+    // search file under provided paths first, for dev mode
+    // (the paths are optional: setResourceSearchPaths may never have been called)
+    if (resourceSearchPaths != null) {
+      for (String path : resourceSearchPaths) {
+        File res = new File(path + "/" + resourceName);
+        if (res.isFile()) {
+          write(res);
+          return;
+        }
+      }
+    }
+
+    // search from classpath
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    if (cl == null) {
+      cl = this.getClass().getClassLoader();
+    }
+    if (cl == null) {
+      cl = ClassLoader.getSystemClassLoader();
+    }
+
+    // getResource returns null for an unknown resource; toByteArray skips such an entry
+    write(cl.getResource(resourceName));
+  }
+
+  /**
+   * Returns everything collected so far: flushed bytes plus the current content of the
+   * queued files and URLs, in the order they were added. Bytes still sitting in the
+   * write buffer (not yet flushed) are not included.
+   *
+   * @throws IOException if a queued file or URL cannot be read
+   */
+  public byte[] toByteArray() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    List<Object> all = new LinkedList<>();
+
+    synchronized (outList) {
+      all.addAll(outList);
+    }
+
+    for (Object o : all) {
+      if (o instanceof File) {
+        // try-with-resources: the stream is closed even when the copy fails
+        try (InputStream fin = new FileInputStream((File) o)) {
+          copyStream(fin, out);
+        }
+      } else if (o instanceof byte[]) {
+        out.write((byte[]) o);
+      } else if (o instanceof Integer) {
+        out.write((int) o);
+      } else if (o instanceof URL) {
+        try (InputStream fin = ((URL) o).openStream()) {
+          copyStream(fin, out);
+        }
+      } else {
+        // can not handle the object
+      }
+    }
+    out.close();
+    return out.toByteArray();
+  }
+
+  public InterpreterResultMessage toInterpreterResultMessage() throws IOException {
+    return new InterpreterResultMessage(type, new String(toByteArray()));
+  }
+
+  /**
+   * Moves the buffered bytes (if any) to the collected output and notifies the listener.
+   *
+   * @param append true to report the bytes through {@code onAppend}, false to report
+   *               through {@code onUpdate}
+   */
+  private void flush(boolean append) throws IOException {
+    synchronized (outList) {
+      buffer.flush();
+      byte[] bytes = buffer.toByteArray();
+      if (bytes != null && bytes.length > 0) {
+        outList.add(bytes);
+        if (append) {
+          if (flushListener != null) {
+            flushListener.onAppend(this, bytes);
+          }
+        } else {
+          if (flushListener != null) {
+            flushListener.onUpdate(this);
+          }
+        }
+      }
+      buffer.reset();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    flush(isAppendSupported());
+  }
+
+  /**
+   * @return true only for {@link InterpreterResult.Type#TEXT} output
+   */
+  public boolean isAppendSupported() {
+    return type == InterpreterResult.Type.TEXT;
+  }
+
+  private void copyStream(InputStream in, OutputStream out) throws IOException {
+    int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+
+    while (true) {
+      int bytesRead = in.read(buffer);
+      if (bytesRead == -1) {
+        break;
+      } else {
+        out.write(buffer, 0, bytesRead);
+      }
+    }
+  }
+
+  /**
+   * Flushes pending bytes and, if a watcher was started, clears it and shuts it down.
+   */
+  @Override
+  public void close() throws IOException {
+    flush();
+    if (watcher != null) {
+      watcher.clear();
+      watcher.shutdown();
+    }
+  }
+
+  /**
+   * Renders the output as {@code %<type> <content>}. The type name is lower-cased with
+   * {@link Locale#ROOT} so the result does not depend on the JVM default locale.
+   */
+  @Override
+  public String toString() {
+    String typeName = type.name().toLowerCase(Locale.ROOT);
+    try {
+      return "%" + typeName + " " + new String(toByteArray());
+    } catch (IOException e) {
+      logger.error(e.getMessage(), e);
+      return "%" + typeName + "\n";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutputListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutputListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutputListener.java
new file mode 100644
index 0000000..ba5acf9
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutputListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Listener for changes to the content of an {@link InterpreterResultMessageOutput}.
+ */
+public interface InterpreterResultMessageOutputListener {
+  /**
+   * Called when a newline is detected.
+   *
+   * @param out the output the line was appended to
+   * @param line the appended bytes
+   */
+  void onAppend(InterpreterResultMessageOutput out, byte[] line);
+
+  /**
+   * Called when the entire output is updated, e.g. after detecting a new display system.
+   *
+   * @param out the output that was updated
+   */
+  void onUpdate(InterpreterResultMessageOutput out);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java
index 941fdfe..5e61389 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java
@@ -111,14 +111,16 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
       return;
     }
 
-    InterpreterResult result = (InterpreterResult) results.get(0).get();
+    InterpreterResultMessage result = (InterpreterResultMessage) results.get(0).get();
     Gson gson = new Gson();
     String resultJson = gson.toJson(result);
     StringBuffer transferResult = new StringBuffer();
     transferResult.append("$z.result = " + resultJson + ";\n");
-    if (result.type() == InterpreterResult.Type.TABLE) {
+
+    if (result.getType() == InterpreterResult.Type.TABLE) {
       transferResult.append("$z.scope.loadTableData($z.result);\n");
     }
+
     transferResult.append("$z.scope._devmodeResult = $z.result;\n");
     app.printStringAsJavascript(transferResult.toString());
   }
@@ -143,14 +145,25 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
       try {
         out = new InterpreterOutput(new InterpreterOutputListener() {
           @Override
-          public void onAppend(InterpreterOutput out, byte[] line) {
-            eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
+          public void onUpdateAll(InterpreterOutput out) {
+
           }
 
           @Override
-          public void onUpdate(InterpreterOutput out, byte[] output) {
-            eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
+          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+            eventClient.onInterpreterOutputAppend(noteId, paragraphId, index, new String(line));
           }
+
+          @Override
+          public void onUpdate(int index, InterpreterResultMessageOutput out) {
+            try {
+              eventClient.onInterpreterOutputUpdate(noteId, paragraphId,
+                  index, out.getType(), new String(out.toByteArray()));
+            } catch (IOException e) {
+              logger.error(e.getMessage(), e);
+            }
+          }
+
         }, this);
       } catch (IOException e) {
         return null;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java
index 5414c70..4c2fd0c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java
@@ -73,13 +73,23 @@ public class ZeppelinDevServer extends
       try {
         out = new InterpreterOutput(new InterpreterOutputListener() {
           @Override
-          public void onAppend(InterpreterOutput out, byte[] line) {
-            eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
+          public void onUpdateAll(InterpreterOutput out) {
+
+          }
+
+          @Override
+          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+            eventClient.onInterpreterOutputAppend(noteId, paragraphId, index, new String(line));
           }
 
           @Override
-          public void onUpdate(InterpreterOutput out, byte[] output) {
-            eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
+          public void onUpdate(int index, InterpreterResultMessageOutput out) {
+            try {
+              eventClient.onInterpreterOutputUpdate(noteId, paragraphId,
+                  index, out.getType(), new String(out.toByteArray()));
+            } catch (IOException e) {
+              logger.error(e.getMessage(), e);
+            }
           }
         }, this);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
index e1484da..b139404 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
@@ -25,11 +25,13 @@ public class AppendOutputBuffer {
 
   private String noteId;
   private String paragraphId;
+  private int index;
   private String data;
 
-  public AppendOutputBuffer(String noteId, String paragraphId, String data) {
+  public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) {
     this.noteId = noteId;
     this.paragraphId = paragraphId;
+    this.index = index;
     this.data = data;
   }
 
@@ -41,6 +43,10 @@ public class AppendOutputBuffer {
     return paragraphId;
   }
 
+  public int getIndex() {
+    return index;
+  }
+
   public String getData() {
     return data;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
index 86ea11a..03d9191 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
@@ -51,7 +51,7 @@ public class AppendOutputRunner implements Runnable {
   @Override
   public void run() {
 
-    Map<String, Map<String, StringBuilder> > noteMap = new HashMap<>();
+    Map<String, StringBuilder> stringBufferMap = new HashMap<>();
     List<AppendOutputBuffer> list = new LinkedList<>();
 
     /* "drainTo" method does not wait for any element
@@ -73,15 +73,14 @@ public class AppendOutputRunner implements Runnable {
     for (AppendOutputBuffer buffer: list) {
       String noteId = buffer.getNoteId();
       String paragraphId = buffer.getParagraphId();
+      int index = buffer.getIndex();
+      String stringBufferKey = noteId + ":" + paragraphId + ":" + index;
 
-      Map<String, StringBuilder> paragraphMap = (noteMap.containsKey(noteId)) ?
-          noteMap.get(noteId) : new HashMap<String, StringBuilder>();
-      StringBuilder builder = paragraphMap.containsKey(paragraphId) ?
-          paragraphMap.get(paragraphId) : new StringBuilder();
+      StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ?
+          stringBufferMap.get(stringBufferKey) : new StringBuilder();
 
       builder.append(buffer.getData());
-      paragraphMap.put(paragraphId, builder);
-      noteMap.put(noteId, paragraphMap);
+      stringBufferMap.put(stringBufferKey, builder);
     }
     Long processingTime = System.currentTimeMillis() - processingStartTime;
 
@@ -94,12 +93,11 @@ public class AppendOutputRunner implements Runnable {
     }
 
     Long sizeProcessed = new Long(0);
-    for (String noteId: noteMap.keySet()) {
-      for (String paragraphId: noteMap.get(noteId).keySet()) {
-        String data = noteMap.get(noteId).get(paragraphId).toString();
-        sizeProcessed += data.length();
-        listener.onOutputAppend(noteId, paragraphId, data);
-      }
+    for (String stringBufferKey : stringBufferMap.keySet()) {
+      StringBuilder buffer = stringBufferMap.get(stringBufferKey);
+      sizeProcessed += buffer.length();
+      String[] keys = stringBufferKey.split(":");
+      listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]), buffer.toString());
     }
 
     if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
@@ -111,8 +109,8 @@ public class AppendOutputRunner implements Runnable {
     }
   }
 
-  public void appendBuffer(String noteId, String paragraphId, String outputToAppend) {
-    queue.offer(new AppendOutputBuffer(noteId, paragraphId, outputToAppend));
+  public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) {
+    queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/850fd81a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 1ffba42..50ff689 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -503,10 +504,14 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   private InterpreterResult convert(RemoteInterpreterResult result) {
-    return new InterpreterResult(
-        InterpreterResult.Code.valueOf(result.getCode()),
-        Type.valueOf(result.getType()),
-        result.getMsg());
+    InterpreterResult r = new InterpreterResult(
+        InterpreterResult.Code.valueOf(result.getCode()));
+
+    for (RemoteInterpreterResultMessage m : result.getMsg()) {
+      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
+    }
+
+    return r;
   }
 
   /**


Mime
View raw message