Return-Path: X-Original-To: apmail-zeppelin-commits-archive@minotaur.apache.org Delivered-To: apmail-zeppelin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D968618F4D for ; Sun, 15 Nov 2015 07:05:36 +0000 (UTC) Received: (qmail 28847 invoked by uid 500); 15 Nov 2015 07:05:36 -0000 Delivered-To: apmail-zeppelin-commits-archive@zeppelin.apache.org Received: (qmail 28796 invoked by uid 500); 15 Nov 2015 07:05:36 -0000 Mailing-List: contact commits-help@zeppelin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zeppelin.incubator.apache.org Delivered-To: mailing list commits@zeppelin.incubator.apache.org Received: (qmail 28776 invoked by uid 99); 15 Nov 2015 07:05:36 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 15 Nov 2015 07:05:36 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 554DDC1FC8 for ; Sun, 15 Nov 2015 07:05:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.39 X-Spam-Level: * X-Spam-Status: No, score=1.39 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.391, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Q2dX1JCbA90c for ; Sun, 15 Nov 2015 07:05:30 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 0297120C08 for ; Sun, 15 Nov 2015 07:05:29 +0000 (UTC) Received: (qmail 28757 invoked by uid 99); 15 Nov 2015 07:05:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 15 Nov 2015 07:05:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BCA06DFE04; Sun, 15 Nov 2015 07:05:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: moon@apache.org To: commits@zeppelin.incubator.apache.org Message-Id: <539ab226ac9541e4b3f70d3e5bb746fd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-zeppelin git commit: ZEPPELIN-286 Scheduler already terminated Exception Date: Sun, 15 Nov 2015 07:05:29 +0000 (UTC) Repository: incubator-zeppelin Updated Branches: refs/heads/master 0fde27fda -> c3892d56e ZEPPELIN-286 Scheduler already terminated Exception Fixes https://issues.apache.org/jira/browse/ZEPPELIN-286 Author: Lee moon soo Closes #427 from Leemoonsoo/ZEPPELIN-286 and squashes the following commits: 98eaad7 [Lee moon soo] Allow null return on getInterpreterProcess e289206 [Lee moon soo] Close all interpreter processes when server stops dddca9b [Lee moon soo] Shutdown event poller when interpreter process dies e4a306f [Lee moon soo] Clear reference to interpreterProcess from interpreterGroupReference with in close() to make sure not reusing after restart cdf3c4b [Lee moon soo] Add test for sharing scheduler instance e122171 [Lee moon soo] Better handling of RemoteInterpreter shutdown. Share scheduler instance among RemoteInterpreter in the same group Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/c3892d56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/c3892d56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/c3892d56 Branch: refs/heads/master Commit: c3892d56e2b00c6b8a8131ab2c55f975380f1b68 Parents: 0fde27f Author: Lee moon soo Authored: Sat Nov 14 00:05:38 2015 +0900 Committer: Lee moon soo Committed: Sun Nov 15 16:05:51 2015 +0900 ---------------------------------------------------------------------- .../interpreter/remote/RemoteInterpreter.java | 17 +++- .../remote/RemoteInterpreterEventPoller.java | 2 +- .../remote/RemoteInterpreterProcess.java | 15 +-- .../remote/RemoteInterpreterServer.java | 18 +++- .../remote/RemoteInterpreterTest.java | 98 +++++++++++++++++--- .../apache/zeppelin/server/ZeppelinServer.java | 2 + 6 files changed, 126 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index d5d92c8..9d01561 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; @@ -86,7 +87,7 @@ public class RemoteInterpreter extends Interpreter { this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; this.env = env; - this.connectTimeout = connectTimeout; + this.connectTimeout = connectTimeout; } @Override @@ -105,7 +106,8 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e); } } else { - throw new InterpreterException("Unexpected error"); + // closed or not opened yet + return null; } } } @@ -180,7 +182,13 @@ public class RemoteInterpreter extends Interpreter { interpreterProcess.releaseClient(client); } - interpreterProcess.dereference(); + int r = interpreterProcess.dereference(); + if (r == 0) { + synchronized (interpreterGroupReference) { + InterpreterGroup intpGroup = getInterpreterGroup(); + interpreterGroupReference.remove(getInterpreterGroupKey(intpGroup)); + } + } } @Override @@ -322,8 +330,7 @@ public class RemoteInterpreter extends Interpreter { int maxConcurrency = 10; RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); return SchedulerFactory.singleton().createOrGetRemoteScheduler( - "remoteinterpreter_" + interpreterProcess.hashCode(), - getInterpreterProcess(), + "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(), maxConcurrency); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index f39f6a6..1b734b7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -56,7 +56,7 @@ public class RemoteInterpreterEventPoller extends Thread { public void run() { Client client = null; - while (!shutdown) { + while (!shutdown && interpreterProcess.isRunning()) { try { client = interpreterProcess.getClient(); } catch (Exception e1) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 534af27..0c9e877 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class RemoteInterpreterProcess implements ExecuteResultHandler { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); - + private final AtomicInteger referenceCount; private DefaultExecutor executor; private ExecuteWatchdog watchdog; @@ -124,7 +124,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { } } } - + clientPool = new GenericObjectPool(new ClientFactory("localhost", port)); remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); @@ -151,13 +151,16 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { remoteInterpreterEventPoller.shutdown(); // first try shutdown + Client client = null; try { - Client client = getClient(); + client = getClient(); client.shutdown(); - releaseClient(client); } catch (Exception e) { - logger.error("Error", e); - watchdog.destroyProcess(); + // safely ignore exception while client.shutdown() may terminates remote process + } finally { + if (client != null) { + releaseClient(client); + } } clientPool.clear(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 16b1883..7405a66 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -100,10 +100,26 @@ public class RemoteInterpreterServer @Override public void shutdown() throws TException { + interpreterGroup.close(); + interpreterGroup.destroy(); + + server.stop(); + // server.stop() does not always finish server.serve() loop // sometimes server.serve() is hanging even after server.stop() call. // this case, need to force kill the process - server.stop(); + + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + } + } + + if (server.isServing()) { + System.exit(0); + } } public int getPort() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 0c74cea..bbda252 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -17,10 +17,7 @@ package org.apache.zeppelin.interpreter.remote; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.File; import java.io.IOException; @@ -38,7 +35,6 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.InterpretJob; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; import org.apache.zeppelin.scheduler.Job; @@ -63,7 +59,7 @@ public class RemoteInterpreterTest { @After public void tearDown() throws Exception { - intpGroup.clone(); + intpGroup.close(); intpGroup.destroy(); } @@ -225,7 +221,7 @@ public class RemoteInterpreterTest { intpB.close(); RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertFalse(process.isRunning()); + assertNull(process); } @Test @@ -343,7 +339,7 @@ public class RemoteInterpreterTest { intpB.close(); RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertFalse(process.isRunning()); + assertNull(process); } @Test @@ -555,10 +551,10 @@ public class RemoteInterpreterTest { } @Test - public void testInterpreterGroupResetDuringProcessRunning() { + public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( + final RemoteInterpreter intpA = new RemoteInterpreter( p, MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), @@ -567,16 +563,92 @@ public class RemoteInterpreterTest { 10 * 1000 ); + intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.open(); + Job jobA = new Job("jobA", null) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("2000", + new InterpreterContext( + "note", + "jobA", + "title", + "text", + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList())); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + // wait for job started + while (intpA.getScheduler().getJobsRunning().size() == 0) { + Thread.sleep(100); + } + + // restart interpreter + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.close(); intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + intpA.open(); RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); - assertEquals(processA.hashCode(), processB.hashCode()); + assertNotSame(processA.hashCode(), processB.hashCode()); + + } + + @Test + public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { + Properties p = new Properties(); - processA.dereference(); // intpA.close(); + RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000 + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = new RemoteInterpreter( + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000 + ); + + intpGroup.add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + assertEquals(intpA.getScheduler(), intpB.getScheduler()); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c3892d56/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index a6e944d..3717ecc 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -112,6 +112,7 @@ public class ZeppelinServer extends Application { LOG.info("Shutting down Zeppelin Server ... "); try { jettyServer.stop(); + ZeppelinServer.notebook.getInterpreterFactory().close(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -131,6 +132,7 @@ public class ZeppelinServer extends Application { } jettyServer.join(); + ZeppelinServer.notebook.getInterpreterFactory().close(); } private static Server setupJettyServer(ZeppelinConfiguration conf)