activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r577746 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Thu, 20 Sep 2007 13:49:33 GMT
Author: rajdavies
Date: Thu Sep 20 06:49:32 2007
New Revision: 577746

URL: http://svn.apache.org/viewvc?rev=577746&view=rev
Log:
make the locking more coarse grained

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=577746&r1=577745&r2=577746&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Sep 20 06:49:32 2007
@@ -58,7 +58,6 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -79,7 +78,6 @@
     private final Log log;
     private final ActiveMQDestination destination;
     private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
-    private final Valve dispatchValve = new Valve(true);
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
     private final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -97,7 +95,6 @@
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
     private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
     private final Object exclusiveLockMutex = new Object();
-    private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
     
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@@ -203,13 +200,15 @@
         return true;
     }
 
-    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
+    public synchronized  void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
         sub.add(context, this);
         destinationStatistics.getConsumers().increment();
         maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
 
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
         try {
+        	
+        	//needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
                 consumers.add(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
@@ -224,22 +223,26 @@
                     }
                 }
             }
-            // page in messages
-            doPageIn();
+            
+            //we hold the lock on the dispatchValue - so lets build the paged in
+            //list directly;
+            buildList(false);
+           
             // synchronize with dispatch method so that no new messages are sent
             // while
             // setting up a subscription. avoid out of order messages,
             // duplicates
-            // etc.
-            dispatchValve.turnOff();
-            try {
+            // etc.  
+            
+         
+            
                 msgContext.setDestination(destination);
                 synchronized (pagedInMessages) {
                     // Add all the matching messages in the queue to the
                     // subscription.
                     for (Iterator<MessageReference> i = pagedInMessages.iterator();
i.hasNext();) {
                         QueueMessageReference node = (QueueMessageReference)i.next();
-                        if (node.isDropped()) {
+                        if (node.isDropped() ||  (!sub.getConsumerInfo().isBrowser() &&
node.getLockOwner()!=null)) {
                             continue;
                         }
                         try {
@@ -252,101 +255,94 @@
                         }
                     }
                 }
-            } finally {
-                dispatchValve.turnOn();
-            }
+          
+            
+            
         } finally {
             msgContext.clear();
         }
     }
 
-    public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception
{
-
-        destinationStatistics.getConsumers().decrement();
-        maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
-
-        // synchronize with dispatch method so that no new messages are sent
-        // while
-        // removing up a subscription.
-        dispatchValve.turnOff();
-        try {
-
-            synchronized (consumers) {
-                consumers.remove(sub);
-                if (sub.getConsumerInfo().isExclusive()) {
-                    LockOwner owner = (LockOwner)sub;
-                    // Did we loose the exclusive owner??
-                    if (exclusiveOwner == owner) {
-
-                        // Find the exclusive consumer with the higest Lock
-                        // Priority.
-                        exclusiveOwner = null;
-                        for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();)
{
-                            Subscription s = iter.next();
-                            LockOwner so = (LockOwner)s;
-                            if (s.getConsumerInfo().isExclusive() && (exclusiveOwner
== null || so.getLockPriority() > exclusiveOwner.getLockPriority())) {
-                                exclusiveOwner = so;
-                            }
-                        }
-                    }
-                }
-                if (consumers.isEmpty()) {
-                    messages.gc();
-                }
-
-            }
-            sub.remove(context, this);
-
-            boolean wasExclusiveOwner = false;
-            if (exclusiveOwner == sub) {
-                exclusiveOwner = null;
-                wasExclusiveOwner = true;
-            }
-
-            ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
-            MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
-
-            if (!sub.getConsumerInfo().isBrowser()) {
-                MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-                try {
-                    msgContext.setDestination(destination);
-
-                    // lets copy the messages to dispatch to avoid deadlock
-                    List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
-                    synchronized (pagedInMessages) {
-                        for (Iterator<MessageReference> i = pagedInMessages.iterator();
i.hasNext();) {
-                            QueueMessageReference node = (QueueMessageReference)i.next();
-                            if (node.isDropped()) {
-                                continue;
-                            }
-
-                            String groupID = node.getGroupID();
-
-                            // Re-deliver all messages that the sub locked
-                            if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID
!= null && ownedGroups.contains(groupID))) {
-                                messagesToDispatch.add(node);
-                            }
-                        }
-                    }
-
-                    // now lets dispatch from the copy of the collection to
-                    // avoid deadlocks
-                    for (Iterator<QueueMessageReference> iter = messagesToDispatch.iterator();
iter.hasNext();) {
-                        QueueMessageReference node = iter.next();
-                        node.incrementRedeliveryCounter();
-                        node.unlock();
-                        msgContext.setMessageReference(node);
-                        dispatchPolicy.dispatch(node, msgContext, consumers);
-                    }
-                } finally {
-                    msgContext.clear();
-                }
-            }
-        } finally {
-            dispatchValve.turnOn();
-        }
-
-    }
+    public synchronized void removeSubscription(ConnectionContext context,
+	        Subscription sub) throws Exception{
+		destinationStatistics.getConsumers().decrement();
+		maximumPagedInMessages-=sub.getConsumerInfo().getPrefetchSize();
+		// synchronize with dispatch method so that no new messages are sent
+		// while
+		// removing up a subscription.
+		synchronized(consumers){
+			consumers.remove(sub);
+			if(sub.getConsumerInfo().isExclusive()){
+				LockOwner owner=(LockOwner)sub;
+				// Did we loose the exclusive owner??
+				if(exclusiveOwner==owner){
+					// Find the exclusive consumer with the higest Lock
+					// Priority.
+					exclusiveOwner=null;
+					for(Iterator<Subscription> iter=consumers.iterator();iter
+					        .hasNext();){
+						Subscription s=iter.next();
+						LockOwner so=(LockOwner)s;
+						if(s.getConsumerInfo().isExclusive()
+						        &&(exclusiveOwner==null||so.getLockPriority()>exclusiveOwner
+						                .getLockPriority())){
+							exclusiveOwner=so;
+						}
+					}
+				}
+			}
+			if(consumers.isEmpty()){
+				messages.gc();
+			}
+		}
+		sub.remove(context,this);
+		boolean wasExclusiveOwner=false;
+		if(exclusiveOwner==sub){
+			exclusiveOwner=null;
+			wasExclusiveOwner=true;
+		}
+		ConsumerId consumerId=sub.getConsumerInfo().getConsumerId();
+		MessageGroupSet ownedGroups=getMessageGroupOwners().removeConsumer(
+		        consumerId);
+		if(!sub.getConsumerInfo().isBrowser()){
+			MessageEvaluationContext msgContext=context
+			        .getMessageEvaluationContext();
+			try{
+				msgContext.setDestination(destination);
+				// lets copy the messages to dispatch to avoid deadlock
+				List<QueueMessageReference> messagesToDispatch=new ArrayList<QueueMessageReference>();
+				synchronized(pagedInMessages){
+					for(Iterator<MessageReference> i=pagedInMessages.iterator();i
+					        .hasNext();){
+						QueueMessageReference node=(QueueMessageReference)i
+						        .next();
+						if(node.isDropped()){
+							continue;
+						}
+						String groupID=node.getGroupID();
+						// Re-deliver all messages that the sub locked
+						if(node.getLockOwner()==sub
+						        ||wasExclusiveOwner
+						        ||(groupID!=null&&ownedGroups.contains(groupID))){
+							messagesToDispatch.add(node);
+						}
+					}
+				}
+				// now lets dispatch from the copy of the collection to
+				// avoid deadlocks
+				for(Iterator<QueueMessageReference> iter=messagesToDispatch
+				        .iterator();iter.hasNext();){
+					QueueMessageReference node=iter.next();
+					node.incrementRedeliveryCounter();
+					node.unlock();
+					msgContext.setMessageReference(node);
+					dispatchPolicy.dispatch(node,msgContext,consumers);
+				}
+			}finally{
+				msgContext.clear();
+			}
+		}
+	}
 
     public void send(final ProducerBrokerExchange producerExchange, final Message message)
throws Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
@@ -998,18 +994,20 @@
         pageInMessages(false);
     }
 
-    private List<MessageReference> doPageIn() throws Exception {
-        return doPageIn(true);
+    
+    private   List<MessageReference> doPageIn(boolean force) throws Exception {
+    	 List<MessageReference> result  = null;
+    		result  = buildList(force);
+    	return  result;
     }
 
-    private List<MessageReference> doPageIn(boolean force) throws Exception {
+    private   synchronized List<MessageReference> buildList(boolean force) throws Exception
{
 
         final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
         List<MessageReference> result = null;
         if ((force || !consumers.isEmpty()) && toPageIn > 0) {
             messages.setMaxBatchSize(toPageIn);
             try {
-                dispatchValve.increment();
                 int count = 0;
                 result = new ArrayList<MessageReference>(toPageIn);
                 synchronized (messages) {
@@ -1037,16 +1035,14 @@
                 }
             } finally {
                 queueMsgConext.clear();
-                dispatchValve.decrement();
             }
         }
         return result;
     }
 
-    private void doDispatch(List<MessageReference> list) throws Exception {
+    private  synchronized void doDispatch(List<MessageReference> list) throws Exception
{
         if (list != null && !list.isEmpty()) {
             try {
-                dispatchValve.increment();
                 for (int i = 0; i < list.size(); i++) {
                     MessageReference node = list.get(i);
                     queueMsgConext.setDestination(destination);
@@ -1055,7 +1051,6 @@
                 }
             } finally {
                 queueMsgConext.clear();
-                dispatchValve.decrement();
             }
         }
     }
@@ -1065,9 +1060,7 @@
     }
 
     private void pageInMessages(boolean force) throws Exception {
-        synchronized (doDispatchMutex) {
             doDispatch(doPageIn(force));
-        }
     }
 
 }



Mime
View raw message