activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r915914 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activ...
Date Wed, 24 Feb 2010 18:36:36 GMT
Author: gtully
Date: Wed Feb 24 18:36:35 2010
New Revision: 915914

URL: http://svn.apache.org/viewvc?rev=915914&view=rev
Log:
merge -c 915770,915809 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2626

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
      - copied, changed from r915770, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Feb 24 18:36:35 2010
@@ -284,13 +284,13 @@
 
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName()
+                    LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName()
                             + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
                             + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
                             + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Wed Feb 24 18:36:35 2010
@@ -17,11 +17,11 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -88,7 +88,7 @@
             dispatch(node);
             slowConsumer=false;
         } else {
-          //we are slow
+            //we are slow
             if(!slowConsumer) {
                 slowConsumer=true;
                 for (Destination dest: destinations) {
@@ -98,7 +98,12 @@
             if (maximumPendingMessages != 0) {
             	synchronized(matchedListMutex){
             		while (matched.isFull()){
-            			matchedListMutex.wait(20);
+                        if (getContext().getStopping().get()) {
+                            LOG.warn("stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
+                            enqueueCounter.decrementAndGet();
+                            return;
+                        }
+                        matchedListMutex.wait(20);
             		}
             		matched.addMessageLast(node);
             	}
@@ -124,8 +129,11 @@
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
                             synchronized(matched){
-                            list = matched.pageInList(pageInSize);
+                                list = matched.pageInList(pageInSize);
                             	oldMessages = messageEvictionStrategy.evictMessages(list);
+                            	for (MessageReference ref : list) {
+                            	    ref.decrementReferenceCount();
+                            	}
                             }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
@@ -478,17 +486,5 @@
     public int getPrefetchSize() {
         return (int)info.getPrefetchSize();
     }
-    
-    /**
-     * Get the list of inflight messages
-     * @return the list
-     */
-    public synchronized List<MessageReference> getInFlightMessages(){
-    	List<MessageReference> result = new ArrayList<MessageReference>();
-        synchronized(matched) {
-            result.addAll(matched.pageInList(1000));
-        }
-        return result;
-    }
 
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Wed Feb 24 18:36:35 2010
@@ -211,7 +211,7 @@
     void destroy() throws Exception;
 
     /**
-     * Page in a restricted number of messages
+     * Page in a restricted number of messages and increment the reference count
      * 
      * @param maxItems
      * @return a list of paged in messages

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Wed Feb 24 18:36:35 2010
@@ -179,7 +179,15 @@
      */
     @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
-        return list;
+        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
+        for (MessageReference ref: list) {
+            ref.incrementReferenceCount();
+            result.add(ref);
+            if (result.size() >= maxItems) {
+                break;
+            }
+        }
+        return result;
     }
 
     @Override

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Wed Feb 24 18:36:35 2010
@@ -147,6 +147,11 @@
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
+        //override prefetch size if not set by the Consumer
+        int prefetch=subscription.getConsumerInfo().getPrefetchSize();
+        if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
+            subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
+        }
         if (pendingMessageLimitStrategy != null) {
             int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
             int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@@ -167,11 +172,6 @@
         }
         if (pendingSubscriberPolicy != null) {
             String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
-            //override prefetch size if not set by the Consumer
-            int prefetch=subscription.getConsumerInfo().getPrefetchSize();
-            if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
-                subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
-            }
             int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
             subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
         }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
Wed Feb 24 18:36:35 2010
@@ -240,7 +240,7 @@
 
     private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
         if (debug) {
-            LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: " 
+            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 
                 + newPercentUsage + "% of available memory");
         }   
         if (started.get()) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
Wed Feb 24 18:36:35 2010
@@ -43,12 +43,14 @@
      * output the result of stack trace capture to the log
      */
     public static void result() {
-        for (Entry<String, Tracker> t: trackers.entrySet()) {
-            LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points...");
-            for (Trace trace : t.getValue().values()) {
-                LOG.info("count: " + trace.count, trace);
+        synchronized(trackers) {
+            for (Entry<String, Tracker> t: trackers.entrySet()) {
+                LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points...");
+                for (Trace trace : t.getValue().values()) {
+                    LOG.info("count: " + trace.count, trace);
+                }
+                LOG.info("Tracker: " + t.getKey() + ", done.");
             }
-            LOG.info("Tracker: " + t.getKey() + ", done.");
         }
     }
 

Copied: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
(from r915770, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java?p2=activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java&r1=915770&r2=915914&rev=915914&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java
Wed Feb 24 18:36:35 2010
@@ -40,17 +40,17 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.PrefetchRatePendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.util.ThreadTracker;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class MessageEvictionTest {
@@ -60,12 +60,11 @@
     Connection connection;
     private Session session;
     private Topic destination;
-    protected int numMessages = 4000;
+    protected int numMessages = 2000;
     protected String payload = new String(new byte[1024*2]);
 
-    @Before
-    public void setUp() throws Exception {
-        broker = createBroker();
+    public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
+        broker = createBroker(pendingSubscriberPolicy);
         broker.start();
         connectionFactory = createConnectionFactory();
         connection = connectionFactory.createConnection();
@@ -76,13 +75,22 @@
     
     @After
     public void tearDown() throws Exception {
-        ThreadTracker.result();
         connection.stop();
         broker.stop();
     }
     
     @Test
-    public void testMessageEvictionMemoryUsage() throws Exception {
+    public void testMessageEvictionMemoryUsageFileCursor() throws Exception {
+        doTestMessageEvictionMemoryUsage(new FilePendingSubscriberMessageStoragePolicy());
+    }
+    
+    @Test
+    public void testMessageEvictionMemoryUsageVmCursor() throws Exception {
+        doTestMessageEvictionMemoryUsage(new VMPendingSubscriberMessageStoragePolicy());
+    }
+    
+    public void doTestMessageEvictionMemoryUsage(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
+        setUp(pendingSubscriberPolicy);
         ExecutorService executor = Executors.newCachedThreadPool();
         final CountDownLatch doAck = new CountDownLatch(1);
         final CountDownLatch consumerRegistered = new CountDownLatch(1);
@@ -137,20 +145,22 @@
             }
         });
         
-        assertTrue("messages sending done", sendDone.await(90, TimeUnit.SECONDS));
+        assertTrue("messages sending done", sendDone.await(120, TimeUnit.SECONDS));
         assertEquals("all message were sent", numMessages, sent.get());
         
         doAck.countDown();
         executor.shutdown();
         executor.awaitTermination(30, TimeUnit.SECONDS);
         
-        assertEquals("usage goes to 0", 0,
-                TestSupport.getDestination(broker, 
-                        ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
-        
+        assertTrue("usage goes to 0 once consumer goes away", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return 0 == TestSupport.getDestination(broker, 
+                        ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage();
+            }
+        }));
     }
 
-    BrokerService createBroker() throws Exception {
+    BrokerService createBroker(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.addConnector("tcp://localhost:0");
         brokerService.setUseJmx(false);
@@ -166,9 +176,17 @@
         // so consumer does not get over run while blocked limit the prefetch
         entry.setTopicPrefetch(50);
         
+        
+        entry.setPendingSubscriberPolicy(pendingSubscriberPolicy);
+        
         // limit the number of outstanding messages, large enough to use the file store
+        // or small enough not to blow memory limit
+        int pendingMessageLimit = 50;
+        if (pendingSubscriberPolicy instanceof FilePendingSubscriberMessageStoragePolicy) {
+            pendingMessageLimit = 500;
+        }
         ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
-        pendingMessageLimitStrategy.setLimit(500);
+        pendingMessageLimitStrategy.setLimit(pendingMessageLimit);
         entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy);
 
         // to keep the limit in check and up to date rather than just the first few, evict some

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Wed Feb 24 18:36:35 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.broker;
 
-import javax.jms.JMSException;
-
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.ThreadTracker;

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=915914&r1=915913&r2=915914&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
Wed Feb 24 18:36:35 2010
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -160,7 +161,19 @@
             }
         });
         
-        produceMessage(producerSession, destination, prefetch * 2);
+        // may block if broker shutodwn happens quickly
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+                LOG.info("producer started");
+                try {
+                    produceMessage(producerSession, destination, prefetch * 2);
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                    fail("unexpceted ex on producer: " + e);
+                }
+                LOG.info("producer done");
+            }
+        });
      
         // will be stopped by the plugin
         broker.waitUntilStopped();
@@ -247,7 +260,19 @@
             }
         });
 
-        produceMessage(producerSession, destination, prefetch * 2);
+        // may block if broker shutdown happens quickly
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+                LOG.info("producer started");
+                try {
+                    produceMessage(producerSession, destination, prefetch * 2);
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                    fail("unexpceted ex on producer: " + e);
+                }
+                LOG.info("producer done");
+            }
+        });
 
         // will be stopped by the plugin
         broker.waitUntilStopped();



Mime
View raw message