activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4731
Date Thu, 26 Sep 2013 17:02:01 GMT
Updated Branches:
  refs/heads/trunk 3e58a5a4e -> 74dafd7f2


https://issues.apache.org/jira/browse/AMQ-4731

Just use a concurrent linked queue, makes the code much simpler and
doesn't hurt performance overall

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74dafd7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74dafd7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74dafd7f

Branch: refs/heads/trunk
Commit: 74dafd7f24028c3503758581166ec10bb3d5116a
Parents: 3e58a5a
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Sep 26 18:01:43 2013 +0100
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Sep 26 18:01:43 2013 +0100

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 83 ++------------------
 1 file changed, 8 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/74dafd7f/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 11af872..d68c5bd 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -19,8 +19,10 @@ package org.apache.activemq.advisory;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.activemq.broker.Broker;
@@ -68,66 +70,9 @@ public class AdvisoryBroker extends BrokerFilter {
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
 
     protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new
ConcurrentHashMap<ConnectionId, ConnectionInfo>();
-    class ConsumerIdKey {
-        private final ConsumerId delegate;
-        private final long creationTime;
-
-        ConsumerIdKey(ConsumerId id) {
-            this.delegate = id;
-            this.creationTime = System.currentTimeMillis();
-        }
-
-        ConsumerIdKey(ConsumerId id, long creationTime) {
-            this.delegate = id;
-            this.creationTime = creationTime;
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (this == other) {
-                return true;
-            }
-            if (other == null || other.getClass() != ConsumerIdKey.class) {
-                return false;
-            }
-
-            ConsumerIdKey key = (ConsumerIdKey) other;
-
-            return delegate.equals(key.delegate);
-        }
-
-        @Override
-        public int hashCode() {
-            return delegate.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return "ConsumerIdKey { " + delegate + " }";
-        }
-
-        public ConsumerId getConsumerId() {
-            return this.delegate;
-        }
-
-        public long getCreationTime() {
-            return this.creationTime;
-        }
-    }
-
-    // replay consumer advisory messages in the order in which they arrive - allows duplicate
suppression in
-    // mesh networks with ttl>1
-    protected final Map<ConsumerIdKey, ConsumerInfo> consumers = new ConcurrentSkipListMap<ConsumerIdKey,
ConsumerInfo>(
-        new Comparator<ConsumerIdKey>() {
-            @Override
-            public int compare(ConsumerIdKey o1, ConsumerIdKey o2) {
-                return (o1.creationTime < o2.creationTime ? -1 : o1.equals(o2) ? 0 : 1);
-            }
-        }
-    );
-
-    protected final Map<ConsumerId, Long> consumerTracker = new ConcurrentHashMap<ConsumerId,
Long>();
 
+    protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
+    
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId,
ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations
= new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges =
new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@@ -159,9 +104,7 @@ public class AdvisoryBroker extends BrokerFilter {
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId());
-            consumerTracker.put(key.getConsumerId(), key.getCreationTime());
-            consumers.put(key, info);
+            consumers.offer(info);
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
         } else {
             // We need to replay all the previously collected state objects
@@ -206,7 +149,7 @@ public class AdvisoryBroker extends BrokerFilter {
 
             // Replay the consumers.
             if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
-                for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();)
{
+                for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext();)
{
                     ConsumerInfo value = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
                     fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
@@ -324,17 +267,7 @@ public class AdvisoryBroker extends BrokerFilter {
         ActiveMQDestination dest = info.getDestination();
         if (!AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            Object value = consumerTracker.remove(info.getConsumerId());
-            if (value != null) {
-                Long creationTime = (Long) value;
-                ConsumerIdKey key = new ConsumerIdKey(info.getConsumerId(), creationTime);
-                if (consumers.remove(key) == null) {
-                    LOG.trace("Failed to remove:{} from the consumers map: {}", key, consumers);
-                }
-            } else {
-                LOG.trace("Failed to find consumer:{} in creation time tracking map: ", info.getConsumerId());
-            }
-
+            consumers.remove(info);
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
                 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
             }
@@ -660,7 +593,7 @@ public class AdvisoryBroker extends BrokerFilter {
         return connections;
     }
 
-    public Map<ConsumerIdKey, ConsumerInfo> getAdvisoryConsumers() {
+    public Queue<ConsumerInfo> getAdvisoryConsumers() {
         return consumers;
     }
 


Mime
View raw message