activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r831266 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
Date Fri, 30 Oct 2009 11:23:07 GMT
Author: gtully
Date: Fri Oct 30 11:23:06 2009
New Revision: 831266

URL: http://svn.apache.org/viewvc?rev=831266&view=rev
Log:
merge -c 831258 https://svn.apache.org/repos/asf/activemq/trunk  - resolve https://issues.apache.org/activemq/browse/AMQ-2468
- limit pagedInPendingDispatch to maxPageSize, and bypass dispatch for JMX queue modifications
such as purge and remove-matching-messages so that they are not limited by pending messages and
can page through all messages. Resolve intermittent deadlock in AMQ2102Test. Note: with sparse
selectors, maxPageSize may need to be increased, because the ever-increasing
pagedInPendingDispatch was previously exceeding that limit in error and is now capped.

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=831266&r1=831265&r2=831266&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Oct 30 11:23:06 2009
@@ -863,8 +863,8 @@
     public void purge() throws Exception {   
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
-        do {
-            pageInMessages();
+        do {        
+            doPageIn(true);
             synchronized (pagedInMessages) {
                 list = new ArrayList<MessageReference>(pagedInMessages.values());
             }
@@ -876,6 +876,7 @@
                 } catch (IOException e) {
                 }
             }
+            
         } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
         gc();
         this.destinationStatistics.getMessages().setCount(0);
@@ -919,7 +920,7 @@
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         ConnectionContext context = createConnectionContext();
         do {
-            pageInMessages();
+            doPageIn(true);
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
             }
@@ -979,7 +980,7 @@
         do {
             int oldMaxSize=getMaxPageSize();
             setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
-            pageInMessages();
+            doPageIn(true);
             setMaxPageSize(oldMaxSize);
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
@@ -1170,7 +1171,7 @@
 	            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
 	        } 
 	        
-	        // Perhaps we should page always into the pagedInPendingDispatch list is 
+	        // Perhaps we should page always into the pagedInPendingDispatch list if 
 	        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
 	        // then we do a dispatch.
 	        if (pageInMoreMessages) {
@@ -1215,6 +1216,11 @@
 
     protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
         removeMessage(c, null, r);
+        synchronized(dispatchMutex) {            
+            synchronized (pagedInPendingDispatch) {
+                pagedInPendingDispatch.remove(r);
+            }
+        }
     }
     
     protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
@@ -1349,12 +1355,11 @@
                         + ", pagedInMessages.size " + pagedInMessages.size());
             }
            
-            if (isLazyDispatch()&& !force) {
+            if (isLazyDispatch() && !force) {
                 // Only page in the minimum number of messages which can be dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
             }
-            
-            if ((force || !consumers.isEmpty()) && toPageIn > 0) { 
+            if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
                 int count = 0;
                 result = new ArrayList<QueueMessageReference>(toPageIn);
                 synchronized (messages) {
@@ -1405,8 +1410,7 @@
                     // dispatched before.
                     pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
                 }
-                // and now see if we can dispatch the new stuff.. and append to
-                // the pending
+                // and now see if we can dispatch the new stuff.. and append to the pending
                 // list anything that does not actually get dispatched.
                 if (list != null && !list.isEmpty()) {
                     if (pagedInPendingDispatch.isEmpty()) {
@@ -1423,7 +1427,8 @@
             }
         } 
         if (doWakeUp) {
-            wakeup();
+            // avoid lock order contention
+            asyncWakeup();
         }
     }
     
@@ -1495,9 +1500,6 @@
         return rc;
     }
 
-    private void pageInMessages() throws Exception {
-        pageInMessages(true);
-    }
 
     protected void pageInMessages(boolean force) throws Exception {
             doDispatch(doPageIn(force));

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=831266&r1=831265&r2=831266&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java Fri Oct 30 11:23:06 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.File;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -25,15 +27,25 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+
 import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class QueuePurgeTest extends TestCase {
+    private static final Log LOG = LogFactory.getLog(QueuePurgeTest.class);
+    private final String MESSAGE_TEXT = new String(new byte[1024]);
     BrokerService broker;
     ConnectionFactory factory;
     Connection connection;
@@ -43,17 +55,23 @@
 
     protected void setUp() throws Exception {
         broker = new BrokerService();
+        broker.setDataDirectory("target/activemq-data");
         broker.setUseJmx(true);
-        broker.setPersistent(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/QueuePurgeTest"));
+        broker.setPersistenceAdapter(persistenceAdapter);
         broker.addConnector("tcp://localhost:0");
         broker.start();
-        factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
         connection = factory.createConnection();
         connection.start();
     }
 
     protected void tearDown() throws Exception {
-        consumer.close();
+        if (consumer != null) {
+            consumer.close();
+        }
         session.close();
         connection.stop();
         connection.close();
@@ -61,10 +79,45 @@
     }
 
     public void testPurgeQueueWithActiveConsumer() throws Exception {
-        createProducerAndSendMessages();
+        createProducerAndSendMessages(10000);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        createConsumer();
+        proxy.purge();
+        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
+                proxy.getQueueSize());
+    }
+    
+    
+    public void testPurgeLargeQueue() throws Exception {       
+        applyBrokerSpoolingPolicy();
+        createProducerAndSendMessages(90000);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        LOG.info("purging..");
+        proxy.purge();
+        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
+                proxy.getQueueSize());
+    }
+
+    private void applyBrokerSpoolingPolicy() {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setProducerFlowControl(false);
+        PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
+        defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    
+    public void testPurgeLargeQueueWithConsumer() throws Exception {       
+        applyBrokerSpoolingPolicy();
+        createProducerAndSendMessages(90000);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         createConsumer();
+        long start = System.currentTimeMillis();
+        LOG.info("purging..");
         proxy.purge();
+        LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
         assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
                 proxy.getQueueSize());
     }
@@ -80,12 +133,15 @@
         return proxy;
     }
 
-    private void createProducerAndSendMessages() throws Exception {
+    private void createProducerAndSendMessages(int numToSend) throws Exception {
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         queue = session.createQueue("test1");
         MessageProducer producer = session.createProducer(queue);
-        for (int i = 0; i < 10000; i++) {
-            TextMessage message = session.createTextMessage("message " + i);
+        for (int i = 0; i < numToSend; i++) {
+            TextMessage message = session.createTextMessage(MESSAGE_TEXT + i);
+            if (i  != 0 && i % 50000 == 0) {
+                LOG.info("sent: " + i);
+            }
             producer.send(message);
         }
         producer.close();
@@ -95,7 +151,7 @@
         consumer = session.createConsumer(queue);
         // wait for buffer fill out
         Thread.sleep(5 * 1000);
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 0; i < 500; ++i) {
             Message message = consumer.receive();
             message.acknowledge();
         }



Mime
View raw message