Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 524671075A for ; Wed, 9 Sep 2015 09:01:36 +0000 (UTC) Received: (qmail 25002 invoked by uid 500); 9 Sep 2015 09:01:36 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 24961 invoked by uid 500); 9 Sep 2015 09:01:36 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 24952 invoked by uid 99); 9 Sep 2015 09:01:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 09:01:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C35DAE00CC; Wed, 9 Sep 2015 09:01:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pallavi@apache.org To: commits@falcon.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-348 Add shutdown hook for Falcon (Contributed by Sandeep Samudrala) Date: Wed, 9 Sep 2015 09:01:35 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master 4ad28f630 -> 6bbfe2366 FALCON-348 Add shutdown hook for Falcon (Contributed by Sandeep Samudrala) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6bbfe236 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6bbfe236 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6bbfe236 Branch: refs/heads/master Commit: 6bbfe2366b32d885a5b67fc4accb11ed76b889ca Parents: 4ad28f6 Author: Pallavi Rao Authored: Wed Sep 9 14:31:19 2015 +0530 Committer: Pallavi Rao Committed: Wed Sep 9 14:31:19 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + prism/src/main/java/org/apache/falcon/Main.java | 19 +++++++++++++++-- .../rerun/handler/AbstractRerunConsumer.java | 22 +++++++++++++------- .../rerun/handler/AbstractRerunHandler.java | 4 ++++ .../falcon/rerun/handler/LateRerunHandler.java | 10 ++++++++- .../falcon/rerun/handler/RetryHandler.java | 9 +++++++- .../apache/falcon/rerun/queue/ActiveMQueue.java | 3 --- .../falcon/rerun/service/LateRunService.java | 6 ++++-- 8 files changed, 57 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 30f2b8c..196490d 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,7 @@ Trunk (Unreleased) FALCON-1250 Throw error when keys in startup.properties do not start with "*." or domain+"."(Narayan Periwal via Ajay Yadava) + FALCON-348 Add shutdown hook for Falcon (Sandeep Samudrala via Pallavi Rao) OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/prism/src/main/java/org/apache/falcon/Main.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/Main.java b/prism/src/main/java/org/apache/falcon/Main.java index 96e003c..d8bbfbd 100644 --- a/prism/src/main/java/org/apache/falcon/Main.java +++ b/prism/src/main/java/org/apache/falcon/Main.java @@ -38,6 +38,8 @@ public final class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); private static final String APP_PATH = "app"; private static final String APP_PORT = "port"; + private static EmbeddedServer server; + private static BrokerService broker; /** * Prevent users from constructing this. @@ -60,7 +62,19 @@ public final class Main { return new GnuParser().parse(options, args); } + static class ShutDown extends Thread { + public void run() { + try { + LOG.info("calling shutdown hook"); + server.stop(); + broker.stop(); + } catch (Exception e) { + LOG.error("Server shutdown failed with " , e); + } + } + } public static void main(String[] args) throws Exception { + Runtime.getRuntime().addShutdownHook(new ShutDown()); CommandLine cmd = parseArgs(args); String projectVersion = BuildProperties.get().getProperty("project.version"); String appPath = "webapp/target/falcon-webapp-" + projectVersion; @@ -79,7 +93,7 @@ public final class Main { LOG.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); LOG.info("Server starting with TLS ? {} on port {}", enableTLS, appPort); LOG.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); - EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS); + server = EmbeddedServer.newServer(appPort, appPath, enableTLS); server.start(); } @@ -109,12 +123,13 @@ public final class Main { int mqport = Integer.valueOf(System.getProperty("falcon.embeddedmq.port", "61616")); LOG.info("Starting ActiveMQ at port {} with data dir {}", mqport, dataDir); - BrokerService broker = new BrokerService(); + broker = new BrokerService(); broker.setUseJmx(false); broker.setDataDirectory(dataDir); broker.addConnector("vm://localhost"); broker.addConnector("tcp://0.0.0.0:" + mqport); broker.setSchedulerSupport(true); + broker.setUseShutdownHook(false); broker.start(); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java index 9ee94c5..582cb15 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.rerun.handler; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; import org.apache.falcon.entity.v0.Frequency; @@ -50,20 +51,25 @@ public abstract class AbstractRerunConsumer> extends AbstractRerunHandler { + private Thread daemon; @Override //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -188,13 +189,20 @@ public class LateRerunHandler> extends @Override public void init(M aDelayQueue) throws FalconException { super.init(aDelayQueue); - Thread daemon = new Thread(new LateRerunConsumer(this)); + daemon = new Thread(new LateRerunConsumer(this)); daemon.setName("LaterunHandler"); daemon.setDaemon(true); daemon.start(); LOG.info("Laterun Handler thread started"); } + @Override + public void close() throws FalconException { + daemon.interrupt(); + super.close(); + } + + public Path getLateLogPath(String logDir, String nominalTime, String srcClusterName) { //SrcClusterName valid only in case of feed http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java index c6bc36f..b952bbe 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java @@ -38,6 +38,7 @@ import org.apache.falcon.workflow.WorkflowExecutionContext; */ public class RetryHandler> extends AbstractRerunHandler { + private Thread daemon; @Override //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -85,7 +86,7 @@ public class RetryHandler> extends @Override public void init(M aDelayQueue) throws FalconException { super.init(aDelayQueue); - Thread daemon = new Thread(new RetryConsumer(this)); + daemon = new Thread(new RetryConsumer(this)); daemon.setName("RetryHandler"); daemon.setDaemon(true); daemon.start(); @@ -93,6 +94,12 @@ public class RetryHandler> extends } @Override + public void close() throws FalconException { + daemon.interrupt(); + super.close(); + } + + @Override public void onSuccess(WorkflowExecutionContext context) throws FalconException { // do nothing since retry does not apply for failed workflows } http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java index 021e4cc..3168c31 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java @@ -68,7 +68,6 @@ public class ActiveMQueue extends DelayedQueue { event.toString(), event.getDelay(TimeUnit.MILLISECONDS)); return true; } catch (Exception e) { - LOG.error("Unable to offer event: {} to ActiveMQ", event, e); throw new FalconException("Unable to offer event:" + event + " to ActiveMQ", e); } } @@ -91,7 +90,6 @@ public class ActiveMQueue extends DelayedQueue { LOG.debug("Dequeued Message: {}", event.toString()); return event; } catch (Exception e) { - LOG.error("Error getting the message from ActiveMQ", e); throw new FalconException("Error getting the message from ActiveMQ: ", e); } } @@ -111,7 +109,6 @@ public class ActiveMQueue extends DelayedQueue { consumer = session.createConsumer(destination); LOG.info("Initialized Queue on ActiveMQ: {}", destinationName); } catch (Exception e) { - LOG.error("Error starting ActiveMQ connection for delayed queue", e); throw new RuntimeException("Error starting ActiveMQ connection for delayed queue", e); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java index 2bb198b..8be6810 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java @@ -39,6 +39,8 @@ public class LateRunService implements FalconService { private ActiveMQueue queue; + private AbstractRerunHandler> rerunHandler; + @Override public String getName() { return LateRunService.class.getName(); @@ -50,8 +52,7 @@ public class LateRunService implements FalconService { throw new FalconException("WorkflowJobEndNotificationService must be configured ahead"); } - AbstractRerunHandler> rerunHandler = - RerunHandlerFactory.getRerunHandler(RerunType.LATE); + rerunHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE); queue = new ActiveMQueue( StartupProperties.get() .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"), @@ -64,6 +65,7 @@ public class LateRunService implements FalconService { @Override public void destroy() throws FalconException { + rerunHandler.close(); closeQuietly(); LOG.info("LateRun thread destroyed"); }