activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5585 - move messages on memory limit
Date Fri, 13 Feb 2015 14:16:58 GMT
Repository: activemq
Updated Branches:
  refs/heads/master db084ef77 -> 1cab71386


https://issues.apache.org/jira/browse/AMQ-5585 - move messages on memory limit


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1cab7138
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1cab7138
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1cab7138

Branch: refs/heads/master
Commit: 1cab7138640bcc075ba6f752728d2aa99f094106
Parents: db084ef
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Fri Feb 13 15:16:26 2015 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Fri Feb 13 15:16:52 2015 +0100

----------------------------------------------------------------------
 .../activemq/broker/region/RegionBroker.java    |  1 -
 .../org/apache/activemq/util/BrokerSupport.java |  1 +
 .../activemq/usecases/MemoryLimitTest.java      | 73 ++++++++++++++++++--
 3 files changed, 68 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1cab7138/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 3acc135..26e0207 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -777,7 +777,6 @@ public class RegionBroker extends EmptyBroker {
                         if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
                             // message may be inflight to other subscriptions so do not modify
                             message = message.copy();
-                            message.getMessageId().setFutureOrSequenceLong(null);
                             stampAsExpired(message);
                             message.setExpiration(0);
                             if (!message.isPersistent()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1cab7138/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
index f3f3b78..ca19b8b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
@@ -55,6 +55,7 @@ public final class BrokerSupport {
         message.setDestination(deadLetterDestination);
         message.setTransactionId(null);
         message.setMemoryUsage(null);
+        message.getMessageId().setFutureOrSequenceLong(null);
         message.setRedeliveryCounter(0);
         boolean originalFlowControl = context.isProducerFlowControl();
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/1cab7138/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index c481172..647c683 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -17,15 +17,16 @@
 package org.apache.activemq.usecases;
 
 import java.util.Arrays;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
+import javax.jms.*;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueView;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -153,6 +154,58 @@ public class MemoryLimitTest extends TestSupport {
         }
     }
 
+    @Test(timeout = 120000)
+    public void testMoveMessages() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
+        factory.setOptimizeAcknowledge(true);
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = sess.createQueue("IN");
+        final byte[] payload = new byte[200 * 1024]; //200KB
+        final int count = 4;
+        final ProducerThread producer = new ProducerThread(sess, queue) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                BytesMessage bytesMessage = session.createBytesMessage();
+                bytesMessage.writeBytes(payload);
+                return bytesMessage;
+            }
+        };
+        producer.setMessageCount(count);
+        producer.start();
+        producer.join();
+
+        Thread.sleep(1000);
+
+        final QueueViewMBean in = getProxyToQueue("IN");
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return in.getQueueSize() == count;
+            }
+        });
+
+        assertEquals("Messages not sent" , count, in.getQueueSize());
+
+        int moved = in.moveMatchingMessagesTo("", "OUT");
+
+        assertEquals("Didn't move all messages", count, moved);
+
+
+        final QueueViewMBean out = getProxyToQueue("OUT");
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return out.getQueueSize() == count;
+            }
+        });
+
+        assertEquals("Messages not moved" , count, out.getQueueSize());
+
+    }
+
     /**
      * Handy test for manually checking what's going on
      */
@@ -199,4 +252,12 @@ public class MemoryLimitTest extends TestSupport {
 
         assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
     }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException,
JMSException {
+        BrokerService brokerService = BrokerRegistry.getInstance().lookup("localhost");
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
 }


Mime
View raw message