activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5421
Date Tue, 04 Nov 2014 21:04:56 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk b70fc9a07 -> 7d136de42


https://issues.apache.org/jira/browse/AMQ-5421

Ensure concurrent access to the destinations state data does not cause
errors.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7d136de4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7d136de4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7d136de4

Branch: refs/heads/trunk
Commit: 7d136de422be6cea54c5615833e242908cb0d2dc
Parents: b70fc9a
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Nov 4 16:04:29 2014 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Nov 4 16:04:29 2014 -0500

----------------------------------------------------------------------
 .../policy/AbortSlowAckConsumerStrategy.java    |  13 +-
 .../org/apache/activemq/bugs/AMQ5421Test.java   | 119 +++++++++++++++++++
 2 files changed, 127 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7d136de4/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
index 5e538c7..811839d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java
@@ -18,9 +18,10 @@ package org.apache.activemq.broker.region.policy;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -40,7 +41,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerStrategy.class);
 
-    private final List<Destination> destinations = new LinkedList<Destination>();
+    private final Map<String, Destination> destinations = new ConcurrentHashMap<String, Destination>();
     private long maxTimeSinceLastAck = 30*1000;
     private boolean ignoreIdleConsumers = true;
     private boolean ignoreNetworkConsumers = true;
@@ -83,7 +84,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
 
         List<Destination> disposed = new ArrayList<Destination>();
 
-        for (Destination destination : destinations) {
+        for (Destination destination : destinations.values()) {
             if (destination.isDisposed()) {
                 disposed.add(destination);
                 continue;
@@ -96,7 +97,9 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
         }
 
         // Clean up an disposed destinations to save space.
-        destinations.removeAll(disposed);
+        for (Destination destination : disposed) {
+            destinations.remove(destination.getName());
+        }
 
         abortAllQualifiedSlowConsumers();
     }
@@ -164,7 +167,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
 
     @Override
     public void addDestination(Destination destination) {
-        this.destinations.add(destination);
+        this.destinations.put(destination.getName(), destination);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/7d136de4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
new file mode 100644
index 0000000..751d488
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ5421Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ5421Test.class);
+
+    private static final int DEST_COUNT = 1000;
+    private final Destination[] destination = new Destination[DEST_COUNT];
+    private final MessageProducer[] producer = new MessageProducer[DEST_COUNT];
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri);
+        conFactory.setWatchTopicAdvisories(false);
+        return conFactory;
+    }
+
+    protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setCheckPeriod(2000);
+        strategy.setMaxTimeSinceLastAck(5000);
+        strategy.setIgnoreIdleConsumers(false);
+
+        return strategy;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
+        PolicyEntry policy = new PolicyEntry();
+
+        policy.setSlowConsumerStrategy(createSlowConsumerStrategy());
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+
+        connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+    }
+
+    @Test
+    public void testManyTempDestinations() throws Exception {
+        Connection connection = createConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        for (int i = 0; i < DEST_COUNT; i++) {
+            destination[i] = session.createTemporaryQueue();
+            LOG.debug("Created temp queue: {}", i);
+        }
+
+        for (int i = 0; i < DEST_COUNT; i++) {
+            producer[i] = session.createProducer(destination[i]);
+            LOG.debug("Created producer: {}", i);
+            TextMessage msg = session.createTextMessage(" testMessage " + i);
+            producer[i].send(msg);
+            LOG.debug("message sent: {}", i);
+            MessageConsumer consumer = session.createConsumer(destination[i]);
+            Message message = consumer.receive(1000);
+            Assert.assertTrue(message.equals(msg));
+        }
+
+        for (int i = 0; i < DEST_COUNT; i++) {
+            producer[i].close();
+        }
+
+        connection.close();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+}


Mime
View raw message