activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r915809 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/
Date Wed, 24 Feb 2010 14:46:21 GMT
Author: gtully
Date: Wed Feb 24 14:46:21 2010
New Revision: 915809

URL: http://svn.apache.org/viewvc?rev=915809&view=rev
Log:
variation on https://issues.apache.org/activemq/browse/AMQ-2626 - add vmcursor eviction test,
shows similar issues also fix potential hang when connection is shutdown and producer is waiting
for space in pending messages

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=915809&r1=915808&r2=915809&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Wed Feb 24 14:46:21 2010
@@ -17,11 +17,11 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -98,6 +98,11 @@
             if (maximumPendingMessages != 0) {
             	synchronized(matchedListMutex){
             		while (matched.isFull()){
+            		    if (getContext().getStopping().get()) {
+            		        LOG.warn("stopped waiting for space in pendingMessage cursor for: "
+ node.getMessageId());
+            		        enqueueCounter.decrementAndGet();
+            		        return;
+            		    }
             			matchedListMutex.wait(20);
             		}
             		matched.addMessageLast(node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=915809&r1=915808&r2=915809&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Wed Feb 24 14:46:21 2010
@@ -179,7 +179,15 @@
      */
     @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
-        return list;
+        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
+        for (MessageReference ref: list) {
+            ref.incrementReferenceCount();
+            result.add(ref);
+            if (result.size() >= maxItems) {
+                break;
+            }
+        }
+        return result;
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java?rev=915809&r1=915808&r2=915809&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
Wed Feb 24 14:46:21 2010
@@ -40,17 +40,18 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.PrefetchRatePendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.ThreadTracker;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class MessageEvictionTest {
@@ -63,9 +64,8 @@
     protected int numMessages = 4000;
     protected String payload = new String(new byte[1024*2]);
 
-    @Before
-    public void setUp() throws Exception {
-        broker = createBroker();
+    public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws
Exception {
+        broker = createBroker(pendingSubscriberPolicy);
         broker.start();
         connectionFactory = createConnectionFactory();
         connection = connectionFactory.createConnection();
@@ -82,7 +82,17 @@
     }
     
     @Test
-    public void testMessageEvictionMemoryUsage() throws Exception {
+    public void testMessageEvictionMemoryUsageFileCursor() throws Exception {
+        doTestMessageEvictionMemoryUsage(new FilePendingSubscriberMessageStoragePolicy());
+    }
+    
+    @Test
+    public void testMessageEvictionMemoryUsageVmCursor() throws Exception {
+        doTestMessageEvictionMemoryUsage(new VMPendingSubscriberMessageStoragePolicy());
+    }
+    
+    public void doTestMessageEvictionMemoryUsage(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy)
throws Exception {
+        setUp(pendingSubscriberPolicy);
         ExecutorService executor = Executors.newCachedThreadPool();
         final CountDownLatch doAck = new CountDownLatch(1);
         final CountDownLatch consumerRegistered = new CountDownLatch(1);
@@ -144,13 +154,15 @@
         executor.shutdown();
         executor.awaitTermination(30, TimeUnit.SECONDS);
         
-        assertEquals("usage goes to 0", 0,
-                TestSupport.getDestination(broker, 
-                        ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
-        
+        assertTrue("usage goes to 0 once consumer goes away", Wait.waitFor(new Wait.Condition()
{
+            public boolean isSatisified() throws Exception {
+                return 0 == TestSupport.getDestination(broker, 
+                        ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage();
+            }
+        }));
     }
 
-    BrokerService createBroker() throws Exception {
+    BrokerService createBroker(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy)
throws Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.addConnector("tcp://localhost:0");
         brokerService.setUseJmx(false);
@@ -166,9 +178,17 @@
         // so consumer does not get over run while blocked limit the prefetch
         entry.setTopicPrefetch(50);
         
+        
+        entry.setPendingSubscriberPolicy(pendingSubscriberPolicy);
+        
         // limit the number of outstanding messages, large enough to use the file store
+        // or small enough not to blow memory limit
+        int pendingMessageLimit = 50;
+        if (pendingSubscriberPolicy instanceof FilePendingSubscriberMessageStoragePolicy)
{
+            pendingMessageLimit = 500;
+        }
         ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
-        pendingMessageLimitStrategy.setLimit(500);
+        pendingMessageLimitStrategy.setLimit(pendingMessageLimit);
         entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy);
 
         // to keep the limit in check and up to date rather than just the first few, evict
some



Mime
View raw message