zeppelin-commits mailing list archives

From zjf...@apache.org
Subject zeppelin git commit: ZEPPELIN-3587. Interpret paragraph text as whole code statement instead of breaking into lines
Date Mon, 23 Jul 2018 13:42:23 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 22f79cf69 -> 205fd30ea


ZEPPELIN-3587. Interpret paragraph text as whole code statement instead of breaking into lines

Just to make this kind of code work:

```
sc
  .range(1, 10)
  .sum()
```
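
Before this change the Scala interpreter split a paragraph on newlines and fed each line to the REPL separately, so `sc` succeeded as a standalone statement and the following `.range(1, 10)` line was a parse error. Below is a minimal, self-contained sketch of the difference using the plain Scala 2.11 REPL (`IMain`); it is illustrative only and uses no Zeppelin or Spark API.

```
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain

object WholeVsLines {
  def main(args: Array[String]): Unit = {
    val settings = new Settings
    settings.usejavacp.value = true
    val repl = new IMain(settings)

    val paragraph =
      """List(1, 2, 3)
        |.map(_ * 2)
        |.sum""".stripMargin

    // Line by line: "List(1, 2, 3)" is a complete statement, so the
    // next line ".map(_ * 2)" starts a fresh statement and fails.
    paragraph.split("\n").foreach(line => println(repl.interpret(line)))

    // Whole paragraph: parsed as one chained expression, succeeds.
    println(repl.interpret(paragraph))
  }
}
```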

[ Improvement ]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-3587

* CI pass

* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #3055 from zjffdu/ZEPPELIN-3587 and squashes the following commits:

0d6c24ad0 [Jeff Zhang] ZEPPELIN-3587. Interpret paragraph text as whole code statement instead of breaking into lines

(cherry picked from commit e6af8f10e20ba7d140f9dd4db4160f46d8f0c6c7)
Signed-off-by: Jeff Zhang <zjffdu@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/205fd30e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/205fd30e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/205fd30e

Branch: refs/heads/branch-0.8
Commit: 205fd30ea7fd567a69dcb69cff6edb117c9bc665
Parents: 22f79cf
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Fri Jul 6 08:14:35 2018 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Mon Jul 23 21:42:01 2018 +0800

----------------------------------------------------------------------
 spark/interpreter/figure/unnamed-chunk-1-1.png  | Bin 0 -> 407541 bytes
 .../zeppelin/spark/SparkZeppelinContext.java    |   3 +-
 .../zeppelin/spark/NewSparkInterpreterTest.java |   9 ++--
 .../spark/NewSparkSqlInterpreterTest.java       |   9 ++--
 .../spark/SparkScala210Interpreter.scala        |  40 ---------------
 .../spark/SparkScala211Interpreter.scala        |  38 --------------
 .../spark/BaseSparkScalaInterpreter.scala       |  51 ++++++++++++++++++-
 .../interpreter/BaseZeppelinContext.java        |   3 +-
 .../zeppelin/interpreter/InterpreterOutput.java |   7 ++-
 .../zeppelin/rest/ZeppelinSparkClusterTest.java |  19 +++----
 10 files changed, 78 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/interpreter/figure/unnamed-chunk-1-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/unnamed-chunk-1-1.png b/spark/interpreter/figure/unnamed-chunk-1-1.png
new file mode 100644
index 0000000..6f03c95
Binary files /dev/null and b/spark/interpreter/figure/unnamed-chunk-1-1.png differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
index 8847039..2a2b7e3 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -170,7 +170,8 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
       msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
           SparkSqlInterpreter.MAX_RESULTS));
     }
-
+    // append %text at the end, otherwise the following output will be put into the table as well.
+    msg.append("\n%text ");
     sc.clearJobGroup();
     return msg.toString();
   }
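
For context on the comment above: Zeppelin splits interpreter output into typed messages at inline directives such as %table and %text, and everything after a %table directive is rendered as table rows until another directive appears. Appending "\n%text " therefore fences off the table so that any later user output (say, a println after z.show) lands in a plain-text message. A script-style Scala sketch of the effect (the strings are illustrative, not produced by Zeppelin itself):

```
// Illustrative sketch, not Zeppelin API: a table message produced by
// z.show-style output.
val tableMsg = "%table _1\t_2\nhello\t20\n"

// Without a trailing directive, later stdout is appended to the table
// payload and rendered as an extra (bogus) row.
val mixed = tableMsg + "done\n"

// With "\n%text " appended, "done" starts a new TEXT-typed message.
val separated = tableMsg + "\n%text " + "done\n"

println(mixed)
println(separated)
```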

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 38857f1..a704a1d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -155,11 +155,9 @@ public class NewSparkInterpreterTest {
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
     result = interpreter.interpret(
-        "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n",
-        getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    result = interpreter.interpret(
+        "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
         "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
             "    s => Bank(s(0).toInt, \n" +
             "            s(1).replaceAll(\"\\\"\", \"\"),\n" +
@@ -167,7 +165,7 @@ public class NewSparkInterpreterTest {
             "            s(3).replaceAll(\"\\\"\", \"\"),\n" +
             "            s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
             "        )\n" +
-            ")", getInterpreterContext());
+            ").toDF()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // spark version
@@ -407,6 +405,7 @@ public class NewSparkInterpreterTest {
     properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.spark.printREPLOutput", "false");
 
+    InterpreterContext.set(getInterpreterContext());
     interpreter = new SparkInterpreter(properties);
     assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 42289ff..0dede8a 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -59,14 +59,17 @@ public class NewSparkSqlInterpreterTest {
     intpGroup.get("session_1").add(sparkInterpreter);
     intpGroup.get("session_1").add(sqlInterpreter);
 
-    sparkInterpreter.open();
-    sqlInterpreter.open();
-
     context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
         new HashMap<String, Object>(), new GUI(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
         new LocalResourcePool("id"),
         new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+
+    InterpreterContext.set(context);
+
+    sparkInterpreter.open();
+    sqlInterpreter.open();
+
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
----------------------------------------------------------------------
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index e829a73..c66aa71 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -93,46 +93,6 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     }
   }
 
-  protected override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
-    if (context != null) {
-      interpreterOutput.setInterpreterOutput(context.out)
-      context.out.clear()
-    } else {
-      interpreterOutput.setInterpreterOutput(null)
-    }
-
-    Console.withOut(if (context != null) context.out else Console.out) {
-      interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
-      // add print("") at the end in case the last line is comment which lead to INCOMPLETE
-      val lines = code.split("\\n") ++ List("print(\"\")")
-      var incompleteCode = ""
-      var lastStatus: InterpreterResult.Code = null
-      for (line <- lines if !line.trim.isEmpty) {
-        val nextLine = if (incompleteCode != "") {
-          incompleteCode + "\n" + line
-        } else {
-          line
-        }
-        scalaInterpret(nextLine) match {
-          case scala.tools.nsc.interpreter.IR.Success =>
-            // continue the next line
-            incompleteCode = ""
-            lastStatus = InterpreterResult.Code.SUCCESS
-          case error@scala.tools.nsc.interpreter.IR.Error =>
-            return new InterpreterResult(InterpreterResult.Code.ERROR)
-          case scala.tools.nsc.interpreter.IR.Incomplete =>
-            // put this line into inCompleteCode for the next execution.
-            incompleteCode = incompleteCode + "\n" + line
-            lastStatus = InterpreterResult.Code.INCOMPLETE
-        }
-      }
-      // flush all output before returning result to frontend
-      Console.flush()
-      interpreterOutput.setInterpreterOutput(null)
-      return new InterpreterResult(lastStatus)
-    }
-  }
-
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
     sparkILoop.interpret(code)
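
The block deleted above (and its twin in the Scala 2.11 interpreter below) implemented the old strategy: evaluate line by line, buffering any line the REPL reports as Incomplete and retrying it together with the next line. Distilled into a standalone Scala sketch, with the REPL hidden behind an `eval` parameter (names are illustrative, not Zeppelin's API):

```
import scala.tools.nsc.interpreter.IR

// Mirrors the deleted loop: feed one line at a time, accumulating
// lines the REPL cannot yet parse into a pending buffer.
def interpretByLines(eval: String => IR.Result, code: String): IR.Result = {
  // trailing print("") guards against a final comment line, which the
  // REPL would otherwise leave in the Incomplete state
  val lines = code.split("\n") ++ List("print(\"\")")
  var pending = ""
  var last: IR.Result = IR.Success
  for (line <- lines if line.trim.nonEmpty) {
    val next = if (pending.isEmpty) line else pending + "\n" + line
    eval(next) match {
      case IR.Success    => pending = ""; last = IR.Success
      case IR.Error      => return IR.Error
      case IR.Incomplete => pending = next; last = IR.Incomplete
    }
  }
  last
}
```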
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
----------------------------------------------------------------------
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 7ddb3fb..757f7eb 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -101,44 +101,6 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     }
   }
 
-  protected override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
-    if (context != null) {
-      interpreterOutput.setInterpreterOutput(context.out)
-      context.out.clear()
-    }
-
-    Console.withOut(if (context != null) context.out else Console.out) {
-      interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
-      // add print("") at the end in case the last line is comment which lead to INCOMPLETE
-      val lines = code.split("\\n") ++ List("print(\"\")")
-      var incompleteCode = ""
-      var lastStatus: InterpreterResult.Code = null
-      for (line <- lines if !line.trim.isEmpty) {
-        val nextLine = if (incompleteCode != "") {
-          incompleteCode + "\n" + line
-        } else {
-          line
-        }
-        scalaInterpret(nextLine) match {
-          case scala.tools.nsc.interpreter.IR.Success =>
-            // continue the next line
-            incompleteCode = ""
-            lastStatus = InterpreterResult.Code.SUCCESS
-          case error@scala.tools.nsc.interpreter.IR.Error =>
-            return new InterpreterResult(InterpreterResult.Code.ERROR)
-          case scala.tools.nsc.interpreter.IR.Incomplete =>
-            // put this line into inCompleteCode for the next execution.
-            incompleteCode = incompleteCode + "\n" + line
-            lastStatus = InterpreterResult.Code.INCOMPLETE
-        }
-      }
-      // flush all output before returning result to frontend
-      Console.flush()
-      interpreterOutput.setInterpreterOutput(null)
-      return new InterpreterResult(lastStatus)
-    }
-  }
-
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
     sparkILoop.interpret(code)
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index fb46b8f..775924b 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -79,9 +79,50 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
   }
 
-  protected def interpret(code: String, context: InterpreterContext): InterpreterResult
+  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+
+    def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
+      Console.withOut(interpreterOutput) {
+        interpreterOutput.setInterpreterOutput(context.out)
+        interpreterOutput.ignoreLeadingNewLinesFromScalaReporter()
+        context.out.clear()
+
+        val status = scalaInterpret(code) match {
+          case success@scala.tools.nsc.interpreter.IR.Success =>
+            success
+          case scala.tools.nsc.interpreter.IR.Error =>
+            val errorMsg = new String(interpreterOutput.getInterpreterOutput.toByteArray)
+            if (errorMsg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") ||
+              errorMsg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) {
+              // prepend "import sqlContext.implicits._" due to
+              // https://issues.scala-lang.org/browse/SI-6649
+              scalaInterpret("import sqlContext.implicits._\n" + code)
+            } else {
+              scala.tools.nsc.interpreter.IR.Error
+            }
+          case scala.tools.nsc.interpreter.IR.Incomplete =>
+            // add print("") at the end in case the last line is a comment, which leads to INCOMPLETE
+            scalaInterpret(code + "\nprint(\"\")")
+        }
+        context.out.flush()
+        status
+      }
+    }
+
+    val lastStatus = _interpret(code) match {
+      case scala.tools.nsc.interpreter.IR.Success =>
+        InterpreterResult.Code.SUCCESS
+      case scala.tools.nsc.interpreter.IR.Error =>
+        InterpreterResult.Code.ERROR
+      case scala.tools.nsc.interpreter.IR.Incomplete =>
+        InterpreterResult.Code.INCOMPLETE
+    }
+
+    new InterpreterResult(lastStatus)
+  }
 
-  protected def interpret(code: String): InterpreterResult = interpret(code, null)
+  protected def interpret(code: String): InterpreterResult =
+    interpret(code, InterpreterContext.get())
 
   protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result
 
@@ -168,6 +209,9 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     interpret("import sqlContext.implicits._")
     interpret("import sqlContext.sql")
     interpret("import org.apache.spark.sql.functions._")
+    // print an empty string, otherwise the output of this method's last statement
+    // (i.e. import org.apache.spark.sql.functions._) will mix with the output of user code
+    interpret("print(\"\")")
   }
 
   private def spark2CreateContext(): Unit = {
@@ -229,6 +273,9 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     interpret("import spark.implicits._")
     interpret("import spark.sql")
     interpret("import org.apache.spark.sql.functions._")
+    // print an empty string, otherwise the output of this method's last statement
+    // (i.e. import org.apache.spark.sql.functions._) will mix with the output of user code
+    interpret("print(\"\")")
   }
 
   private def isSparkSessionPresent(): Boolean = {
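
To summarize the new interpret added above: the whole paragraph is evaluated in one shot, with two targeted retries — on Incomplete (typically a trailing comment) it reruns with a no-op print("") appended, and on the SI-6649 toDF/toDS error it reruns with import sqlContext.implicits._ prepended. A condensed sketch of that control flow, with the REPL and error text abstracted into parameters (hypothetical names, not Zeppelin's API):

```
import scala.tools.nsc.interpreter.IR

// Condensed control flow of the new whole-paragraph interpret.
def interpretWhole(eval: String => IR.Result,
                   lastError: () => String,
                   code: String): IR.Result =
  eval(code) match {
    case IR.Incomplete =>
      // e.g. the paragraph ends with a comment; a no-op closes it
      eval(code + "\nprint(\"\")")
    case IR.Error if lastError().contains("is not a member of org.apache.spark.rdd.RDD") =>
      // SI-6649: retry with the implicits import prepended
      eval("import sqlContext.implicits._\n" + code)
    case other => other
  }
```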

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 09c868f..22fd8b5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -230,7 +230,6 @@ public abstract class BaseZeppelinContext {
    * @param o object
    * @param maxResult maximum number of rows to display
    */
-
   @ZeppelinApi
   public void show(Object o, int maxResult) {
     try {
@@ -246,7 +245,7 @@ public abstract class BaseZeppelinContext {
     }
   }
 
-  private boolean isSupportedObject(Object obj) {
+  protected boolean isSupportedObject(Object obj) {
     for (Class supportedClass : getSupportedClasses()) {
       if (supportedClass.isInstance(obj)) {
         return true;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index 44f83f4..8853227 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -297,7 +297,12 @@ public class InterpreterOutput extends OutputStream {
   }
 
   public void write(String string) throws IOException {
-    write(string.getBytes());
+    if (string.startsWith("%") && !startOfTheNewLine) {
+      // prepend "\n" when the string starts with an output type directive (e.g. %text) but we are not at the start of a line
+      write(("\n" + string).getBytes());
+    } else {
+      write(string.getBytes());
+    }
   }
 
   /**
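
The write(String) change above exists because output-type directives are only recognized at the start of a line; if user code prints something and then a directive such as %text arrives mid-line, the directive would be treated as literal text. A minimal Scala sketch of the rule, with the stream's startOfTheNewLine state reduced to a boolean argument (illustrative only):

```
// Force a line break before a "%..." directive that would otherwise
// land mid-line and be missed by the directive parser.
def normalize(s: String, atLineStart: Boolean): String =
  if (s.startsWith("%") && !atLineStart) "\n" + s else s

assert(normalize("%text done", atLineStart = false) == "\n%text done")
assert(normalize("%text done", atLineStart = true) == "%text done")
assert(normalize("plain", atLineStart = false) == "plain")
```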

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/205fd30e/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index a440419..3d4c62b 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -134,9 +134,10 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     );
     note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
-    assertEquals("import java.util.Date\n" +
-        "import java.net.URL\n" +
-        "hello\n", p.getResult().message().get(0).getData());
+    assertEquals("hello\n" +
+        "import java.util.Date\n" +
+        "import java.net.URL\n",
+        p.getResult().message().get(0).getData());
 
     p.setText("%spark invalid_code");
     note.run(p.getId(), true);
@@ -173,8 +174,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         "z.show(df)");
     note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
-    assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType());
-    assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData());
+    assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType());
+    assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(0).getData());
 
     // test display DataSet
     if (isSpark2()) {
@@ -183,8 +184,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
           "z.show(ds)");
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
-      assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType());
-      assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData());
+      assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType());
+      assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(0).getData());
     }
   }
 
@@ -487,8 +488,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     assertEquals(4, result.length);
     assertEquals("default_name", result[0]);
     assertEquals("1", result[1]);
-    assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
-    assertEquals("2", result[3]);
+    assertEquals("2", result[2]);
+    assertEquals("items: Seq[Object] = Buffer(2)", result[3]);
   }
 
   @Test

