zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: ZEPPELIN-152 Propagate error from interpreter process to the GUI
Date Sun, 05 Jul 2015 17:55:25 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 3bd2b2122 -> d0a304354


ZEPPELIN-152 Propagate error from interpreter process to the GUI

https://issues.apache.org/jira/browse/ZEPPELIN-152
Errors from remote process are not propagated correctly. So, before go and check interpreter's
log, it's hard to see actual problem.

Author: Lee moon soo <moon@apache.org>

Closes #136 from Leemoonsoo/ZEPPELIN-152 and squashes the following commits:

47ef559 [Lee moon soo] Display ERROR status correctly
650c788 [Lee moon soo] Fix unittest
34a0210 [Lee moon soo] Let exception goes
19cfc44 [Lee moon soo] Propagte remote process's exception


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/d0a30435
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/d0a30435
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/d0a30435

Branch: refs/heads/master
Commit: d0a30435414726e7fa6d8b8e106e4b6ddb46da67
Parents: 3bd2b21
Author: Lee moon soo <moon@apache.org>
Authored: Fri Jul 3 12:12:14 2015 -0700
Committer: Lee moon soo <moon@apache.org>
Committed: Sun Jul 5 10:55:22 2015 -0700

----------------------------------------------------------------------
 .../zeppelin/spark/SparkSqlInterpreter.java     | 12 +++-----
 .../zeppelin/spark/SparkSqlInterpreterTest.java |  9 ++++--
 .../interpreter/ClassloaderInterpreter.java     |  3 +-
 .../interpreter/remote/RemoteInterpreter.java   |  4 ++-
 .../remote/RemoteInterpreterServer.java         | 16 +++++-----
 .../java/org/apache/zeppelin/scheduler/Job.java | 14 +++------
 .../zeppelin/scheduler/RemoteScheduler.java     | 31 ++++++++++++++++---
 .../remote/RemoteInterpreterTest.java           | 32 ++++++++++++++++++++
 .../apache/zeppelin/socket/NotebookServer.java  |  4 ++-
 9 files changed, 90 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 6e30f1f..e60ff2b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -35,6 +35,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
@@ -128,13 +129,10 @@ public class SparkSqlInterpreter extends Interpreter {
       sc.setLocalProperty("spark.scheduler.pool", null);
     }
 
-    try {
-      Object rdd = sqlc.sql(st);
-      String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
-      return new InterpreterResult(Code.SUCCESS, msg);
-    } catch (Exception e) {
-      return new InterpreterResult(Code.ERROR, e.getMessage());
-    }
+
+    Object rdd = sqlc.sql(st);
+    String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
+    return new InterpreterResult(Code.SUCCESS, msg);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index eaa0a8a..30166a7 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -91,7 +91,12 @@ public class SparkSqlInterpreterTest {
     assertEquals(Type.TABLE, ret.type());
     assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
 
-    assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
+    try {
+      sql.interpret("select wrong syntax", context);
+      fail("Exception not catched");
+    } catch (Exception e) {
+      // okay
+    }
     assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\"
then name else name end from test", context).code());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
index d3d6c1c..3fb4eb4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java
@@ -55,8 +55,9 @@ public class ClassloaderInterpreter
     Thread.currentThread().setContextClassLoader(cl);
     try {
       return intp.interpret(st, context);
+    } catch (InterpreterException e) {
+      throw e;
     } catch (Exception e) {
-      e.printStackTrace();
       throw new InterpreterException(e);
     } finally {
       cl = Thread.currentThread().getContextClassLoader();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index cd77dc4..8992f55 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
@@ -222,7 +223,8 @@ public class RemoteInterpreter extends Interpreter {
         context.getGui().setForms(remoteGui.getForms());
       }
 
-      return convert(remoteResult);
+      InterpreterResult result = convert(remoteResult);
+      return result;
     } catch (TException e) {
       throw new InterpreterException(e);
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 8b4b236..33baf9a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -42,6 +42,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
@@ -210,18 +211,15 @@ public class RemoteInterpreterServer
       }
     }
 
+    InterpreterResult result;
     if (job.getStatus() == Status.ERROR) {
-      throw new TException(job.getException());
+      result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException()));
     } else {
-      if (intp.getFormType() == FormType.NATIVE) {
-        // serialize dynamic form
-
-      }
-
-      return convert((InterpreterResult) job.getReturn(),
-          context.getConfig(),
-          context.getGui());
+      result = (InterpreterResult) job.getReturn();
     }
+    return convert(result,
+        context.getConfig(),
+        context.getGui());
   }
 
   class InterpretJobListener implements JobListener {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 9837ad2..4c8c70a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -21,7 +21,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map;
 
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,17 +191,13 @@ public abstract class Job {
     }
   }
 
-  public String getStack(Throwable e) {
-    StackTraceElement[] stacks = e.getStackTrace();
-    if (stacks == null) {
+  public static String getStack(Throwable e) {
+    if (e == null) {
       return "";
     }
-    String ss = "";
-    for (StackTraceElement s : stacks) {
-      ss += s.toString() + "\n";
-    }
 
-    return ss;
+    Throwable cause = ExceptionUtils.getRootCause(e);
+    return ExceptionUtils.getFullStackTrace(cause);
   }
 
   public Throwable getException() {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 1bf91d6..904dc22 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.thrift.TException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.apache.zeppelin.scheduler.Job.Status;
@@ -179,6 +181,10 @@ public class RemoteScheduler implements Scheduler {
           }
         }
 
+        if (terminate) {
+          // terminated by shutdown
+          break;
+        }
 
         Status newStatus = getStatus();
         if (newStatus == null) { // unknown
@@ -187,9 +193,10 @@ public class RemoteScheduler implements Scheduler {
 
         if (newStatus != Status.READY && newStatus != Status.PENDING) {
           // we don't need more
-          continue;
+          break;
         }
       }
+      terminate = true;
     }
 
     public void shutdown() {
@@ -233,9 +240,9 @@ public class RemoteScheduler implements Scheduler {
         if ("Unknown".equals(statusStr)) {
           // not found this job in the remote schedulers.
           // maybe not submitted, maybe already finished
-          Status status = getLastStatus();
-          listener.afterStatusChange(job, null, status);
-          return status;
+          //Status status = getLastStatus();
+          listener.afterStatusChange(job, null, null);
+          return job.getStatus();
         }
         Status status = Status.valueOf(statusStr);
         lastStatus = status;
@@ -294,6 +301,7 @@ public class RemoteScheduler implements Scheduler {
         listener.jobStarted(scheduler, job);
       }
       job.run();
+
       jobExecuted = true;
       jobSubmittedRemotely = true;
 
@@ -304,7 +312,16 @@ public class RemoteScheduler implements Scheduler {
         logger.error("JobStatusPoller interrupted", e);
       }
 
-      job.setStatus(jobStatusPoller.getStatus());
+      // set job status based on result.
+      Status lastStatus = jobStatusPoller.getStatus();
+      Object jobResult = job.getReturn();
+      if (jobResult != null && jobResult instanceof InterpreterResult) {
+        if (((InterpreterResult) jobResult).code() == Code.ERROR) {
+          lastStatus = Status.ERROR;
+        }
+      }
+      job.setStatus(lastStatus);
+
       if (listener != null) {
         listener.jobFinished(scheduler, job);
       }
@@ -331,10 +348,14 @@ public class RemoteScheduler implements Scheduler {
       if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished.
         if (jobExecuted) {
           jobSubmittedRemotely = true;
+          Object jobResult = job.getReturn();
           if (job.isAborted()) {
             job.setStatus(Status.ABORT);
           } else if (job.getException() != null) {
             job.setStatus(Status.ERROR);
+          } else if (jobResult != null && jobResult instanceof InterpreterResult
+              && ((InterpreterResult) jobResult).code() == Code.ERROR) {
+            job.setStatus(Status.ERROR);
           } else {
             job.setStatus(Status.FINISHED);
           }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index b49f86d..4338c50 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -37,6 +37,8 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.InterpretJob;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
 import org.apache.zeppelin.scheduler.Job;
@@ -129,6 +131,36 @@ public class RemoteInterpreterTest {
   }
 
   @Test
+  public void testRemoteInterperterErrorStatus() throws TTransportException, IOException
{
+    Properties p = new Properties();
+
+    RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env,
+        10 * 1000
+        );
+
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    InterpreterResult ret = intpA.interpret("non numeric value",
+        new InterpreterContext(
+            "id",
+            "title",
+            "text",
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LinkedList<InterpreterContextRunner>()));
+
+    assertEquals(Code.ERROR, ret.code());
+  }
+
+  @Test
   public void testRemoteSchedulerSharing() throws TTransportException, IOException {
     Properties p = new Properties();
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index c8238b4..659d4df 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -568,7 +568,9 @@ public class NotebookServer extends WebSocketServer implements
     @Override
     public void afterStatusChange(Job job, Status before, Status after) {
       if (after == Status.ERROR) {
-        job.getException().printStackTrace();
+        if (job.getException() != null) {
+          LOG.error("Error", job.getException());
+        }
       }
       if (job.isTerminated()) {
         LOG.info("Job {} is finished", job.getId());


Mime
View raw message