activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6094 - regression via https://issues.apache.org/jira/browse/AMQ-6014 - incorrect ref count on message expiry event during cursor move for dispatch. Fix and test - thanks for the test
Date Tue, 05 Jan 2016 15:08:06 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 63f045e5c -> 26665fa1b


https://issues.apache.org/jira/browse/AMQ-6094 - regression via https://issues.apache.org/jira/browse/AMQ-6014
- incorrect ref count on message expiry event during cursor move for dispatch. Fix and test
- thanks for the test

(cherry picked from commit e3df09b9db09d6cf2834b0beb901c253be9b6120)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/26665fa1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/26665fa1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/26665fa1

Branch: refs/heads/activemq-5.13.x
Commit: 26665fa1b98a9004fc90a6eeea26ca06eb53d118
Parents: 63f045e
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jan 5 14:09:16 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Jan 5 14:33:43 2016 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   1 +
 .../org/apache/activemq/bugs/AMQ6094Test.java   | 234 +++++++++++++++++++
 2 files changed, 235 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/26665fa1/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8b52121..8332d25 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -708,6 +708,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription
{
                                     }
 
                                     if (!isBrowser()) {
+                                        node.decrementReferenceCount();
                                         continue;
                                     }
                                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/26665fa1/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6094Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6094Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6094Test.java
new file mode 100644
index 0000000..8f53d61
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6094Test.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ6094Test {
+
+    private static Logger LOG = LoggerFactory.getLogger(AMQ6094Test.class);
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    @Before
+    public void before() throws Exception {
+        brokerService = new BrokerService();
+        TransportConnector connector = brokerService.addConnector("tcp://localhost:0");
+        connectionUri = connector.getPublishableConnectString();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    @Test
+	public void testQueueMemoryUsage() throws Exception {
+
+        final ArrayList<ThreadSlot> producerThreads = new ArrayList<>();
+
+		final ArrayList<ThreadSlot> consumerThreads = new ArrayList<>();
+
+		for (int i = 0; i < 4; i++)
+			producerThreads.add(runInThread(new UnsafeRunnable() {
+
+                @Override
+                public void run() throws Exception {
+                    producer(connectionUri, "queueA");
+                }
+
+			}));
+
+		for (int i = 0; i < 4; i++)
+			consumerThreads.add(runInThread(new UnsafeRunnable() {
+
+                @Override
+                public void run() throws Exception {
+                    consumer(connectionUri, "queueA", 2500);
+                }
+            }));
+
+		// kill and restart random threads
+		for (int count = 0; count < 10; count++) {
+			Thread.sleep(5000);
+			final int i = (int) (Math.random() * consumerThreads.size());
+			final ThreadSlot slot = consumerThreads.get(i);
+			slot.thread.interrupt();
+			consumerThreads.remove(i);
+			consumerThreads.add(runInThread(slot.runnable));
+
+			Queue queue = (Queue) brokerService.getDestination(new ActiveMQQueue("queueA"));
+			LOG.info("cursorMemoryUsage: " + queue.getMessages().
+			        getSystemUsage().getMemoryUsage().getUsage());
+            LOG.info("messagesStat: " + queue.getDestinationStatistics().getMessages().getCount());
+
+		}
+
+        // verify usage
+        Queue queue = (Queue) brokerService.getDestination(new ActiveMQQueue("queueA"));
+        LOG.info("cursorMemoryUsage: " + queue.getMessages().
+                getSystemUsage().getMemoryUsage().getUsage());
+        LOG.info("messagesStat: " + queue.getDestinationStatistics().getMessages().getCount());
+
+        // drain the queue
+        for (ThreadSlot threadSlot: producerThreads) {
+            threadSlot.thread.interrupt();
+            threadSlot.thread.join(4000);
+        }
+
+        for (ThreadSlot threadSlot : consumerThreads) {
+            threadSlot.thread.interrupt();
+            threadSlot.thread.join(4000);
+        }
+
+        consumer(connectionUri, "queueA", 2500, true);
+
+        LOG.info("After drain, cursorMemoryUsage: " + queue.getMessages().
+                getSystemUsage().getMemoryUsage().getUsage());
+        LOG.info("messagesStat: " + queue.getDestinationStatistics().getMessages().getCount());
+
+        assertEquals("Queue memory usage to 0", 0, queue.getMessages().
+                getSystemUsage().getMemoryUsage().getUsage());
+
+	}
+
+	public static void producer(String uri, String topic) throws Exception {
+		final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+		        uri + "?jms.useCompression=true&jms.useAsyncSend=true&daemon=true");
+
+		Connection connection = factory.createConnection();
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(new ActiveMQQueue(topic));
+
+		producer.setTimeToLive(6000);
+		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+		while (true) {
+			producer.send(session.createTextMessage(msg()));
+			if (Math.random() > 0.5)
+				Thread.sleep(1);
+		}
+
+	}
+
+    public static void consumer(String uri, String queue, int prefetchSize) throws Exception
{
+        consumer(uri, queue, prefetchSize, false);
+    }
+
+	public static void consumer(String uri, String queue, int prefetchSize, boolean drain) throws
Exception {
+		final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+				uri + "?jms.prefetchPolicy.queuePrefetch=" + prefetchSize + "&jms.useAsyncSend=true");
+
+		Connection connection = null;
+		try {
+			connection = factory.createConnection();
+			connection.start();
+			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queue));
+            if (drain) {
+                Message message = null;
+                do {
+                    message = consumer.receive(4000);
+                } while (message != null);
+            } else {
+                // block
+                while (true) {
+                    consumer.receive();
+                }
+            }
+		} finally {
+
+            Thread.interrupted();
+			if (!drain) {
+                Thread.sleep(5000); // delay closing of connection
+            }
+
+			LOG.info("Now closing");
+			if (connection != null)
+				connection.close();
+		}
+
+	}
+
+	private static String msg() {
+		final StringBuilder builder = new StringBuilder();
+		for (int i = 0; i < 100; i++)
+			builder.append("123457890");
+
+		return builder.toString();
+	}
+
+	private static interface UnsafeRunnable {
+		public void run() throws Exception;
+	}
+
+	public static class ThreadSlot {
+		private UnsafeRunnable runnable;
+		private Thread thread;
+	}
+
+	public static ThreadSlot runInThread(final UnsafeRunnable runnable) {
+		final Thread thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    runnable.run();
+                } catch (Exception e) {
+                    //e.printStackTrace();
+                }
+
+            }
+        });
+
+		thread.start();
+
+		final ThreadSlot result = new ThreadSlot();
+		result.thread = thread;
+		result.runnable = runnable;
+		return result;
+	}
+}


Mime
View raw message