zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From felixche...@apache.org
Subject incubator-zeppelin git commit: Shell interpreter improvements
Date Thu, 28 Jan 2016 23:14:02 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 1b3784d23 -> 90cc2b3d1


Shell interpreter improvements

Creating new PR with the changes from
https://github.com/apache/incubator-zeppelin/pull/615
Please check the above PR for prior discussions.

### What is this PR for?
* Provide the ability to run shell commands in parallel
* Provide the ability to cancel a running shell command
* Propagate errors from shell commands to the UI

### What type of PR is it?
Improvement

### Todos
NA

### Is there a relevant Jira issue?
No

### How should this be tested?
* To check parallelism, run more than 10 shell commands concurrently.
* To verify that errors are propagated to the UI, execute a shell command that fails (the simplest
being `cd` into a non-existent directory).
* To verify the cancel functionality, try cancelling a running shell command.

### Screenshots (if appropriate)
NA

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: karuppayya <karuppayya1990@gmail.com>
Author: Karup <karuppayya@outlook.com>

Closes #666 from Karuppayya/shell_imp and squashes the following commits:

6293781 [karuppayya] Fix test failure, fixes based on discussion
431cc79 [karuppayya] Shell interpreter improvements
825f696 [karuppayya] merge master
4fd2113 [Karup] Send exitvalue of  shell command in interpreter result
d259c48 [karuppayya] Fix typo, log exit value of a succesful shell commnad
351888d [karuppayya] Increase thread pool size
8cd6fd4 [karuppayya] Add log messages
9eb3eca [karuppayya] Fix command timeout period
87364b1 [karuppayya] Remove unnecessary changes
fcdc494 [karuppayya] Fix indentation
30078ac [karuppayya] fix
540bfa8 [Karup] Merge branch 'shell1' of github.com:Karuppayya/incubator-zeppelin into shell1
7d938bd [Karup] fix
b0a97a1 [Karup] fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/90cc2b3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/90cc2b3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/90cc2b3d

Branch: refs/heads/master
Commit: 90cc2b3d1b6902cc56c7231a6802a3baac3ee0d7
Parents: 1b3784d
Author: karuppayya <karuppayya1990@gmail.com>
Authored: Sun Jan 24 21:24:16 2016 +0530
Committer: Felix Cheung <felixcheung@apache.org>
Committed: Thu Jan 28 15:14:00 2016 -0800

----------------------------------------------------------------------
 .../apache/zeppelin/shell/ShellInterpreter.java | 59 +++++++++++++++++---
 .../interpreter/remote/RemoteInterpreter.java   |  9 ++-
 .../remote/RemoteInterpreterProcess.java        |  6 ++
 .../remote/RemoteInterpreterServer.java         |  6 +-
 .../zeppelin/conf/ZeppelinConfiguration.java    |  1 +
 .../interpreter/InterpreterFactory.java         |  3 +-
 6 files changed, 70 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index 090c0e9..85aafc5 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -19,18 +19,22 @@ package org.apache.zeppelin.shell;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.Executor;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
@@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ShellInterpreter extends Interpreter {
   Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
+  private static final String EXECUTOR_KEY = "executor";
   int commandTimeOut = 600000;
 
   static {
@@ -61,31 +66,67 @@ public class ShellInterpreter extends Interpreter {
   @Override
   public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
     logger.debug("Run shell command '" + cmd + "'");
-    long start = System.currentTimeMillis();
     CommandLine cmdLine = CommandLine.parse("bash");
     cmdLine.addArgument("-c", false);
     cmdLine.addArgument(cmd, false);
     DefaultExecutor executor = new DefaultExecutor();
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    executor.setStreamHandler(new PumpStreamHandler(outputStream));
-
+    ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
+    executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
     executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
+
+    Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
+    Map<String, Object> info = runningJob.info();
+    info.put(EXECUTOR_KEY, executor);
     try {
-      int exitValue = executor.execute(cmdLine);
+      int exitVal = executor.execute(cmdLine);
+      logger.info("Paragraph " + contextInterpreter.getParagraphId()
+          + "return with exit value: " + exitVal);
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
     } catch (ExecuteException e) {
+      int exitValue = e.getExitValue();
       logger.error("Can not run " + cmd, e);
-      return new InterpreterResult(Code.ERROR, e.getMessage());
+      Code code = Code.ERROR;
+      String msg = errorStream.toString();
+      if (exitValue == 143) {
+        code = Code.INCOMPLETE;
+        msg = msg + "Paragraph received a SIGTERM.\n";
+        logger.info("The paragraph " + contextInterpreter.getParagraphId()
+            + " stopped executing: " + msg);
+      }
+      msg += "Exitvalue: " + exitValue;
+      return new InterpreterResult(code, msg);
     } catch (IOException e) {
       logger.error("Can not run " + cmd, e);
       return new InterpreterResult(Code.ERROR, e.getMessage());
     }
   }
 
-  @Override
-  public void cancel(InterpreterContext context) {}
+  private Job getRunningJob(String paragraphId) {
+    Job foundJob = null;
+    Collection<Job> jobsRunning = getScheduler().getJobsRunning();
+    for (Job job : jobsRunning) {
+      if (job.getId().equals(paragraphId)) {
+        foundJob = job;
+      }
+    }
+    return foundJob;
+  }
 
   @Override
+  public void cancel(InterpreterContext context) {
+    Job runningJob = getRunningJob(context.getParagraphId());
+    if (runningJob != null) {
+      Map<String, Object> info = runningJob.info();
+      Object object = info.get(EXECUTOR_KEY);
+      if (object != null) {
+        Executor executor = (Executor) object;
+        ExecuteWatchdog watchdog = executor.getWatchdog();
+        watchdog.destroyProcess();
+      }
+    }
+  }
+  @Override
   public FormType getFormType() {
     return FormType.SIMPLE;
   }
@@ -97,8 +138,8 @@ public class ShellInterpreter extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-        ShellInterpreter.class.getName() + this.hashCode());
+    return SchedulerFactory.singleton().createOrGetParallelScheduler(
+        ShellInterpreter.class.getName() + this.hashCode(), 10);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index d2a24e8..d8cb223 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -57,14 +57,15 @@ public class RemoteInterpreter extends Interpreter {
   FormType formType;
   boolean initialized;
   private Map<String, String> env;
-
   private int connectTimeout;
+  private int maxPoolSize;
 
   public RemoteInterpreter(Properties property,
                            String className,
                            String interpreterRunner,
                            String interpreterPath,
                            int connectTimeout,
+                           int maxPoolSize,
                            RemoteInterpreterProcessListener remoteInterpreterProcessListener)
{
     super(property);
     this.className = className;
@@ -73,6 +74,7 @@ public class RemoteInterpreter extends Interpreter {
     this.interpreterPath = interpreterPath;
     env = new HashMap<String, String>();
     this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
     this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
 
@@ -89,6 +91,7 @@ public class RemoteInterpreter extends Interpreter {
     this.interpreterPath = interpreterPath;
     this.env = env;
     this.connectTimeout = connectTimeout;
+    this.maxPoolSize = 10;
     this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
   }
 
@@ -124,7 +127,7 @@ public class RemoteInterpreter extends Interpreter {
 
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     int rc = interpreterProcess.reference(getInterpreterGroup());
-
+    interpreterProcess.setMaxPoolSize(this.maxPoolSize);
     synchronized (interpreterProcess) {
       // when first process created
       if (rc == 1) {
@@ -330,7 +333,7 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    int maxConcurrency = 10;
+    int maxConcurrency = maxPoolSize;
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     if (interpreterProcess == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 5612a2b..5237b0b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -261,6 +261,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
     }
   }
 
+  public void setMaxPoolSize(int size) {
+    if (clientPool != null) {
+      //Size + 2 for progress poller , cancel operation
+      clientPool.setMaxTotal(size + 2);
+    }
+  }
   /**
    * Called when angular object is updated in client side to propagate
    * change to the remote process

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a59293b..02736fe 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -263,6 +263,7 @@ public class RemoteInterpreterServer
     private Interpreter interpreter;
     private String script;
     private InterpreterContext context;
+    private Map<String, Object> infos;
 
     public InterpretJob(
         String jobId,
@@ -285,7 +286,10 @@ public class RemoteInterpreterServer
 
     @Override
     public Map<String, Object> info() {
-      return null;
+      if (infos == null) {
+        infos = new HashMap<>();
+      }
+      return infos;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 6d726f5..9e606ee 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -457,6 +457,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.jdbc.JDBCInterpreter"),
     ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
+    ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
     // use specified notebook (id) as homescreen

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/90cc2b3d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 039d970..3cd1257 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -664,9 +664,10 @@ public class InterpreterFactory {
       Properties property) {
 
     int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
     LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
         property, className, conf.getInterpreterRemoteRunnerPath(),
-        interpreterPath, connectTimeout, remoteInterpreterProcessListener));
+        interpreterPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener));
     return intp;
   }
 


Mime
View raw message