zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: ZEPPELIN-79 Zeppelin does not kill some interpreters when server is stopped
Date Sun, 05 Jul 2015 17:45:05 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master d1da5684a -> 12e5abf28


ZEPPELIN-79 Zeppelin does not kill some interpreters when server is stopped

https://issues.apache.org/jira/browse/ZEPPELIN-79

Zeppelin sometimes leaves interpreter processes running after it is stopped.
This PR solves the problem by increasing the timeout for graceful shutdown
and by closing and destroying interpreters in parallel.

Author: Lee moon soo <moon@apache.org>

Closes #135 from Leemoonsoo/ZEPPELIN-79 and squashes the following commits:

d2b1fa6 [Lee moon soo] Close and destroy interpreters in parallel
4558417 [Lee moon soo] Increase graceful shutdown timeout


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/12e5abf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/12e5abf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/12e5abf2

Branch: refs/heads/master
Commit: 12e5abf2803e4c5015998672b10642fc72aac0da
Parents: d1da568
Author: Lee moon soo <moon@apache.org>
Authored: Thu Jul 2 12:05:23 2015 -0700
Committer: Lee moon soo <moon@apache.org>
Committed: Sun Jul 5 10:45:01 2015 -0700

----------------------------------------------------------------------
 bin/zeppelin-daemon.sh                          | 20 ++++++---
 .../zeppelin/interpreter/InterpreterGroup.java  | 46 ++++++++++++++++++--
 .../remote/RemoteInterpreterProcess.java        |  4 +-
 .../interpreter/InterpreterFactory.java         | 25 ++++++++---
 4 files changed, 77 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/12e5abf2/bin/zeppelin-daemon.sh
----------------------------------------------------------------------
diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh
index 2440c12..a386f27 100755
--- a/bin/zeppelin-daemon.sh
+++ b/bin/zeppelin-daemon.sh
@@ -101,19 +101,27 @@ function wait_for_zeppelin_to_die() {
   local pid
   local count
   pid=$1
+  timeout=$2
   count=0
-  while [[ "${count}" -lt 10 ]]; do
+  timeoutTime=$(date "+%s")
+  let "timeoutTime+=$timeout"
+  currentTime=$(date "+%s")
+  forceKill=1
+
+  while [[ $currentTime -lt $timeoutTime ]]; do
     $(kill ${pid} > /dev/null 2> /dev/null)
     if kill -0 ${pid} > /dev/null 2>&1; then
       sleep 3
-      let "count+=1"
     else
+      forceKill=0
       break
     fi
-  if [[ "${count}" == "5" ]]; then
+    currentTime=$(date "+%s")
+  done
+
+  if [[ forceKill -ne 0 ]]; then
     $(kill -9 ${pid} > /dev/null 2> /dev/null)
   fi
-  done
 }
 
 function wait_zeppelin_is_up_for_ci() {
@@ -187,7 +195,7 @@ function stop() {
     if [[ -z "${pid}" ]]; then
       echo "${ZEPPELIN_NAME} is not running"
     else
-      wait_for_zeppelin_to_die $pid
+      wait_for_zeppelin_to_die $pid 40
       $(rm -f ${ZEPPELIN_PID})
       action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
     fi
@@ -200,7 +208,7 @@ function stop() {
     fi
 
     pid=$(cat ${f})
-    wait_for_zeppelin_to_die $pid
+    wait_for_zeppelin_to_die $pid 20
     $(rm -f ${f})
   done
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/12e5abf2/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 9baaef3..216663a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -18,9 +18,11 @@
 package org.apache.zeppelin.interpreter;
 
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 
 /**
@@ -71,14 +73,50 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
   }
 
   public void close() {
-    for (Interpreter intp : this) {
-      intp.close();
+    List<Thread> closeThreads = new LinkedList<Thread>();
+
+    for (final Interpreter intp : this) {
+      Thread t = new Thread() {
+        public void run() {
+          intp.close();
+        }
+      };
+
+      t.start();
+      closeThreads.add(t);
+    }
+
+    for (Thread t : closeThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        Logger logger = Logger.getLogger(InterpreterGroup.class);
+        logger.error("Can't close interpreter", e);
+      }
     }
   }
 
   public void destroy() {
-    for (Interpreter intp : this) {
-      intp.destroy();
+    List<Thread> destroyThreads = new LinkedList<Thread>();
+
+    for (final Interpreter intp : this) {
+      Thread t = new Thread() {
+        public void run() {
+          intp.destroy();
+        }
+      };
+
+      t.start();
+      destroyThreads.add(t);
+    }
+
+    for (Thread t : destroyThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        Logger logger = Logger.getLogger(InterpreterGroup.class);
+        logger.error("Can't close interpreter", e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/12e5abf2/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index f917eb9..91edd41 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -163,10 +163,10 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
         clientPool.clear();
         clientPool.close();
 
-        // wait for 3 sec and force kill
+        // wait for some time (connectTimeout) and force kill
         // remote process server.serve() loop is not always finishing gracefully
         long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime < 3 * 1000) {
+        while (System.currentTimeMillis() - startTime < connectTimeout) {
           if (this.isRunning()) {
             try {
               Thread.sleep(500);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/12e5abf2/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 77df7c5..57e2b7a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -546,13 +546,26 @@ public class InterpreterFactory {
 
 
   public void close() {
+    List<Thread> closeThreads = new LinkedList<Thread>();
     synchronized (interpreterSettings) {
-      synchronized (interpreterSettings) {
-        Collection<InterpreterSetting> intpsettings = interpreterSettings.values();
-        for (InterpreterSetting intpsetting : intpsettings) {
-          intpsetting.getInterpreterGroup().close();
-          intpsetting.getInterpreterGroup().destroy();
-        }
+      Collection<InterpreterSetting> intpsettings = interpreterSettings.values();
+      for (final InterpreterSetting intpsetting : intpsettings) {
+        Thread t = new Thread() {
+          public void run() {
+            intpsetting.getInterpreterGroup().close();
+            intpsetting.getInterpreterGroup().destroy();
+          }
+        };
+        t.start();
+        closeThreads.add(t);
+      }
+    }
+
+    for (Thread t : closeThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        logger.error("Can't close interpreterGroup", e);
       }
     }
   }


Mime
View raw message