activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4731
Date Fri, 20 Sep 2013 20:50:24 GMT
Updated Branches:
  refs/heads/trunk 062adbbe7 -> 214204595


https://issues.apache.org/jira/browse/AMQ-4731

Initial fix for this issue.  Uses a separate collection to track the
creation time of the consumers.  Might want to test just using a
ConcurrentLinkedQueue, since that will stay in creation order naturally,
although the remove operations could cost more.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/21420459
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/21420459
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/21420459

Branch: refs/heads/trunk
Commit: 21420459535335bef22f8d1ced6d3ce5b92d9628
Parents: 062adbb
Author: Timothy Bish <tabish121@gmai.com>
Authored: Fri Sep 20 16:50:20 2013 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Fri Sep 20 16:50:20 2013 -0400

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 84 +++++++++++++++++---
 .../activemq/advisory/TempQueueMemoryTest.java  | 30 +++----
 2 files changed, 88 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/21420459/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 6571699..e12c0ce 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -34,7 +34,21 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -55,33 +69,65 @@ public class AdvisoryBroker extends BrokerFilter {
 
     protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new
ConcurrentHashMap<ConnectionId, ConnectionInfo>();
     class ConsumerIdKey {
-        final ConsumerId delegate;
-        final long creationTime = System.currentTimeMillis();
+        private final ConsumerId delegate;
+        private final long creationTime;
+
         ConsumerIdKey(ConsumerId id) {
-            delegate = id;
+            this.delegate = id;
+            this.creationTime = System.currentTimeMillis();
+        }
+
+        ConsumerIdKey(ConsumerId id, long creationTime) {
+            this.delegate = id;
+            this.creationTime = creationTime;
         }
 
         @Override
         public boolean equals(Object other) {
-            return delegate.equals(other);
+            if (this == other) {
+                return true;
+            }
+            if (other == null || other.getClass() != ConsumerIdKey.class) {
+                return false;
+            }
+
+            ConsumerIdKey key = (ConsumerIdKey) other;
+
+            return delegate.equals(key.delegate);
         }
 
         @Override
         public int hashCode() {
             return delegate.hashCode();
         }
+
+        @Override
+        public String toString() {
+            return "ConsumerIdKey { " + delegate + " }";
+        }
+
+        public ConsumerId getConsumerId() {
+            return this.delegate;
+        }
+
+        public long getCreationTime() {
+            return this.creationTime;
+        }
     }
+
     // replay consumer advisory messages in the order in which they arrive - allows duplicate
suppression in
     // mesh networks with ttl>1
     protected final Map<ConsumerIdKey, ConsumerInfo> consumers = new ConcurrentSkipListMap<ConsumerIdKey,
ConsumerInfo>(
-            new Comparator<ConsumerIdKey>() {
-                @Override
-                public int compare(ConsumerIdKey o1, ConsumerIdKey o2) {
-                    return (o1.creationTime < o2.creationTime ? -1 : (o1.delegate==o2.delegate
? 0 : 1));
-                }
+        new Comparator<ConsumerIdKey>() {
+            @Override
+            public int compare(ConsumerIdKey o1, ConsumerIdKey o2) {
+                return (o1.creationTime < o2.creationTime ? -1 : o1.equals(o2) ? 0 : 1);
             }
+        }
     );
 
+    protected final Map<ConsumerId, Long> consumerTracker = new ConcurrentHashMap<ConsumerId,
Long>();
+
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId,
ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations
= new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges =
new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@@ -113,7 +159,10 @@ public class AdvisoryBroker extends BrokerFilter {
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            consumers.put(new ConsumerIdKey(info.getConsumerId()), info);
+            ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId());
+            consumerTracker.put(key.getConsumerId(), key.getCreationTime());
+            consumers.put(key, info);
+            LOG.info("Added {} to the map:", key);
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
         } else {
             // We need to replay all the previously collected state objects
@@ -276,7 +325,18 @@ public class AdvisoryBroker extends BrokerFilter {
         ActiveMQDestination dest = info.getDestination();
         if (!AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            consumers.remove(new ConsumerIdKey(info.getConsumerId()));
+
+            Object value = consumerTracker.remove(info.getConsumerId());
+            if (value != null) {
+                Long creationTime = (Long) value;
+                ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId(), creationTime);
+                if (consumers.remove(key) == null) {
+                    LOG.info("Failed to remove:{} from the consumers map: {}", key, consumers);
+                }
+            } else {
+                LOG.info("Failed to find consumer:{} in creation time tracking map: ", info.getConsumerId());
+            }
+
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
                 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/21420459/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
index 8268615..9bf8ed1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
@@ -34,12 +34,13 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 
 public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
+
     protected Connection serverConnection;
     protected Session serverSession;
     protected Connection clientConnection;
     protected Session clientSession;
     protected Destination serverDestination;
-    protected int messagesToSend = 2000;
+    protected int messagesToSend = 10;
     protected boolean deleteTempQueue = true;
     protected boolean serverTransactional = false;
     protected boolean clientTransactional = false;
@@ -52,7 +53,7 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
     }
 
     public void testLoadRequestReply() throws Exception {
-        for (int i=0; i< numConsumers; i++) {
+        for (int i = 0; i < numConsumers; i++) {
             serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener()
{
                 @Override
                 public void onMessage(Message msg) {
@@ -73,17 +74,19 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
 
         class Producer extends Thread {
             private final int numToSend;
+
             public Producer(int numToSend) {
                 this.numToSend = numToSend;
             }
+
             @Override
             public void run() {
                 try {
-                    Session session = clientConnection.createSession(clientTransactional,
-                            clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+                    Session session = clientConnection.createSession(clientTransactional,
clientTransactional ?
+                        Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                     MessageProducer producer = session.createProducer(serverDestination);
 
-                    for (int i =0; i< numToSend; i++) {
+                    for (int i = 0; i < numToSend; i++) {
                         TemporaryQueue replyTo = session.createTemporaryQueue();
                         MessageConsumer consumer = session.createConsumer(replyTo);
                         Message msg = session.createMessage();
@@ -109,8 +112,8 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
             }
         }
         Vector<Thread> threads = new Vector<Thread>(numProducers);
-        for (int i=0; i<numProducers ; i++) {
-            threads.add(new Producer(messagesToSend/numProducers));
+        for (int i = 0; i < numProducers; i++) {
+            threads.add(new Producer(messagesToSend / numProducers));
         }
         startAndJoinThreads(threads);
 
@@ -119,14 +122,13 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
         clientConnection.close();
         serverConnection.close();
 
-        AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(
-                AdvisoryBroker.class);
+        AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class);
 
-        ///The server destination will be left
+        // The server destination will be left
         assertTrue(ab.getAdvisoryDestinations().size() == 1);
 
-        assertTrue("should be zero but is "+ab.getAdvisoryConsumers().size(),ab.getAdvisoryConsumers().size()
== 0);
-        assertTrue("should be zero but is "+ab.getAdvisoryProducers().size(),ab.getAdvisoryProducers().size()
== 0);
+        assertTrue("should be zero but is " + ab.getAdvisoryConsumers().size(), ab.getAdvisoryConsumers().size()
== 0);
+        assertTrue("should be zero but is " + ab.getAdvisoryProducers().size(), ab.getAdvisoryProducers().size()
== 0);
 
         RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
 
@@ -134,10 +136,10 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
     }
 
     private void startAndJoinThreads(Vector<Thread> threads) throws Exception {
-        for (Thread thread: threads) {
+        for (Thread thread : threads) {
             thread.start();
         }
-        for (Thread thread: threads) {
+        for (Thread thread : threads) {
             thread.join();
         }
     }


Mime
View raw message