activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6016 - rework fix for https://issues.apache.org/jira/browse/AMQ-2106 - account group assignment on a per destination basis to prevent modification during consumer ordering
Date Wed, 16 Dec 2015 14:42:16 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x 4ceda81e0 -> 2ae189e70


https://issues.apache.org/jira/browse/AMQ-6016 - rework fix for https://issues.apache.org/jira/browse/AMQ-2106
- account group assignment on a per destination basis to prevent modification during consumer
ordering

(cherry picked from commit 5d697cff3b0f98a2d4d4f7176508bbbc77305d5d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b84785d5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b84785d5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b84785d5

Branch: refs/heads/activemq-5.12.x
Commit: b84785d5d035a2c5d46cdda22fac9051fb5d7786
Parents: 4ceda81
Author: gtully <gary.tully@gmail.com>
Authored: Wed Oct 21 14:02:56 2015 +0100
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Wed Dec 16 14:41:14 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 10 +++---
 .../region/group/CachedMessageGroupMap.java     |  4 +--
 .../apache/activemq/command/ConsumerInfo.java   | 35 +++++++++++++++-----
 3 files changed, 33 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b84785d5/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 868dfbc..232934e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -227,8 +227,8 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
             if (val == 0 && messageGroupOwners != null) {
                 // then ascending order of assigned message groups to favour less loaded
consumers
                 // Long.compare in jdk7
-                long x = s1.getConsumerInfo().getAssignedGroupCount();
-                long y = s2.getConsumerInfo().getAssignedGroupCount();
+                long x = s1.getConsumerInfo().getAssignedGroupCount(destination);
+                long y = s2.getConsumerInfo().getAssignedGroupCount(destination);
                 val = (x < y) ? -1 : ((x == y) ? 0 : 1);
             }
             return val;
@@ -504,7 +504,7 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
                     getDestinationStatistics().getDequeues().getCount(),
                     getDestinationStatistics().getDispatched().getCount(),
                     getDestinationStatistics().getInflight().getCount(),
-                    sub.getConsumerInfo().getAssignedGroupCount()
+                    sub.getConsumerInfo().getAssignedGroupCount(destination)
             });
             consumersLock.writeLock().lock();
             try {
@@ -2093,7 +2093,7 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
                         // A group sequence < 1 is an end of group signal.
                         if (sequence < 0) {
                             messageGroupOwners.removeGroup(groupId);
-                            subscription.getConsumerInfo().decrementAssignedGroupCount();
+                            subscription.getConsumerInfo().decrementAssignedGroupCount(destination);
                         }
                     } else {
                         result = false;
@@ -2109,7 +2109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
         messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
         Message message = n.getMessage();
         message.setJMSXGroupFirstForConsumer(true);
-        subs.getConsumerInfo().incrementAssignedGroupCount();
+        subs.getConsumerInfo().incrementAssignedGroupCount(destination);
     }
 
     protected void pageInMessages(boolean force) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b84785d5/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
index b17f8ce..2d5bd20 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
@@ -44,7 +44,7 @@ public class CachedMessageGroupMap implements MessageGroupMap {
                   if (destination != null) {
                       for (Subscription s : destination.getConsumers()) {
                         if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue()))
{
-                            s.getConsumerInfo().decrementAssignedGroupCount();
+                            s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
                             break;
                           }
                       }
@@ -90,7 +90,7 @@ public class CachedMessageGroupMap implements MessageGroupMap {
         cache.clear();
         if (destination != null) {
             for (Subscription s : destination.getConsumers()) {
-                s.getConsumerInfo().clearAssignedGroupCount();
+                s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b84785d5/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
index 0c1e691..ed97a48 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
@@ -17,7 +17,11 @@
 package org.apache.activemq.command;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.state.CommandVisitor;
@@ -63,7 +67,7 @@ public class ConsumerInfo extends BaseCommand {
     // not marshalled, populated from RemoveInfo, the last message delivered, used
     // to suppress redelivery on prefetched messages after close
     private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
-    private transient long assignedGroupCount;
+    private transient Map<ActiveMQDestination, AtomicLong> assignedGroupCount = new
ConcurrentHashMap<>();
     // originated from a
     // network connection
 
@@ -494,20 +498,33 @@ public class ConsumerInfo extends BaseCommand {
         return lastDeliveredSequenceId;
     }
 
-    public void incrementAssignedGroupCount() {
-        this.assignedGroupCount++;
+    public void incrementAssignedGroupCount(final ActiveMQDestination dest) {
+        AtomicLong value = assignedGroupCount.get(dest);
+        if (value == null) {
+            value = new AtomicLong(0);
+            assignedGroupCount.put(dest, value);
+        }
+        value.incrementAndGet();
     }
 
-    public void clearAssignedGroupCount() {
-        this.assignedGroupCount=0;
+    public void clearAssignedGroupCount(final ActiveMQDestination dest) {
+        assignedGroupCount.remove(dest);
     }
 
-    public void decrementAssignedGroupCount() {
-        this.assignedGroupCount--;
+    public void decrementAssignedGroupCount(final ActiveMQDestination dest) {
+        AtomicLong value = assignedGroupCount.get(dest);
+        if (value != null) {
+            value.decrementAndGet();
+        }
     }
 
-    public long getAssignedGroupCount() {
-        return assignedGroupCount;
+    public long getAssignedGroupCount(final ActiveMQDestination dest) {
+        long result = 0l;
+        AtomicLong value = assignedGroupCount.get(dest);
+        if (value != null) {
+            result = value.longValue();
+        }
+        return result;
     }
 
 }


Mime
View raw message