zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject incubator-zeppelin git commit: ZEPPELIN-286 Scheduler already terminated Exception
Date Sun, 15 Nov 2015 07:05:29 GMT
Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 0fde27fda -> c3892d56e


ZEPPELIN-286 Scheduler already terminated Exception

Fixes https://issues.apache.org/jira/browse/ZEPPELIN-286

Author: Lee moon soo <moon@apache.org>

Closes #427 from Leemoonsoo/ZEPPELIN-286 and squashes the following commits:

98eaad7 [Lee moon soo] Allow null return on getInterpreterProcess
e289206 [Lee moon soo] Close all interpreter processes when server stops
dddca9b [Lee moon soo] Shutdown event poller when interpreter process dies
e4a306f [Lee moon soo] Clear reference to interpreterProcess from interpreterGroupReference
with in close() to make sure not reusing after restart
cdf3c4b [Lee moon soo] Add test for sharing scheduler instance
e122171 [Lee moon soo] Better handling of RemoteInterpreter shutdown. Share scheduler instance
among RemoteInterpreter in the same group


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/c3892d56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/c3892d56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/c3892d56

Branch: refs/heads/master
Commit: c3892d56e2b00c6b8a8131ab2c55f975380f1b68
Parents: 0fde27f
Author: Lee moon soo <moon@apache.org>
Authored: Sat Nov 14 00:05:38 2015 +0900
Committer: Lee moon soo <moon@apache.org>
Committed: Sun Nov 15 16:05:51 2015 +0900

----------------------------------------------------------------------
 .../interpreter/remote/RemoteInterpreter.java   | 17 +++-
 .../remote/RemoteInterpreterEventPoller.java    |  2 +-
 .../remote/RemoteInterpreterProcess.java        | 15 +--
 .../remote/RemoteInterpreterServer.java         | 18 +++-
 .../remote/RemoteInterpreterTest.java           | 98 +++++++++++++++++---
 .../apache/zeppelin/server/ZeppelinServer.java  |  2 +
 6 files changed, 126 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index d5d92c8..9d01561 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
@@ -86,7 +87,7 @@ public class RemoteInterpreter extends Interpreter {
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     this.env = env;
-    this.connectTimeout = connectTimeout;  
+    this.connectTimeout = connectTimeout;
   }
 
   @Override
@@ -105,7 +106,8 @@ public class RemoteInterpreter extends Interpreter {
           throw new InterpreterException(e);
         }
       } else {
-        throw new InterpreterException("Unexpected error");
+        // closed or not opened yet
+        return null;
       }
     }
   }
@@ -180,7 +182,13 @@ public class RemoteInterpreter extends Interpreter {
       interpreterProcess.releaseClient(client);
     }
 
-    interpreterProcess.dereference();
+    int r = interpreterProcess.dereference();
+    if (r == 0) {
+      synchronized (interpreterGroupReference) {
+        InterpreterGroup intpGroup = getInterpreterGroup();
+        interpreterGroupReference.remove(getInterpreterGroupKey(intpGroup));
+      }
+    }
   }
 
   @Override
@@ -322,8 +330,7 @@ public class RemoteInterpreter extends Interpreter {
     int maxConcurrency = 10;
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-        "remoteinterpreter_" + interpreterProcess.hashCode(),
-        getInterpreterProcess(),
+        "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(),
         maxConcurrency);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index f39f6a6..1b734b7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -56,7 +56,7 @@ public class RemoteInterpreterEventPoller extends Thread {
   public void run() {
     Client client = null;
 
-    while (!shutdown) {
+    while (!shutdown && interpreterProcess.isRunning()) {
       try {
         client = interpreterProcess.getClient();
       } catch (Exception e1) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 534af27..0c9e877 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class RemoteInterpreterProcess implements ExecuteResultHandler {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
-  
+
   private final AtomicInteger referenceCount;
   private DefaultExecutor executor;
   private ExecuteWatchdog watchdog;
@@ -124,7 +124,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler
{
             }
           }
         }
-        
+
         clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
 
         remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
@@ -151,13 +151,16 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler
{
         remoteInterpreterEventPoller.shutdown();
 
         // first try shutdown
+        Client client = null;
         try {
-          Client client = getClient();
+          client = getClient();
           client.shutdown();
-          releaseClient(client);
         } catch (Exception e) {
-          logger.error("Error", e);
-          watchdog.destroyProcess();
+          // safely ignore exception while client.shutdown() may terminates remote process
+        } finally {
+          if (client != null) {
+            releaseClient(client);
+          }
         }
 
         clientPool.clear();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 16b1883..7405a66 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -100,10 +100,26 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
+    interpreterGroup.close();
+    interpreterGroup.destroy();
+
+    server.stop();
+
     // server.stop() does not always finish server.serve() loop
     // sometimes server.serve() is hanging even after server.stop() call.
     // this case, need to force kill the process
-    server.stop();
+
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 2000 && server.isServing())
{
+      try {
+        Thread.sleep(300);
+      } catch (InterruptedException e) {
+      }
+    }
+
+    if (server.isServing()) {
+      System.exit(0);
+    }
   }
 
   public int getPort() {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 0c74cea..bbda252 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -17,10 +17,7 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,7 +35,6 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.InterpretJob;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
 import org.apache.zeppelin.scheduler.Job;
@@ -63,7 +59,7 @@ public class RemoteInterpreterTest {
 
   @After
   public void tearDown() throws Exception {
-    intpGroup.clone();
+    intpGroup.close();
     intpGroup.destroy();
   }
 
@@ -225,7 +221,7 @@ public class RemoteInterpreterTest {
     intpB.close();
 
     RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    assertFalse(process.isRunning());
+    assertNull(process);
   }
 
   @Test
@@ -343,7 +339,7 @@ public class RemoteInterpreterTest {
     intpB.close();
 
     RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    assertFalse(process.isRunning());
+    assertNull(process);
   }
 
   @Test
@@ -555,10 +551,10 @@ public class RemoteInterpreterTest {
   }
 
   @Test
-  public void testInterpreterGroupResetDuringProcessRunning() {
+  public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException
{
     Properties p = new Properties();
 
-    RemoteInterpreter intpA = new RemoteInterpreter(
+    final RemoteInterpreter intpA = new RemoteInterpreter(
         p,
         MockInterpreterA.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
@@ -567,16 +563,92 @@ public class RemoteInterpreterTest {
         10 * 1000
         );
 
+    intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+
     intpA.open();
 
+    Job jobA = new Job("jobA", null) {
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpA.interpret("2000",
+            new InterpreterContext(
+                "note",
+                "jobA",
+                "title",
+                "text",
+                new HashMap<String, Object>(),
+                new GUI(),
+                new AngularObjectRegistry(intpGroup.getId(), null),
+                new LinkedList<InterpreterContextRunner>()));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpA.getScheduler().submit(jobA);
+
+    // wait for job started
+    while (intpA.getScheduler().getJobsRunning().size() == 0) {
+      Thread.sleep(100);
+    }
+
+    // restart interpreter
+    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
+    intpA.close();
     intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId()));
+    intpA.open();
     RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
 
-    assertEquals(processA.hashCode(), processB.hashCode());
+    assertNotSame(processA.hashCode(), processB.hashCode());
+
+  }
+
+  @Test
+  public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
+    Properties p = new Properties();
 
-    processA.dereference();     // intpA.close();
+    RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env,
+        10 * 1000
+        );
+
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpB = new RemoteInterpreter(
+        p,
+        MockInterpreterB.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env,
+        10 * 1000
+        );
+
+    intpGroup.add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
 
+    assertEquals(intpA.getScheduler(), intpB.getScheduler());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index a6e944d..3717ecc 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -112,6 +112,7 @@ public class ZeppelinServer extends Application {
         LOG.info("Shutting down Zeppelin Server ... ");
         try {
           jettyServer.stop();
+          ZeppelinServer.notebook.getInterpreterFactory().close();
         } catch (Exception e) {
           LOG.error("Error while stopping servlet container", e);
         }
@@ -131,6 +132,7 @@ public class ZeppelinServer extends Application {
     }
 
     jettyServer.join();
+    ZeppelinServer.notebook.getInterpreterFactory().close();
   }
 
   private static Server setupJettyServer(ZeppelinConfiguration conf)


Mime
View raw message