activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r811425 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/bugs/AMQ2356Test.java test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Date Fri, 04 Sep 2009 14:25:19 GMT
Author: rajdavies
Date: Fri Sep  4 14:25:18 2009
New Revision: 811425

URL: http://svn.apache.org/viewvc?rev=811425&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2356

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java 
 (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=811425&r1=811424&r2=811425&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Sep  4 14:25:18 2009
@@ -16,29 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -77,6 +54,26 @@
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
 
 
 /**
@@ -100,7 +97,7 @@
     private final Object sendLock = new Object();
     private ExecutorService executor;
     protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
-    private final ReentrantLock dispatchLock = new ReentrantLock();
+    private final Object dispatchMutex = new Object();
     private boolean useConsumerPriority=true;
     private boolean strictOrderDispatch=false;
     private QueueDispatchSelector dispatchSelector;
@@ -276,8 +273,8 @@
         // synchronize with dispatch method so that no new messages are sent
         // while setting up a subscription. avoid out of order messages,
         // duplicates, etc.
-        dispatchLock.lock();
-        try {
+        synchronized(dispatchMutex) {
+        
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
 
@@ -324,8 +321,6 @@
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
-        }finally {
-            dispatchLock.unlock();
         }
         if (this.optimizedDispatch || isSlave()) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
@@ -339,8 +334,7 @@
         destinationStatistics.getConsumers().decrement();
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
-        dispatchLock.lock();
-        try {
+        synchronized(dispatchMutex) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId
                         + ", dequeues: " + getDestinationStatistics().getDequeues().getCount()
@@ -390,8 +384,6 @@
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
-        }finally {
-            dispatchLock.unlock();
         }
         if (this.optimizedDispatch || isSlave()) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
@@ -750,8 +742,7 @@
         try {
             pageInMessages(forcePageIn);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
-            dispatchLock.lock();
-            try {
+            synchronized(dispatchMutex) {
                 synchronized (pagedInPendingDispatch) {
                     addAll(pagedInPendingDispatch, l, max, toExpire);
                     for (MessageReference ref : toExpire) {
@@ -796,9 +787,7 @@
                         }
                     }
                 }
-            } finally {
-                dispatchLock.unlock();
-            }
+            } 
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
         }
@@ -1161,12 +1150,9 @@
 	        
 	        // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
 	        // pagedInPendingDispatch variable. 	        
-	        dispatchLock.lock();
-	        try {
+	        synchronized(dispatchMutex) {
 	            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
-	        } finally {
-	            dispatchLock.unlock();
-	        }
+	        } 
 	        
 	        // Perhaps we should page always into the pagedInPendingDispatch list is 
 	        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
@@ -1328,8 +1314,7 @@
     private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
         List<QueueMessageReference> resultList = null;
-        dispatchLock.lock();
-        try{
+        synchronized(dispatchMutex) {
             int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount())
- pagedInMessages.size();
          
             if (LOG.isDebugEnabled()) {
@@ -1381,15 +1366,13 @@
                 // Avoid return null list, if condition is not validated
                 resultList = new ArrayList<QueueMessageReference>();
             }
-        }finally {
-            dispatchLock.unlock();
         }
         return resultList;
     }
 
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
-        dispatchLock.lock();
-        try {
+        synchronized(dispatchMutex) {
+       
             synchronized (pagedInPendingDispatch) {
                 if (!pagedInPendingDispatch.isEmpty()) {
                     // Try to first dispatch anything that had not been
@@ -1412,9 +1395,7 @@
                     }
                 }
             }
-        } finally {
-            dispatchLock.unlock();
-        }
+        } 
     }
     
     /**
@@ -1545,8 +1526,7 @@
         QueueMessageReference message = null;
         MessageId messageId = messageDispatchNotification.getMessageId();
         
-        dispatchLock.lock();
-        try {
+       synchronized(dispatchMutex) {
             synchronized (pagedInPendingDispatch) {
                for(QueueMessageReference ref : pagedInPendingDispatch) {
                    if (messageId.equals(ref.getMessageId())) {
@@ -1590,9 +1570,7 @@
                 }
             }          
             
-        } finally {
-            dispatchLock.unlock();        
-        }
+        } 
         if (message == null) {
             throw new JMSException(
                     "Slave broker out of sync with master - Message: "

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java?rev=811425&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java Fri
Sep  4 14:25:18 2009
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.perf.NumberOfDestinationsTest;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.io.File;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+/*
+AMQ2356Test: reproduces the scenario from the AMQ-2356 bug report, quoted below.
+We have an environment where we have a very large number of destinations. 
+In an effort to reduce the number of threads I have set the options
+-Dorg.apache.activemq.UseDedicatedTaskRunner=false
+
+and
+
+<policyEntry queue=">" optimizedDispatch="true"/>
+
+Unfortunately this very quickly leads to deadlocked queues.
+
+My environment is:
+
+ActiveMQ 5.2 Ubuntu Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single
core on my system)
+TCP transportConnector
+
+To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to
5 different queues. 
+Then I start 5 producers and pair them up with a consumer on a queue, and they start sending
PERSISTENT messages. 
+I've set the producer to send 100 messages and disconnect, and the consumer to receive 100
messages and disconnect. 
+The first pair usually gets through their 100 messages and disconnect, at which point all
the other pairs have 
+deadlocked at less than 30 messages each.
+ */
+public class AMQ2356Test extends TestCase {
+    protected static final int MESSAGE_COUNT = 1000;
+    protected static final int NUMBER_OF_PAIRS = 10;
+    private static final Log LOG = LogFactory.getLog(NumberOfDestinationsTest.class);
+    protected BrokerService broker;
+    protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+    protected int destinationCount;
+
+    public void testScenario() throws Exception {
+        for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
+            ActiveMQQueue queue = new ActiveMQQueue(getClass().getName()+":"+i);
+            ProducerConsumerPair cp = new ProducerConsumerPair();
+            cp.start(this.brokerURL, queue, MESSAGE_COUNT);
+            cp.testRun();
+            cp.stop();
+        }
+    }
+
+    protected Destination getDestination(Session session) throws JMSException {
+        String destinationName = getClass().getName() + "." + destinationCount++;
+        return session.createQueue(destinationName);
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb");
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        answer.setUseJmx(false);
+        // Set up a default destination policy with optimized dispatch enabled.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setOptimizedDispatch(true);
+        policyMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(policyMap);
+        
+        answer.setAdvisorySupport(false);
+        answer.setEnableStatistics(false);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(brokerURL);
+       
+    }
+    static class ProducerConsumerPair {
+        private Destination destination;
+        private MessageProducer producer;
+        private MessageConsumer consumer;
+        private Connection producerConnection;
+        private Connection consumerConnection;
+        private int numberOfMessages;
+
+        ProducerConsumerPair(){
+           
+        }
+        void start(String brokerURL, final Destination dest, int msgNum) throws Exception
{
+            this.destination=dest;
+            this.numberOfMessages=msgNum;
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
+            this.producerConnection = cf.createConnection();
+            this.producerConnection.start();
+            this.consumerConnection = cf.createConnection();
+            this.consumerConnection.start();
+            this.producer=createProducer(this.producerConnection);
+            this.consumer=createConsumer(this.consumerConnection);
+        }
+        
+        void testRun() throws Exception {
+            
+            
+                Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                for (int i = 0 ; i < this.numberOfMessages; i++) {
+                    BytesMessage msg = s.createBytesMessage();
+                    msg.writeBytes(new byte[1024]);
+                    this.producer.send(msg);
+                }
+                int received = 0;
+                for (int i = 0 ; i < this.numberOfMessages; i++) {
+                    Message msg = this.consumer.receive();
+                    assertNotNull(msg);
+                    received++;
+                }
+                assertEquals("Messages received on " + this.destination,this.numberOfMessages,received);
+           
+           
+        }
+        
+        void stop() throws Exception {
+            if (this.producerConnection != null) {
+                this.producerConnection.close();
+            }
+            if (this.consumerConnection != null) {
+                this.consumerConnection.close();
+            }
+        }
+
+        private MessageProducer createProducer(Connection connection) throws Exception {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer result = session.createProducer(this.destination);
+            return result;
+        }
+        
+        private MessageConsumer createConsumer(Connection connection) throws Exception {
+          
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer result = session.createConsumer(this.destination);
+            return result;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=811425&r1=811424&r2=811425&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Fri Sep  4 14:25:18 2009
@@ -76,6 +76,7 @@
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setOptimizedDispatch(true);
         defaultEntry.setExpireMessagesPeriod(100);
         defaultEntry.setMaxExpirePageSize(800);
 



Mime
View raw message