activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1173583 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Date Wed, 21 Sep 2011 12:09:30 GMT
Author: gtully
Date: Wed Sep 21 12:09:30 2011
New Revision: 1173583

URL: http://svn.apache.org/viewvc?rev=1173583&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3507 - Large number of expiring messages causing
QueueSize to grow. Have cursor delegate to the destination for expiry processing. Additional
test. This will also sort out the intermittent failure of this test.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1173583&r1=1173582&r2=1173583&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Wed Sep 21 12:09:30 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.IndirectMessageReference;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.command.Message;
@@ -233,7 +234,7 @@ public class FilePendingMessageCursor ex
                 throw new RuntimeException(e);
             }
         } else {
-            discard(node);
+            discardExpiredMessage(node);
         }
         //message expired
         return true;
@@ -279,7 +280,7 @@ public class FilePendingMessageCursor ex
                 throw new RuntimeException(e);
             }
         } else {
-            discard(node);
+            discardExpiredMessage(node);
         }
     }
 
@@ -410,7 +411,8 @@ public class FilePendingMessageCursor ex
             while (!tmpList.isEmpty()) {
                 MessageReference node = tmpList.removeFirst();
                 if (node.isExpired()) {
-                    discard(node);
+                    node.decrementReferenceCount();
+                    discardExpiredMessage(node);
                 } else {
                     memoryList.add(node);
                 }
@@ -463,14 +465,15 @@ public class FilePendingMessageCursor ex
         return diskList;
     }
 
-    protected void discard(MessageReference message) {
-        message.decrementReferenceCount();
+    private void discardExpiredMessage(MessageReference reference) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Discarding message " + message);
+            LOG.debug("Discarding expired message " + reference);
+        }
+        if (broker.isExpired(reference)) {
+            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
+            context.setBroker(broker);
+            reference.getRegionDestination().messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
         }
-        ConnectionContext ctx = new ConnectionContext(new NonCachedMessageEvaluationContext());
-        ctx.setBroker(broker);
-        broker.getRoot().sendToDeadLetterQueue(ctx, message, null);
     }
 
     protected ByteSequence getByteSequence(Message message) throws IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1173583&r1=1173582&r2=1173583&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Wed Sep 21 12:09:30 2011
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -72,14 +73,18 @@ public class ExpiredMessagesWithNoConsum
     }
 
     protected void createBrokerWithMemoryLimit() throws Exception {
-        doCreateBroker(true);
+        createBrokerWithMemoryLimit(800);
+    }
+
+    protected void createBrokerWithMemoryLimit(int expireMessagesPeriod) throws Exception {
+        doCreateBroker(true, expireMessagesPeriod);
     }
 
     protected void createBroker() throws Exception {
-        doCreateBroker(false);
+        doCreateBroker(false, 800);
     }
 
-    private void doCreateBroker(boolean memoryLimit) throws Exception {
+    private void doCreateBroker(boolean memoryLimit, int expireMessagesPeriod) throws Exception {
         broker = new BrokerService();
         broker.setBrokerName("localhost");
         broker.setUseJmx(true);
@@ -89,7 +94,7 @@ public class ExpiredMessagesWithNoConsum
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
         defaultEntry.setOptimizedDispatch(optimizedDispatch);
-        defaultEntry.setExpireMessagesPeriod(800);
+        defaultEntry.setExpireMessagesPeriod(expireMessagesPeriod);
         defaultEntry.setMaxExpirePageSize(800);
 
         defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
@@ -109,6 +114,78 @@ public class ExpiredMessagesWithNoConsum
         connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
 
+    public void testExpiredNonPersistentMessagesWithNoConsumer() throws Exception {
+
+        createBrokerWithMemoryLimit(2000);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        connection = factory.createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(1000);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        connection.start();
+        final long sendCount = 2000;
+
+        final Thread producingThread = new Thread("Producing Thread") {
+            public void run() {
+                try {
+                    int i = 0;
+                    long tStamp = System.currentTimeMillis();
+                    while (i++ < sendCount) {
+                        producer.send(session.createTextMessage("test"));
+                        if (i%100 == 0) {
+                            LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100)  + "m/ms");
+                            tStamp = System.currentTimeMillis() ;
+                        }
+
+                        if (135 == i) {
+                            // allow pending messages to expire, before usage limit kicks in  to flush them
+                            TimeUnit.SECONDS.sleep(5);
+                        }
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        producingThread.start();
+
+        assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                producingThread.join(TimeUnit.SECONDS.toMillis(3000));
+                return !producingThread.isAlive();
+            }
+        }));
+
+        TimeUnit.SECONDS.sleep(5);
+        final DestinationViewMBean view = createView(destination);
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                try {
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize());
+                return view.getDequeueCount() != 0
+                        && view.getDequeueCount() == view.getExpiredCount()
+                        && view.getDequeueCount() == view.getEnqueueCount()
+                        && view.getQueueSize() == 0;
+                } catch (Exception ignored) {
+                    LOG.info(ignored.toString());
+                }
+                return false;
+            }
+        }, Wait.MAX_WAIT_MILLIS * 10);
+        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                + ", size= " + view.getQueueSize());
+
+        assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
+        assertEquals("0 queue", 0, view.getQueueSize());
+    }
+
+
     public void initCombosForTestExpiredMessagesWithNoConsumer() {
         addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE});
         addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});



Mime
View raw message