falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [1/8] incubator-falcon git commit: FALCON-717 Shutdown not clean for JMSMessageConsumer. Contributed by Shaik Idris Ali
Date Fri, 14 Nov 2014 02:54:46 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 59441beb7 -> c4dd440d9


FALCON-717 Shutdown not clean for JMSMessageConsumer. Contributed by Shaik Idris Ali


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/40c3e949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/40c3e949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/40c3e949

Branch: refs/heads/master
Commit: 40c3e94955e1703da9ef07a7a5517b7d9aee91b0
Parents: 59441be
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu Nov 13 17:49:19 2014 -0800
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Thu Nov 13 17:49:19 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../falcon/messaging/JMSMessageConsumer.java    | 38 +-------
 .../falcon/messaging/JMSMessageProducer.java    | 13 +--
 .../falcon/messaging/util/MessagingUtil.java    | 94 ++++++++++++++++++++
 .../apache/falcon/rerun/queue/ActiveMQueue.java | 36 +++-----
 .../apache/falcon/rerun/queue/DelayedQueue.java |  2 +
 .../falcon/rerun/queue/InMemoryQueue.java       | 10 ++-
 .../falcon/rerun/service/LateRunService.java    | 11 ++-
 8 files changed, 134 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7eb4b70..896116b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -138,6 +138,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-717 Shutdown not clean for JMSMessageConsumer
+   (Shaik Idris Ali via Venkatesh Seetharam
+
    FALCON-875 Enitiy Summary endpoint filterBy does not filter entities
    without pipelines (Balu Vellanki via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 300fecf..4df1490 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -21,6 +21,7 @@ package org.apache.falcon.messaging;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.messaging.util.MessagingUtil;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -147,40 +148,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener
{
     public void closeSubscriber() {
         LOG.info("Closing topicSubscriber on topic : " + this.topicName);
         // closing each quietly so client id can be unsubscribed
-        closeTopicSubscriberQuietly();
-        closeTopicSessionQuietly();
-        closeConnectionQuietly();
-    }
-
-    private void closeTopicSubscriberQuietly() {
-        if (topicSubscriber != null) {
-            try {
-                topicSubscriber.close();
-            } catch (JMSException ignore) {
-                LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore);
-            }
-        }
-    }
-
-    private void closeTopicSessionQuietly() {
-        if (topicSession != null) { // unsubscribe the durable topic topicSubscriber
-            try {
-                topicSession.unsubscribe(FALCON_CLIENT_ID);
-                topicSession.close();
-            } catch (JMSException ignore) {
-                LOG.error("Error closing JMS topic session: " + topicSession, ignore);
-            }
-        }
-    }
-
-    private void closeConnectionQuietly() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (JMSException ignore) {
-                LOG.error("Error closing JMS connection: " + connection, ignore);
-            }
-        }
+        MessagingUtil.closeQuietly(topicSubscriber);
+        MessagingUtil.closeQuietly(topicSession, FALCON_CLIENT_ID);
+        MessagingUtil.closeQuietly(connection);
     }
 
     private static Connection createAndGetConnection(String implementation,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index dece932..515562a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.messaging;
 
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.util.MessagingUtil;
 import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -168,7 +169,7 @@ public class JMSMessageProducer {
                 sendMessage(connection, message);
             }
         } finally {
-            closeQuietly(connection);
+            MessagingUtil.closeQuietly(connection);
         }
 
         return 0;
@@ -296,14 +297,4 @@ public class JMSMessageProducer {
 
         return connection;
     }
-
-    private void closeQuietly(Connection connection) {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (JMSException e) {
-            LOG.error("Error in closing connection:", e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java b/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java
new file mode 100644
index 0000000..8d59937
--- /dev/null
+++ b/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.messaging.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+/**
+ * Utility class.
+ */
+public final class MessagingUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessagingUtil.class);
+
+    private MessagingUtil() {
+    }
+
+    public static void closeQuietly(TopicSubscriber topicSubscriber) {
+        if (topicSubscriber != null) {
+            try {
+                topicSubscriber.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore);
+            }
+        }
+    }
+
+    public static void closeQuietly(TopicSession topicSession, String clientId) {
+        if (topicSession != null) { // unsubscribe the durable topic topicSubscriber
+            try {
+                topicSession.unsubscribe(clientId);
+                topicSession.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS topic session: " + topicSession, ignore);
+            }
+        }
+    }
+
+    public static void closeQuietly(Connection connection) {
+        if (connection != null) {
+            try {
+                LOG.info("Attempting to close connection");
+                connection.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS connection: " + connection, ignore);
+            }
+        }
+    }
+
+    public static void closeQuietly(MessageProducer messageProducer) {
+        if (messageProducer != null) {
+            try {
+                LOG.info("Attempting to close producer");
+                messageProducer.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS messageProducer: " + messageProducer, ignore);
+            }
+        }
+    }
+
+    public static void closeQuietly(MessageConsumer messageConsumer) {
+        if (messageConsumer != null) {
+            try {
+                LOG.info("Attempting to close consumer");
+                messageConsumer.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS messageConsumer: " + messageConsumer, ignore);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
index d9567b6..ec57a53 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java
@@ -21,6 +21,7 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ScheduledMessage;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.messaging.util.MessagingUtil;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.apache.falcon.rerun.event.RerunEventFactory;
 
@@ -71,8 +72,7 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T>
{
             init();
         }
 
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        return session;
+        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
     @Override
@@ -121,30 +121,16 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T>
{
 
     @Override
     public void reconnect() throws FalconException {
-        try {
-            LOG.info("Attempting to close producer");
-            producer.close();
-            LOG.info("Producer closed successfully");
-        } catch (Exception ignore) {
-            LOG.info("Producer could not be closed");
-        }
-
-        try {
-            LOG.info("Attempting to close consumer");
-            consumer.close();
-            LOG.info("Consumer closed successfully");
-        } catch (Exception ignore) {
-            LOG.info("Consumer could not be closed");
-        }
+        close();
+        init();
+    }
 
-        try {
-            LOG.info("Attempting to close connection");
-            connection.close();
-            LOG.info("Connection closed successfully");
-        } catch (Exception ignore) {
-            LOG.info("Connection could not be closed");
-        }
+    public void close() {
+        LOG.info("Closing queue for broker={}, destination{}", brokerUrl, destinationName);
+        destination = null;
 
-        init();
+        MessagingUtil.closeQuietly(producer);
+        MessagingUtil.closeQuietly(consumer);
+        MessagingUtil.closeQuietly(connection);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
index 01231d4..f5fbbe8 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java
@@ -40,4 +40,6 @@ public abstract class DelayedQueue<T extends RerunEvent> {
     public abstract void init();
 
     public abstract void reconnect() throws FalconException;
+
+    public abstract void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
index ecd6b0a..6035cdb 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java
@@ -36,6 +36,7 @@ import java.util.concurrent.DelayQueue;
  * @param <T>
  */
 public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> {
+
     public static final Logger LOG = LoggerFactory.getLogger(DelayedQueue.class);
 
     protected DelayQueue<T> delayQueue = new DelayQueue<T>();
@@ -47,8 +48,8 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T>
{
 
     @Override
     public boolean offer(T event) {
-        boolean flag = delayQueue.offer(event);
         beforeRetry(event);
+        boolean flag = delayQueue.offer(event);
         LOG.debug("Enqueued Message: {}", event.toString());
         return flag;
     }
@@ -58,8 +59,8 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T>
{
         T event;
         try {
             event = delayQueue.take();
-            LOG.debug("Dequeued Message: {}", event.toString());
             afterRetry(event);
+            LOG.debug("Dequeued Message: {}", event.toString());
         } catch (InterruptedException e) {
             throw new FalconException(e);
         }
@@ -145,4 +146,9 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T>
{
         }
         return rerunEvents;
     }
+
+    @Override
+    public void close() {
+        //Do nothing
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
index bbfdaff..2bb198b 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java
@@ -37,6 +37,8 @@ public class LateRunService implements FalconService {
 
     private static final Logger LOG = LoggerFactory.getLogger(LateRunService.class);
 
+    private ActiveMQueue<LaterunEvent> queue;
+
     @Override
     public String getName() {
         return LateRunService.class.getName();
@@ -50,7 +52,7 @@ public class LateRunService implements FalconService {
 
         AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler
=
             RerunHandlerFactory.getRerunHandler(RerunType.LATE);
-        ActiveMQueue<LaterunEvent> queue = new ActiveMQueue<LaterunEvent>(
+        queue = new ActiveMQueue<LaterunEvent>(
                 StartupProperties.get()
                     .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"),
                 "falcon.late.queue");
@@ -62,6 +64,13 @@ public class LateRunService implements FalconService {
 
     @Override
     public void destroy() throws FalconException {
+        closeQuietly();
         LOG.info("LateRun thread destroyed");
     }
+
+    private void closeQuietly() {
+        if (queue != null) {
+            queue.close();
+        }
+    }
 }


Mime
View raw message