activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5616
Date Tue, 28 Apr 2015 15:15:29 GMT
Repository: activemq
Updated Branches:
  refs/heads/master bcdf770c1 -> 9ef425929


http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
index 988a065..883bbfb 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt.strategy;
 import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -58,8 +59,8 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
     protected MQTTProtocolConverter protocol;
     protected BrokerService brokerService;
 
-    protected final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
-    protected final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
+    protected final ConcurrentMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
+    protected final ConcurrentMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
 
     protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
index 6ae7990..7c6d169 100644
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
+++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
@@ -16,8 +16,27 @@
  */
 package org.apache.activemq.partition;
 
-import org.apache.activemq.broker.*;
-import org.apache.activemq.command.*;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.partition.dto.Partitioning;
 import org.apache.activemq.partition.dto.Target;
 import org.apache.activemq.state.ConsumerState;
@@ -27,12 +46,6 @@ import org.apache.activemq.util.LRUCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * A BrokerFilter which partitions client connections over a cluster of brokers.
  *
@@ -267,7 +280,7 @@ public class PartitionBroker extends BrokerFilter {
         return null;
     }
 
-    protected final ConcurrentHashMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
+    protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
 
     @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-shiro/src/main/java/org/apache/activemq/shiro/subject/SubjectSecurityContext.java
----------------------------------------------------------------------
diff --git a/activemq-shiro/src/main/java/org/apache/activemq/shiro/subject/SubjectSecurityContext.java b/activemq-shiro/src/main/java/org/apache/activemq/shiro/subject/SubjectSecurityContext.java
index 1d752e2..f344d8f 100644
--- a/activemq-shiro/src/main/java/org/apache/activemq/shiro/subject/SubjectSecurityContext.java
+++ b/activemq-shiro/src/main/java/org/apache/activemq/shiro/subject/SubjectSecurityContext.java
@@ -16,14 +16,14 @@
  */
 package org.apache.activemq.shiro.subject;
 
+import java.security.Principal;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.shiro.subject.Subject;
 
-import java.security.Principal;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * ActiveMQ {@code SecurityContext} implementation that retains a Shiro {@code Subject} instance for use during
  * security checks and other security-related operations.
@@ -73,12 +73,12 @@ public class SubjectSecurityContext extends SecurityContext {
     }
 
     @Override
-    public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
+    public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() {
         throw notAllowed("getAuthorizedReadDests");
     }
 
     @Override
-    public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
+    public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() {
         throw notAllowed("getAuthorizedWriteDests");
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index 7ec53b1..245774d 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
@@ -104,15 +105,15 @@ public class ProtocolConverter {
     private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
 
-    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
-    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
-    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
-    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
-    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
+    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
+    private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
+    private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
+    private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
+    private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
     private final StompTransport stompTransport;
 
-    private final ConcurrentHashMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
+    private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
     private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
 
     private final Object commnadIdMutex = new Object();

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
index 2c86562..e4c73ff 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.bugs;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -148,7 +149,7 @@ public class AMQ1936Test extends TestCase {
     }
 
     public void testForDuplicateMessages() throws Exception {
-        final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<String, String>();
+        final ConcurrentMap<String, String> messages = new ConcurrentHashMap<String, String>();
         final Object lock = new Object();
         final CountDownLatch duplicateSignal = new CountDownLatch(1);
         final AtomicInteger messageCount = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
index 15d24d5..cc7c711 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Destination;
@@ -98,7 +99,7 @@ public class AMQ2364Test {
         ConnectionStateTracker stateTracker = (ConnectionStateTracker) stateTrackerField.get(failoverTrans);
         Field statesField = ConnectionStateTracker.class.getDeclaredField("connectionStates");
         statesField.setAccessible(true);
-        ConcurrentHashMap<ConnectionId, ConnectionState> states =
+        ConcurrentMap<ConnectionId, ConnectionState> states =
                 (ConcurrentHashMap<ConnectionId, ConnectionState>) statesField.get(stateTracker);
 
         ConnectionState state = states.get(connection.getConnectionInfo().getConnectionId());

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
index a567455..e6c2ab5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
@@ -22,7 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.DeliveryMode;
@@ -57,7 +57,7 @@ public class AMQ4062Test {
 
     private BrokerService service;
     private PolicyEntry policy;
-    private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
+    private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
 
     private static final int PREFETCH_SIZE_5=5;
     private String connectionUri;
@@ -174,17 +174,17 @@ public class AMQ4062Test {
     }
 
     @SuppressWarnings("unchecked")
-    private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
+    private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
         if(durableSubscriptions!=null) return durableSubscriptions;
         RegionBroker regionBroker=(RegionBroker)service.getRegionBroker();
         TopicRegion region=(TopicRegion)regionBroker.getTopicRegion();
         Field field=TopicRegion.class.getDeclaredField("durableSubscriptions");
         field.setAccessible(true);
-        durableSubscriptions=(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>)field.get(region);
+        durableSubscriptions=(ConcurrentMap<SubscriptionKey, DurableTopicSubscription>)field.get(region);
         return durableSubscriptions;
     }
 
-    private ConsumerInfo getConsumerInfo(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
+    private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
         ConsumerInfo info=null;
         for(Iterator<DurableTopicSubscription> it=durableSubscriptions.values().iterator();it.hasNext();){
             Subscription sub = it.next();

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
index 58af6dc..c342eb2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -238,7 +238,7 @@ public class CompressionOverNetworkTest {
                 if (bridges.length > 0) {
                     LOG.info(brokerService + " bridges "  + Arrays.toString(bridges));
                     DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
-                    ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
+                    ConcurrentMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
                     LOG.info(brokerService + " bridge "  + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
                     if (!forwardingBridges.isEmpty()) {
                         for (DemandSubscription demandSubscription : forwardingBridges.values()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkLoopBackTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkLoopBackTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkLoopBackTest.java
index 62b79b1..f784166 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkLoopBackTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkLoopBackTest.java
@@ -46,7 +46,7 @@ public class NetworkLoopBackTest {
                 }
             });
 
-            final DemandForwardingBridgeSupport loopbackBridge = (DemandForwardingBridgeSupport) networkConnector.bridges.elements().nextElement();
+            final DemandForwardingBridgeSupport loopbackBridge = (DemandForwardingBridgeSupport) networkConnector.bridges.values().iterator().next();
             assertTrue("nc started", networkConnector.isStarted());
 
             assertTrue("It should get disposed", Wait.waitFor(new Wait.Condition() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index 62385cb..d7e987c 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.net.URI;
 import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -173,7 +173,7 @@ public class SimpleNetworkTest {
                 if (bridges.length > 0) {
                     LOG.info(brokerService + " bridges "  + Arrays.toString(bridges));
                     DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
-                    ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
+                    ConcurrentMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
                     LOG.info(brokerService + " bridge "  + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
                     if (!forwardingBridges.isEmpty()) {
                         for (DemandSubscription demandSubscription : forwardingBridges.values()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9ef42592/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
index 396f284..9b31a73 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java
@@ -22,6 +22,7 @@ import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.net.SocketFactory;
 
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class SocketTstFactory extends SocketFactory {
     private static final Logger LOG = LoggerFactory.getLogger(SocketTstFactory.class);
 
-    private static final ConcurrentHashMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
+    private static final ConcurrentMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
 
     private class SocketTst {
 


Mime
View raw message