activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4621 - some improvements to the AbortSlowAckConsumerStrategy
Date Mon, 16 Sep 2013 13:20:29 GMT
Updated Branches:
  refs/heads/trunk e1bbde730 -> 25af8e62c


https://issues.apache.org/jira/browse/AMQ-4621 - some improvements to the AbortSlowAckConsumerStrategy


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25af8e62
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25af8e62
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25af8e62

Branch: refs/heads/trunk
Commit: 25af8e62ccbe1abd1cdfed3347938a71523ee3e7
Parents: e1bbde7
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Mon Sep 16 15:14:36 2013 +0200
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Mon Sep 16 15:18:29 2013 +0200

----------------------------------------------------------------------
 .../policy/AbortSlowAckConsumerStrategy.java     | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/25af8e62/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
index 9de3ea9..dedb580 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
@@ -103,7 +103,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy
{
 
     private void updateSlowConsumersList(List<Subscription> subscribers) {
         for (Subscription subscriber : subscribers) {
-
             if (isIgnoreNetworkSubscriptions() && subscriber.getConsumerInfo().isNetworkSubscription())
{
                 if (slowConsumers.remove(subscriber) != null) {
                     LOG.info("network sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
@@ -119,13 +118,20 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy
{
                 continue;
             }
 
+            // don't mark consumers with no messages
+            if (subscriber.getInFlightSize() <= 0) {
+                continue;
+            }
+
             long lastAckTime = subscriber.getTimeOfLastMessageAck();
             long timeDelta = System.currentTimeMillis() - lastAckTime;
 
             if (timeDelta > maxTimeSinceLastAck) {
                 if (!slowConsumers.containsKey(subscriber)) {
                     LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId());
-                    slowConsumers.put(subscriber, new SlowConsumerEntry(subscriber.getContext()));
+                    SlowConsumerEntry entry = new SlowConsumerEntry(subscriber.getContext());
+                    entry.mark(); // mark consumer on first run
+                    slowConsumers.put(subscriber, entry);
                 } else if (getMaxSlowCount() > 0) {
                     slowConsumers.get(subscriber).slow();
                 }
@@ -140,10 +146,13 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy
{
     private void abortAllQualifiedSlowConsumers() {
         HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription,
SlowConsumerEntry>();
         for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet())
{
-            if (getMaxSlowDuration() > 0 && (entry.getValue().markCount * getCheckPeriod()
> getMaxSlowDuration()) ||
-                getMaxSlowCount() > 0 && entry.getValue().slowCount > getMaxSlowCount())
{
+            if (getMaxSlowDuration() > 0 && (entry.getValue().markCount * getCheckPeriod()
>= getMaxSlowDuration()) ||
+                getMaxSlowCount() > 0 && entry.getValue().slowCount >= getMaxSlowCount())
{
 
-                LOG.trace("Transferring consumer{} to the abort list: {} slow duration =
{}, slow count = {}", new Object[]{ entry.getKey().getConsumerInfo().getConsumerId(), entry.getValue().markCount
* getCheckPeriod(), entry.getValue().getSlowCount() });
+                LOG.trace("Transferring consumer {} to the abort list: " +
+                              "slow duration = " + entry.getValue().markCount * getCheckPeriod()
+ ", " +
+                              "slow count = " + entry.getValue().slowCount,
+                              entry.getKey().getConsumerInfo().getConsumerId());
 
                 toAbort.put(entry.getKey(), entry.getValue());
                 slowConsumers.remove(entry.getKey());


Mime
View raw message