activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r961245 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Wed, 07 Jul 2010 05:10:18 GMT
Author: rajdavies
Date: Wed Jul  7 05:10:17 2010
New Revision: 961245

URL: http://svn.apache.org/viewvc?rev=961245&view=rev
Log:
Improve concurrency by using read/write locks

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=961245&r1=961244&r2=961245&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Jul  7 05:10:17 2010
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
@@ -92,11 +93,15 @@ public class Queue extends BaseDestinati
     protected static final Log LOG = LogFactory.getLog(Queue.class);
     protected final TaskRunnerFactory taskFactory;
     protected TaskRunner taskRunner;
+    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
     protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
+    private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
     protected PendingMessageCursor messages;
+    private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
     private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
     // Messages that are paged in but have not yet been targeted at a
     // subscription
+    private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
     private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
     private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
     private MessageGroupMap messageGroupOwners;
@@ -106,7 +111,6 @@ public class Queue extends BaseDestinati
     private ExecutorService executor;
     protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
             .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
-    private final Object dispatchMutex = new Object();
     private boolean useConsumerPriority = true;
     private boolean strictOrderDispatch = false;
     private final QueueDispatchSelector dispatchSelector;
@@ -219,8 +223,11 @@ public class Queue extends BaseDestinati
     }
 
     public List<Subscription> getConsumers() {
-        synchronized (consumers) {
+        consumersLock.readLock().lock();
+        try {
             return new ArrayList<Subscription>(consumers);
+        }finally {
+            consumersLock.readLock().unlock();
         }
     }
 
@@ -284,12 +291,15 @@ public class Queue extends BaseDestinati
                         }
                         if (hasSpace()) {
                             message.setRegionDestination(Queue.this);
-                            synchronized (messages) {
+                            messagesLock.writeLock().lock();
+                            try{
                                 try {
                                     messages.addMessageLast(message);
                                 } catch (Exception e) {
                                     LOG.fatal("Failed to add message to cursor", e);
                                 }
+                            }finally {
+                                messagesLock.writeLock().unlock();
                             }
                             destinationStatistics.getMessages().increment();
                             return true;
@@ -348,13 +358,16 @@ public class Queue extends BaseDestinati
         // synchronize with dispatch method so that no new messages are sent
         // while setting up a subscription. avoid out of order messages,
         // duplicates, etc.
-        synchronized (dispatchMutex) {
+        pagedInPendingDispatchLock.writeLock().lock();
+        try {
 
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
 
             // needs to be synchronized - so no contention with dispatching
-            synchronized (consumers) {
+           // consumersLock.
+            consumersLock.writeLock().lock();
+            try {
 
                 // set a flag if this is a first consumer
                 if (consumers.size() == 0) {
@@ -378,20 +391,27 @@ public class Queue extends BaseDestinati
                     }
                     dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
+            }finally {
+                consumersLock.writeLock().unlock();
             }
 
             if (sub instanceof QueueBrowserSubscription) {
                 // tee up for dispatch in next iterate
                 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
-                synchronized (pagedInMessages) {
+                pagedInMessagesLock.readLock().lock();
+                try{
                     BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
                     browserDispatches.addLast(browserDispatch);
+                }finally {
+                    pagedInMessagesLock.readLock().unlock();
                 }
             }
 
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
+        }finally {
+            pagedInPendingDispatchLock.writeLock().unlock();
         }
         if (this.optimizedDispatch || isSlave()) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
@@ -406,14 +426,16 @@ public class Queue extends BaseDestinati
         destinationStatistics.getConsumers().decrement();
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
-        synchronized (dispatchMutex) {
+        pagedInPendingDispatchLock.writeLock().lock();
+        try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
                         + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
                         + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
                         + getDestinationStatistics().getInflight().getCount());
             }
-            synchronized (consumers) {
+            consumersLock.writeLock().lock();
+            try {
                 removeFromConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
                     Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
@@ -461,10 +483,14 @@ public class Queue extends BaseDestinati
                 if (!redeliveredWaitingDispatch.isEmpty()) {
                     doDispatch(new ArrayList<QueueMessageReference>());
                 }
+            }finally {
+                consumersLock.writeLock().unlock();
             }
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
+        }finally {
+            pagedInPendingDispatchLock.writeLock().unlock();
         }
         if (this.optimizedDispatch || isSlave()) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
@@ -758,8 +784,11 @@ public class Queue extends BaseDestinati
     @Override
     public String toString() {
         int size = 0;
-        synchronized (messages) {
+        messagesLock.readLock().lock();
+        try{
             size = messages.size();
+        }finally {
+            messagesLock.readLock().unlock();
         }
         return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
                 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
@@ -919,58 +948,70 @@ public class Queue extends BaseDestinati
         try {
             pageInMessages(false);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
-            synchronized (dispatchMutex) {
-                synchronized (pagedInPendingDispatch) {
-                    addAll(pagedInPendingDispatch, browseList, max, toExpire);
-                    for (MessageReference ref : toExpire) {
-                        pagedInPendingDispatch.remove(ref);
-                        if (broker.isExpired(ref)) {
-                            LOG.debug("expiring from pagedInPending: " + ref);
-                            messageExpired(connectionContext, ref);
-                        }
-                    }
-                }
-                toExpire.clear();
-                synchronized (pagedInMessages) {
-                    addAll(pagedInMessages.values(), browseList, max, toExpire);
-                }
+
+            pagedInPendingDispatchLock.writeLock().lock();
+            try {
+                addAll(pagedInPendingDispatch, browseList, max, toExpire);
                 for (MessageReference ref : toExpire) {
+                    pagedInPendingDispatch.remove(ref);
                     if (broker.isExpired(ref)) {
-                        LOG.debug("expiring from pagedInMessages: " + ref);
+                        LOG.debug("expiring from pagedInPending: " + ref);
                         messageExpired(connectionContext, ref);
-                    } else {
-                        synchronized (pagedInMessages) {
-                            pagedInMessages.remove(ref.getMessageId());
-                        }
                     }
                 }
+            } finally {
+                pagedInPendingDispatchLock.writeLock().unlock();
+            }
+            toExpire.clear();
+            pagedInMessagesLock.readLock().lock();
+            try {
+                addAll(pagedInMessages.values(), browseList, max, toExpire);
+            } finally {
+                pagedInMessagesLock.readLock().unlock();
+            }
+            for (MessageReference ref : toExpire) {
+                if (broker.isExpired(ref)) {
+                    LOG.debug("expiring from pagedInMessages: " + ref);
+                    messageExpired(connectionContext, ref);
+                } else {
+                    pagedInMessagesLock.writeLock().lock();
+                    try {
+                        pagedInMessages.remove(ref.getMessageId());
+                    } finally {
+                        pagedInMessagesLock.writeLock().unlock();
+                    }
+                }
+            }
 
-                if (browseList.size() < getMaxBrowsePageSize()) {
-                    synchronized (messages) {
-                        try {
-                            messages.reset();
-                            while (messages.hasNext() && browseList.size() < max) {
-                                MessageReference node = messages.next();
-                                if (node.isExpired()) {
-                                    if (broker.isExpired(node)) {
-                                        LOG.debug("expiring from messages: " + node);
-                                        messageExpired(connectionContext, createMessageReference(node.getMessage()));
-                                    }
-                                    messages.remove();
-                                } else {
-                                    messages.rollback(node.getMessageId());
-                                    if (browseList.contains(node.getMessage()) == false) {
-                                        browseList.add(node.getMessage());
-                                    }
+            if (browseList.size() < getMaxBrowsePageSize()) {
+                messagesLock.writeLock().lock();
+                try {
+                    try {
+                        messages.reset();
+                        while (messages.hasNext() && browseList.size() < max) {
+                            MessageReference node = messages.next();
+                            if (node.isExpired()) {
+                                if (broker.isExpired(node)) {
+                                    LOG.debug("expiring from messages: " + node);
+                                    messageExpired(connectionContext, createMessageReference(node.getMessage()));
+                                }
+                                messages.remove();
+                            } else {
+                                messages.rollback(node.getMessageId());
+                                if (browseList.contains(node.getMessage()) == false) {
+                                    browseList.add(node.getMessage());
                                 }
-                                node.decrementReferenceCount();
                             }
-                        } finally {
-                            messages.release();
+                            node.decrementReferenceCount();
                         }
+                    } finally {
+                        messages.release();
                     }
+                } finally {
+                    messagesLock.writeLock().unlock();
                 }
             }
+
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
         }
@@ -990,13 +1031,17 @@ public class Queue extends BaseDestinati
 
     public Message getMessage(String id) {
         MessageId msgId = new MessageId(id);
-        synchronized (pagedInMessages) {
+        pagedInMessagesLock.readLock().lock();
+        try{
             QueueMessageReference r = this.pagedInMessages.get(msgId);
             if (r != null) {
                 return r.getMessage();
             }
+        }finally {
+            pagedInMessagesLock.readLock().unlock();
         }
-        synchronized (messages) {
+        messagesLock.readLock().lock();
+        try{
             try {
                 messages.reset();
                 while (messages.hasNext()) {
@@ -1014,6 +1059,8 @@ public class Queue extends BaseDestinati
             } finally {
                 messages.release();
             }
+        }finally {
+            messagesLock.readLock().unlock();
         }
         return null;
     }
@@ -1023,8 +1070,11 @@ public class Queue extends BaseDestinati
         List<MessageReference> list = null;
         do {
             doPageIn(true);
-            synchronized (pagedInMessages) {
+            pagedInMessagesLock.readLock().lock();
+            try {
                 list = new ArrayList<MessageReference>(pagedInMessages.values());
+            }finally {
+                pagedInMessagesLock.readLock().unlock();
             }
 
             for (MessageReference ref : list) {
@@ -1085,8 +1135,11 @@ public class Queue extends BaseDestinati
         ConnectionContext context = createConnectionContext();
         do {
             doPageIn(true);
-            synchronized (pagedInMessages) {
+            pagedInMessagesLock.readLock().lock();
+            try{
                 set.addAll(pagedInMessages.values());
+            }finally {
+                pagedInMessagesLock.readLock().unlock();
             }
             List<MessageReference> list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
@@ -1149,8 +1202,11 @@ public class Queue extends BaseDestinati
             setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
             doPageIn(true);
             setMaxPageSize(oldMaxSize);
-            synchronized (pagedInMessages) {
+            pagedInMessagesLock.readLock().lock();
+            try {
                 set.addAll(pagedInMessages.values());
+            }finally {
+                pagedInMessagesLock.readLock().unlock();
             }
             List<MessageReference> list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
@@ -1189,8 +1245,11 @@ public class Queue extends BaseDestinati
         QueueMessageReference r = createMessageReference(m);
         BrokerSupport.resend(context, m, dest);
         removeMessage(context, r);
-        synchronized (messages) {
+        messagesLock.writeLock().lock();
+        try{
             messages.rollback(r.getMessageId());
+        }finally {
+            messagesLock.writeLock().unlock();
         }
         return true;
     }
@@ -1232,8 +1291,11 @@ public class Queue extends BaseDestinati
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
             doPageIn(true);
-            synchronized (pagedInMessages) {
+            pagedInMessagesLock.readLock().lock();
+            try{
                 set.addAll(pagedInMessages.values());
+            }finally {
+                pagedInMessagesLock.readLock().unlock();
             }
             List<MessageReference> list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
@@ -1252,11 +1314,14 @@ public class Queue extends BaseDestinati
     }
 
     BrowserDispatch getNextBrowserDispatch() {
-        synchronized (pagedInMessages) {
+        pagedInMessagesLock.readLock().lock();
+        try{
             if (browserDispatches.isEmpty()) {
                 return null;
             }
             return browserDispatches.removeFirst();
+        }finally {
+            pagedInMessagesLock.readLock().unlock();
         }
 
     }
@@ -1317,15 +1382,18 @@ public class Queue extends BaseDestinati
 
             BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
 
-            synchronized (messages) {
+            messagesLock.readLock().lock();
+            try{
                 pageInMoreMessages |= !messages.isEmpty();
+            }finally {
+                messagesLock.readLock().unlock();
             }
 
-            // Kinda ugly.. but I think dispatchLock is the only mutex
-            // protecting the
-            // pagedInPendingDispatch variable.
-            synchronized (dispatchMutex) {
+            pagedInPendingDispatchLock.readLock().lock();
+            try {
                 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
+            }finally {
+                pagedInPendingDispatchLock.readLock().unlock();
             }
 
             // Perhaps we should page always into the pagedInPendingDispatch
@@ -1344,8 +1412,11 @@ public class Queue extends BaseDestinati
 
             if (pendingBrowserDispatch != null) {
                 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
-                synchronized (pagedInMessages) {
+                pagedInMessagesLock.readLock().lock();
+                try{
                     alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
+                }finally {
+                    pagedInMessagesLock.readLock().unlock();
                 }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
@@ -1412,11 +1483,13 @@ public class Queue extends BaseDestinati
 
     protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
         removeMessage(c, null, r);
-        synchronized (dispatchMutex) {
-            synchronized (pagedInPendingDispatch) {
-                pagedInPendingDispatch.remove(r);
-            }
+        pagedInPendingDispatchLock.writeLock().lock();
+        try {
+            pagedInPendingDispatch.remove(r);
+        } finally {
+            pagedInPendingDispatchLock.writeLock().unlock();
         }
+
     }
 
     protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
@@ -1457,8 +1530,11 @@ public class Queue extends BaseDestinati
         }
         if (ack.isPoisonAck()) {
             // message gone to DLQ, is ok to allow redelivery
-            synchronized (messages) {
+            messagesLock.writeLock().lock();
+            try{
                 messages.rollback(reference.getMessageId());
+            }finally {
+                messagesLock.writeLock().unlock();
             }
         }
 
@@ -1467,8 +1543,11 @@ public class Queue extends BaseDestinati
     private void dropMessage(QueueMessageReference reference) {
         reference.drop();
         destinationStatistics.getMessages().decrement();
-        synchronized (pagedInMessages) {
+        pagedInMessagesLock.writeLock().lock();
+        try{
             pagedInMessages.remove(reference.getMessageId());
+        }finally {
+            pagedInMessagesLock.writeLock().unlock();
         }
     }
 
@@ -1498,8 +1577,11 @@ public class Queue extends BaseDestinati
     }
 
     final void sendMessage(final Message msg) throws Exception {
-        synchronized (messages) {
+        messagesLock.writeLock().lock();
+        try{
             messages.addMessageLast(msg);
+        }finally {
+            messagesLock.writeLock().unlock();
         }
     }
     
@@ -1507,10 +1589,13 @@ public class Queue extends BaseDestinati
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         messageDelivered(context, msg);
-        synchronized (consumers) {
+        consumersLock.readLock().lock();
+        try {
             if (consumers.isEmpty()) {
                 onMessageWithNoConsumers(context, msg);
             }
+        }finally {
+            consumersLock.readLock().unlock();
         }
         wakeup();
     }
@@ -1540,99 +1625,118 @@ public class Queue extends BaseDestinati
     private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
         List<QueueMessageReference> result = null;
         List<QueueMessageReference> resultList = null;
-        synchronized (dispatchMutex) {
-            int toPageIn = Math.min(getMaxPageSize(), messages.size());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
-                        + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
-                        + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
-            }
 
-            if (isLazyDispatch() && !force) {
-                // Only page in the minimum number of messages which can be
-                // dispatched immediately.
-                toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
-            }
-            if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
-                int count = 0;
-                result = new ArrayList<QueueMessageReference>(toPageIn);
-                synchronized (messages) {
-                    try {
-                        messages.setMaxBatchSize(toPageIn);
-                        messages.reset();
-                        while (messages.hasNext() && count < toPageIn) {
-                            MessageReference node = messages.next();
-                            messages.remove();
-                            
-                            QueueMessageReference ref = createMessageReference(node.getMessage());
-                            if (ref.isExpired()) {
-                                if (broker.isExpired(ref)) {
-                                    messageExpired(createConnectionContext(), ref);
-                                } else {
-                                    ref.decrementReferenceCount();
-                                }
+        int toPageIn = Math.min(getMaxPageSize(), messages.size());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+                    + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+                    + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
+        }
+
+        if (isLazyDispatch() && !force) {
+            // Only page in the minimum number of messages which can be
+            // dispatched immediately.
+            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+        }
+        int pagedInPendingSize = 0;
+        pagedInPendingDispatchLock.readLock().lock();
+        try {
+            pagedInPendingSize = pagedInPendingDispatch.size();
+        } finally {
+            pagedInPendingDispatchLock.readLock().unlock();
+        }
+        if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
+            int count = 0;
+            result = new ArrayList<QueueMessageReference>(toPageIn);
+            messagesLock.writeLock().lock();
+            try {
+                try {
+                    messages.setMaxBatchSize(toPageIn);
+                    messages.reset();
+                    while (messages.hasNext() && count < toPageIn) {
+                        MessageReference node = messages.next();
+                        messages.remove();
+
+                        QueueMessageReference ref = createMessageReference(node.getMessage());
+                        if (ref.isExpired()) {
+                            if (broker.isExpired(ref)) {
+                                messageExpired(createConnectionContext(), ref);
                             } else {
-                                result.add(ref);
-                                count++;
+                                ref.decrementReferenceCount();
                             }
+                        } else {
+                            result.add(ref);
+                            count++;
                         }
-                    } finally {
-                        messages.release();
                     }
+                } finally {
+                    messages.release();
                 }
-                // Only add new messages, not already pagedIn to avoid multiple
-                // dispatch attempts
-                synchronized (pagedInMessages) {
-                    resultList = new ArrayList<QueueMessageReference>(result.size());
-                    for (QueueMessageReference ref : result) {
-                        if (!pagedInMessages.containsKey(ref.getMessageId())) {
-                            pagedInMessages.put(ref.getMessageId(), ref);
-                            resultList.add(ref);
-                        } else {
-                            ref.decrementReferenceCount();
-                        }
+            } finally {
+                messagesLock.writeLock().unlock();
+            }
+            // Only add new messages, not already pagedIn to avoid multiple
+            // dispatch attempts
+            pagedInMessagesLock.readLock().lock();
+            try {
+                resultList = new ArrayList<QueueMessageReference>(result.size());
+                for (QueueMessageReference ref : result) {
+                    if (!pagedInMessages.containsKey(ref.getMessageId())) {
+                        pagedInMessagesLock.readLock().unlock();
+                        pagedInMessagesLock.writeLock().lock();
+                        pagedInMessages.put(ref.getMessageId(), ref);
+                        pagedInMessagesLock.readLock().lock();
+                        pagedInMessagesLock.writeLock().unlock();
+                        resultList.add(ref);
+                    } else {
+                        ref.decrementReferenceCount();
                     }
                 }
-            } else {
-                // Avoid return null list, if condition is not validated
-                resultList = new ArrayList<QueueMessageReference>();
+            } finally {
+                pagedInMessagesLock.readLock().unlock();
             }
+        } else {
+            // Avoid return null list, if condition is not validated
+            resultList = new ArrayList<QueueMessageReference>();
         }
+
         return resultList;
     }
 
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
         boolean doWakeUp = false;
-        synchronized (dispatchMutex) {
 
-            synchronized (pagedInPendingDispatch) {
-                if (!redeliveredWaitingDispatch.isEmpty()) {
-                    // Try first to dispatch redelivered messages to keep an
-                    // proper order
-                    redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
-                }
-                if (!pagedInPendingDispatch.isEmpty()) {
-                    // Next dispatch anything that had not been
-                    // dispatched before.
-                    pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
-                }
-                // and now see if we can dispatch the new stuff.. and append to
-                // the pending
-                // list anything that does not actually get dispatched.
-                if (list != null && !list.isEmpty()) {
-                    if (pagedInPendingDispatch.isEmpty()) {
-                        pagedInPendingDispatch.addAll(doActualDispatch(list));
-                    } else {
-                        for (QueueMessageReference qmr : list) {
-                            if (!pagedInPendingDispatch.contains(qmr)) {
-                                pagedInPendingDispatch.add(qmr);
-                            }
+        pagedInPendingDispatchLock.writeLock().lock();
+        try {
+            if (!redeliveredWaitingDispatch.isEmpty()) {
+                // Try first to dispatch redelivered messages to keep an
+                // proper order
+                redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
+            }
+            if (!pagedInPendingDispatch.isEmpty()) {
+                // Next dispatch anything that had not been
+                // dispatched before.
+                pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+            }
+            // and now see if we can dispatch the new stuff.. and append to
+            // the pending
+            // list anything that does not actually get dispatched.
+            if (list != null && !list.isEmpty()) {
+                if (pagedInPendingDispatch.isEmpty()) {
+                    pagedInPendingDispatch.addAll(doActualDispatch(list));
+                } else {
+                    for (QueueMessageReference qmr : list) {
+                        if (!pagedInPendingDispatch.contains(qmr)) {
+                            pagedInPendingDispatch.add(qmr);
                         }
-                        doWakeUp = true;
                     }
+                    doWakeUp = true;
                 }
             }
+        } finally {
+            pagedInPendingDispatchLock.writeLock().unlock();
         }
+
         if (doWakeUp) {
             // avoid lock order contention
             asyncWakeup();
@@ -1645,13 +1749,15 @@ public class Queue extends BaseDestinati
      */
     private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
         List<Subscription> consumers;
-
-        synchronized (this.consumers) {
+        consumersLock.writeLock().lock();
+        try {
             if (this.consumers.isEmpty() || isSlave()) {
                 // slave dispatch happens in processDispatchNotification
                 return list;
             }
             consumers = new ArrayList<Subscription>(this.consumers);
+        }finally {
+            consumersLock.writeLock().unlock();
         }
 
         List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
@@ -1698,11 +1804,14 @@ public class Queue extends BaseDestinati
             // distribution.
             if (target != null && !strictOrderDispatch && consumers.size() > 1
                     && !dispatchSelector.isExclusiveConsumer(target)) {
-                synchronized (this.consumers) {
+                consumersLock.writeLock().lock();
+                try {
                     if (removeFromConsumerList(target)) {
                         addToConsumerList(target);
                         consumers = new ArrayList<Subscription>(this.consumers);
                     }
+                }finally {
+                    consumersLock.writeLock().unlock();
                 }
             }
         }
@@ -1730,12 +1839,15 @@ public class Queue extends BaseDestinati
     private int getConsumerMessageCountBeforeFull() throws Exception {
         int total = 0;
         boolean zeroPrefetch = false;
-        synchronized (consumers) {
+        consumersLock.readLock().lock();
+        try{
             for (Subscription s : consumers) {
                 zeroPrefetch |= s.getPrefetchSize() == 0;
                 int countBeforeFull = s.countBeforeFull();
                 total += countBeforeFull;
             }
+        }finally {
+            consumersLock.readLock().unlock();
         }
         if (total == 0 && zeroPrefetch) {
             total = 1;
@@ -1768,50 +1880,57 @@ public class Queue extends BaseDestinati
         QueueMessageReference message = null;
         MessageId messageId = messageDispatchNotification.getMessageId();
 
-        synchronized (dispatchMutex) {
-            synchronized (pagedInPendingDispatch) {
-                for (QueueMessageReference ref : pagedInPendingDispatch) {
-                    if (messageId.equals(ref.getMessageId())) {
-                        message = ref;
-                        pagedInPendingDispatch.remove(ref);
-                        break;
-                    }
+        pagedInPendingDispatchLock.writeLock().lock();
+        try {
+            for (QueueMessageReference ref : pagedInPendingDispatch) {
+                if (messageId.equals(ref.getMessageId())) {
+                    message = ref;
+                    pagedInPendingDispatch.remove(ref);
+                    break;
                 }
             }
+        } finally {
+            pagedInPendingDispatchLock.writeLock().unlock();
+        }
 
-            if (message == null) {
-                synchronized (pagedInMessages) {
-                    message = pagedInMessages.get(messageId);
-                }
+        if (message == null) {
+            pagedInMessagesLock.readLock().lock();
+            try {
+                message = pagedInMessages.get(messageId);
+            } finally {
+                pagedInMessagesLock.readLock().unlock();
             }
+        }
 
-            if (message == null) {
-                synchronized (messages) {
-                    try {
-                        messages.setMaxBatchSize(getMaxPageSize());
-                        messages.reset();
-                        while (messages.hasNext()) {
-                            MessageReference node = messages.next();
-                            messages.remove();
-                            if (messageId.equals(node.getMessageId())) {
-                                message = this.createMessageReference(node.getMessage());
-                                break;
-                            }
+        if (message == null) {
+            messagesLock.writeLock().lock();
+            try {
+                try {
+                    messages.setMaxBatchSize(getMaxPageSize());
+                    messages.reset();
+                    while (messages.hasNext()) {
+                        MessageReference node = messages.next();
+                        messages.remove();
+                        if (messageId.equals(node.getMessageId())) {
+                            message = this.createMessageReference(node.getMessage());
+                            break;
                         }
-                    } finally {
-                        messages.release();
                     }
+                } finally {
+                    messages.release();
                 }
+            } finally {
+                messagesLock.writeLock().unlock();
             }
+        }
 
-            if (message == null) {
-                Message msg = loadMessage(messageId);
-                if (msg != null) {
-                    message = this.createMessageReference(msg);
-                }
+        if (message == null) {
+            Message msg = loadMessage(messageId);
+            if (msg != null) {
+                message = this.createMessageReference(msg);
             }
-
         }
+
         if (message == null) {
             throw new JMSException("Slave broker out of sync with master - Message: "
                     + messageDispatchNotification.getMessageId() + " on "
@@ -1832,13 +1951,16 @@ public class Queue extends BaseDestinati
     private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
             throws JMSException {
         Subscription sub = null;
-        synchronized (consumers) {
+        consumersLock.readLock().lock();
+        try {
             for (Subscription s : consumers) {
                 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
                     sub = s;
                     break;
                 }
             }
+        }finally {
+            consumersLock.readLock().unlock();
         }
         return sub;
     }



Mime
View raw message