activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5483 - fix and test - assigned group counts are updated when lru map evicts a group assignment
Date Thu, 11 Dec 2014 14:50:05 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 1409acb36 -> d25c52ccb


https://issues.apache.org/jira/browse/AMQ-5483 - fix and test - assigned group counts are
updated when lru map evicts a group assignment


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d25c52cc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d25c52cc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d25c52cc

Branch: refs/heads/trunk
Commit: d25c52ccb2c9c23535f9d4488fe8be8600148852
Parents: 15eba48
Author: gtully <gary.tully@gmail.com>
Authored: Thu Dec 11 14:40:56 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Dec 11 14:42:00 2014 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  1 +
 .../region/group/CachedMessageGroupMap.java     | 30 +++++++-
 .../region/group/MessageGroupHashBucket.java    |  3 +
 .../broker/region/group/MessageGroupMap.java    |  3 +
 .../region/group/SimpleMessageGroupMap.java     |  3 +
 .../apache/activemq/command/ConsumerInfo.java   |  4 +
 .../MessageGroupReconnectDistributionTest.java  | 79 +++++++++++++-------
 7 files changed, 96 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f8373ac..a6515c4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1013,6 +1013,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     public MessageGroupMap getMessageGroupOwners() {
         if (messageGroupOwners == null) {
             messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
+            messageGroupOwners.setDestination(this);
         }
         return messageGroupOwners;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
index 084e8d0..b17f8ce 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
@@ -20,6 +20,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.memory.LRUMap;
 
@@ -31,8 +33,26 @@ import org.apache.activemq.memory.LRUMap;
 public class CachedMessageGroupMap implements MessageGroupMap {
     private final LRUMap<String, ConsumerId> cache;
     private final int maximumCacheSize;
+    Destination destination;
+
     CachedMessageGroupMap(int size){
-      cache = new LRUMap<String, ConsumerId>(size);
+      cache = new LRUMap<String, ConsumerId>(size) {
+          @Override
+          public boolean removeEldestEntry(final Map.Entry eldest) {
+              boolean remove = super.removeEldestEntry(eldest);
+              if (remove) {
+                  if (destination != null) {
+                      for (Subscription s : destination.getConsumers()) {
+                        if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
+                            s.getConsumerInfo().decrementAssignedGroupCount();
+                            break;
+                          }
+                      }
+                  }
+              }
+              return remove;
+          }
+      };
       maximumCacheSize = size;
     }
     public synchronized void put(String groupId, ConsumerId consumerId) {
@@ -68,6 +88,11 @@ public class CachedMessageGroupMap implements MessageGroupMap {
     @Override
     public synchronized void removeAll(){
         cache.clear();
+        if (destination != null) {
+            for (Subscription s : destination.getConsumers()) {
+                s.getConsumerInfo().clearAssignedGroupCount();
+            }
+        }
     }
 
     @Override
@@ -92,4 +117,7 @@ public class CachedMessageGroupMap implements MessageGroupMap {
         return "message groups: " + cache.size();
     }
 
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
index 652b985..5145b16 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.group;
 
 import java.util.Map;
 
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.memory.LRUMap;
 
@@ -93,6 +94,8 @@ public class MessageGroupHashBucket implements MessageGroupMap {
         return "bucket";
     }
 
+    public void setDestination(Destination destination) {}
+
     public int getBucketCount(){
         return bucketCount;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
index c952c94..8998c23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.group;
 
 import java.util.Map;
 
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ConsumerId;
 
 /**
@@ -44,4 +45,6 @@ public interface MessageGroupMap {
 
     String getType();
 
+    void setDestination(Destination destination);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
index e3fd4ed..941477a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ConsumerId;
 
 /**
@@ -78,6 +79,8 @@ public class SimpleMessageGroupMap implements MessageGroupMap {
         return "simple";
     }
 
+    public void setDestination(Destination destination) {}
+
     public String toString() {
         return "message groups: " + map.size();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
index c9fc3e6..09b6be5 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
@@ -498,6 +498,10 @@ public class ConsumerInfo extends BaseCommand {
         this.assignedGroupCount++;
     }
 
+    public void clearAssignedGroupCount() {
+        this.assignedGroupCount=0;
+    }
+
     public void decrementAssignedGroupCount() {
         this.assignedGroupCount--;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d25c52cc/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
index 1fd7e07..da2f367 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,7 +41,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,38 +49,54 @@ import org.slf4j.LoggerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-@RunWith(BlockJUnit4ClassRunner.class)
+@RunWith(Parameterized.class)
 public class MessageGroupReconnectDistributionTest {
     public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class);
     final Random random = new Random();
     protected Connection connection;
     protected Session session;
     protected MessageProducer producer;
-    protected Destination destination;
+    protected ActiveMQQueue destination = new ActiveMQQueue("GroupQ");
     protected TransportConnector connector;
+    ActiveMQConnectionFactory connFactory;
     BrokerService broker;
+    int numMessages = 10000;
+    int groupSize = 10;
+    int batchSize = 20;
+
+    @Parameterized.Parameter(0)
+    public int numConsumers = 4;
+
+    @Parameterized.Parameter(1)
+    public boolean consumerPriority = true;
+
+    @Parameterized.Parameters(name="numConsumers={0},consumerPriority={1}")
+    public static Iterable<Object[]> combinations() {
+        return Arrays.asList(new Object[][]{{4, true}, {10, true}});
+    }
 
     @Before
     public void setUp() throws Exception {
         broker = createBroker();
         broker.start();
-        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=30");
+        connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=200");
+        connFactory.setWatchTopicAdvisories(false);
         connection = connFactory.createConnection();
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        destination = new ActiveMQQueue("GroupQ");
         producer = session.createProducer(destination);
         connection.start();
     }
 
     protected BrokerService createBroker() throws Exception {
         BrokerService service = new BrokerService();
+        service.setAdvisorySupport(false);
         service.setPersistent(false);
         service.setUseJmx(true);
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
-        policy.setUseConsumerPriority(true);
-        policy.setMessageGroupMapFactoryType("cached");
+        policy.setUseConsumerPriority(consumerPriority);
+        policy.setMessageGroupMapFactoryType("cached?cacheSize=" + (numConsumers - 1));
         policyMap.setDefaultEntry(policy);
         service.setDestinationPolicy(policyMap);
 
@@ -95,35 +112,35 @@ public class MessageGroupReconnectDistributionTest {
         broker.stop();
     }
 
-    public int getBatchSize(int bound) throws Exception {
-        return bound + random.nextInt(bound);
-    }
-
     @Test(timeout = 5 * 60 * 1000)
     public void testReconnect() throws Exception {
 
-        final int numMessages = 50000;
-        final int numConsumers = 10;
         final AtomicLong totalConsumed = new AtomicLong(0);
 
-        produceMessages(numMessages);
-
-        ExecutorService executorService = Executors.newCachedThreadPool();
+        ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
         final ArrayList<AtomicLong> consumedCounters = new ArrayList<AtomicLong>(numConsumers);
+        final ArrayList<AtomicLong> batchCounters = new ArrayList<AtomicLong>(numConsumers);
+
         for (int i = 0; i < numConsumers; i++) {
             consumedCounters.add(new AtomicLong(0l));
+            batchCounters.add(new AtomicLong(0l));
+
             final int id = i;
             executorService.submit(new Runnable() {
-                Session connectionSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                int getBatchSize() {
+                    return (id + 1) * batchSize;
+                }
 
                 @Override
                 public void run() {
                     try {
-                        MessageConsumer messageConsumer = connectionSession.createConsumer(destination);
+                        Session connectionSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        int batchSize = getBatchSize();
+                        MessageConsumer messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
 
-                        long batchSize = getBatchSize(numConsumers);
                         Message message;
                         AtomicLong consumed = consumedCounters.get(id);
+                        AtomicLong batches = batchCounters.get(id);
 
                         LOG.info("Consumer: " + id + ", batchSize:" + batchSize + ", totalConsumed:" + totalConsumed.get() + ", consumed:" + consumed.get());
 
@@ -138,19 +155,21 @@ public class MessageGroupReconnectDistributionTest {
                                 if (totalConsumed.get() == numMessages) {
                                     break;
                                 } else {
-                                    messageConsumer = connectionSession.createConsumer(destination);
+                                    batchSize = getBatchSize();
+                                    messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
+                                    batches.incrementAndGet();
                                     continue;
                                 }
                             }
 
-                            message.acknowledge();
                             consumed.incrementAndGet();
                             totalConsumed.incrementAndGet();
 
-                            if (consumed.get() > 0 && consumed.longValue() % batchSize == 0) {
+                            if (consumed.get() > 0 && consumed.intValue() % batchSize == 0) {
                                 messageConsumer.close();
-                                messageConsumer = connectionSession.createConsumer(destination);
-                                batchSize = getBatchSize(numConsumers);
+                                batchSize = getBatchSize();
+                                messageConsumer = connectionSession.createConsumer(destWithPrefetch(destination));
+                                batches.incrementAndGet();
                             }
                         }
                     } catch (Exception e) {
@@ -158,14 +177,19 @@ public class MessageGroupReconnectDistributionTest {
                     }
                 }
             });
+            TimeUnit.MILLISECONDS.sleep(200);
         }
 
+        TimeUnit.SECONDS.sleep(1);
+        produceMessages(numMessages);
+
         executorService.shutdown();
         assertTrue("threads done on time", executorService.awaitTermination(10, TimeUnit.MINUTES));
 
         assertEquals("All consumed", numMessages, totalConsumed.intValue());
 
         LOG.info("Distribution: " + consumedCounters);
+        LOG.info("Batches: " + batchCounters);
 
         double max = consumedCounters.get(0).longValue() * 1.5;
         double min = consumedCounters.get(0).longValue() * 0.5;
@@ -176,11 +200,14 @@ public class MessageGroupReconnectDistributionTest {
         }
     }
 
+    private Destination destWithPrefetch(ActiveMQQueue destination) throws Exception {
+        return destination;
+    }
+
     private void produceMessages(int numMessages) throws JMSException {
         int groupID=0;
         for (int i = 0; i < numMessages; i++) {
-            // groups of 10
-            if (i>0 && i%10==0) {
+            if (i>0 && i%groupSize==0) {
                 groupID++;
             }
             TextMessage msga = session.createTextMessage("hello " + i);


Mime
View raw message