activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4621
Date Thu, 12 Sep 2013 19:10:39 GMT
Updated Branches:
  refs/heads/trunk 272b846b0 -> cdb7bb11f


https://issues.apache.org/jira/browse/AMQ-4621

Update the consumer / connection abort process such that when the
strategy is configured to abort the connection is only attempt to do so
once instead of once for every subscription in the map.  Also improve
logging to better indicate the subscription being aborted and the
destination that the subscription was on. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cdb7bb11
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cdb7bb11
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cdb7bb11

Branch: refs/heads/trunk
Commit: cdb7bb11ff9351fb342033a863ad626233613873
Parents: 272b846
Author: Timothy Bish <tabish121@gmai.com>
Authored: Thu Sep 12 15:07:13 2013 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Thu Sep 12 15:10:25 2013 -0400

----------------------------------------------------------------------
 .../policy/AbortSlowConsumerStrategy.java       | 93 ++++++++++++++------
 1 file changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cdb7bb11/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
index 263ca03..46f68d8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -106,38 +108,73 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy,
Runnable
     }
 
     protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort,
boolean abortSubscriberConnection) {
+
+        Map<Connection, List<Subscription>> abortMap = new HashMap<Connection,
List<Subscription>>();
+
         for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet())
{
             ConnectionContext connectionContext = entry.getValue().context;
-            if (connectionContext!= null) {
-                try {
-                    LOG.info("aborting "
-                            + (abortSubscriberConnection ? "connection" : "consumer")
-                            + ", slow consumer: " + entry.getKey());
-
-                    final Connection connection = connectionContext.getConnection();
-                    if (connection != null) {
-                        if (abortSubscriberConnection) {
-                            scheduler.executeAfterDelay(new Runnable() {
-                                @Override
-                                public void run() {
-                                    connection.serviceException(new InactivityIOException("Consumer
was slow too often (>"
-                                            + maxSlowCount +  ") or too long (>"
-                                            + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
-                                }}, 0l);
-                        } else {
-                            // just abort the consumer by telling it to stop
-                            ConsumerControl stopConsumer = new ConsumerControl();
-                            stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
-                            stopConsumer.setClose(true);
-                            connection.dispatchAsync(stopConsumer);
-                        }
-                    } else {
-                        LOG.debug("slowConsumer abort ignored, no connection in context:"
 + connectionContext);
+            if (connectionContext == null) {
+                continue;
+            }
+
+            Connection connection = connectionContext.getConnection();
+            if (connection == null) {
+                LOG.debug("slowConsumer abort ignored, no connection in context:"  + connectionContext);
+            }
+
+            if (!abortMap.containsKey(connection)) {
+                abortMap.put(connection, new ArrayList<Subscription>());
+            }
+
+            abortMap.get(connection).add(entry.getKey());
+        }
+
+        for (Entry<Connection, List<Subscription>> entry : abortMap.entrySet())
{
+            final Connection connection = entry.getKey();
+            final List<Subscription> subscriptions = entry.getValue();
+
+            if (abortSubscriberConnection) {
+
+                LOG.info("aborting connection:{} with {} slow consumers",
+                         connection.getConnectionId(), subscriptions.size());
+
+                if (LOG.isTraceEnabled()) {
+                    for (Subscription subscription : subscriptions) {
+                        LOG.trace("Connection {} being aborted because of slow consumer:
{} on destination: {}",
+                                  new Object[] { connection.getConnectionId(),
+                                                 subscription.getConsumerInfo().getConsumerId(),
+                                                 subscription.getActiveMQDestination() });
                     }
+                }
+
+                try {
+                    scheduler.executeAfterDelay(new Runnable() {
+                        @Override
+                        public void run() {
+                            connection.serviceException(new InactivityIOException(
+                                    subscriptions.size() + " Consumers was slow too often
(>"
+                                    + maxSlowCount +  ") or too long (>"
+                                    + maxSlowDuration + "): "));
+                        }}, 0l);
                 } catch (Exception e) {
-                    LOG.info("exception on stopping "
-                            + (abortSubscriberConnection ? "connection" : "consumer")
-                            + " to abort slow consumer: " + entry.getKey(), e);
+                    LOG.info("exception on aborting connection {} with {} slow consumers",
+                             connection.getConnectionId(), subscriptions.size());
+                }
+            } else {
+                // just abort each consumer by telling it to stop
+                for (Subscription subscription : subscriptions) {
+                    LOG.info("aborting slow consumer: {} for destination:{}",
+                             subscription.getConsumerInfo().getConsumerId(),
+                             subscription.getActiveMQDestination());
+
+                    try {
+                        ConsumerControl stopConsumer = new ConsumerControl();
+                        stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId());
+                        stopConsumer.setClose(true);
+                        connection.dispatchAsync(stopConsumer);
+                    } catch (Exception e) {
+                        LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId());
+                    }
                 }
             }
         }


Mime
View raw message