activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r492461 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Date Thu, 04 Jan 2007 08:59:15 GMT
Author: rajdavies
Date: Thu Jan  4 00:59:12 2007
New Revision: 492461

URL: http://svn.apache.org/viewvc?view=rev&rev=492461
Log:
go back to less granular synchronization

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=492461&r1=492460&r2=492461
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Jan  4 00:59:12 2007
@@ -1,27 +1,22 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -45,91 +40,86 @@
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * A subscription that honors the pre-fetch option of the ConsumerInfo.
  * 
  * @version $Revision: 1.15 $
  */
 abstract public class PrefetchSubscription extends AbstractSubscription{
-    
+
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
     final protected PendingMessageCursor pending;
     final protected LinkedList dispatched=new LinkedList();
-    
     protected int prefetchExtension=0;
-        
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
-    private AtomicBoolean dispatching = new AtomicBoolean();
-    
-    public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,
PendingMessageCursor cursor)
-                    throws  InvalidSelectorException{
+    private AtomicBoolean dispatching=new AtomicBoolean();
+
+    public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor
cursor)
+            throws InvalidSelectorException{
         super(broker,context,info);
-        pending = cursor;
+        pending=cursor;
     }
-    
+
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
-    throws  InvalidSelectorException{
-       this(broker,context,info,new VMPendingMessageCursor()); 
+            throws InvalidSelectorException{
+        this(broker,context,info,new VMPendingMessageCursor());
     }
 
-    
     /**
      * Allows a message to be pulled on demand by a client
      */
-    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
{
-    	// The slave should not deliver pull messages.  TODO: when the slave becomes a master,
-    	// He should send a NULL message to all the consumers to 'wake them up' in case 
-    	// they were waiting for a message.
-        if (getPrefetchSize() == 0 && !isSlaveBroker()) {
+    public synchronized Response pullMessage(ConnectionContext context,MessagePull pull)
throws Exception{
+        // The slave should not deliver pull messages. TODO: when the slave becomes a master,
+        // He should send a NULL message to all the consumers to 'wake them up' in case
+        // they were waiting for a message.
+        if(getPrefetchSize()==0&&!isSlaveBroker()){
             prefetchExtension++;
-            
-            final long dispatchCounterBeforePull = dispatchCounter;
+            final long dispatchCounterBeforePull=dispatchCounter;
             dispatchMatched();
-            
-        	// If there was nothing dispatched.. we may need to setup a timeout.
-        	if( dispatchCounterBeforePull == dispatchCounter ) {
-        		// imediate timeout used by receiveNoWait()
-        		if( pull.getTimeout() == -1 ) {
-        			// Send a NULL message.
-	            	add(QueueMessageReference.NULL_MESSAGE);
-	            	dispatchMatched();
-        }
-        		if( pull.getTimeout() > 0 ) {
-	            	Scheduler.executeAfterDelay(new Runnable(){
-							public void run() {
-								pullTimeout(dispatchCounterBeforePull);
-							}
-						}, pull.getTimeout());
-        		}
-        	}
+            // If there was nothing dispatched.. we may need to setup a timeout.
+            if(dispatchCounterBeforePull==dispatchCounter){
+                // imediate timeout used by receiveNoWait()
+                if(pull.getTimeout()==-1){
+                    // Send a NULL message.
+                    add(QueueMessageReference.NULL_MESSAGE);
+                    dispatchMatched();
+                }
+                if(pull.getTimeout()>0){
+                    Scheduler.executeAfterDelay(new Runnable(){
+
+                        public void run(){
+                            pullTimeout(dispatchCounterBeforePull);
+                        }
+                    },pull.getTimeout());
+                }
+            }
         }
         return null;
     }
-    
+
     /**
-     * Occurs when a pull times out.  If nothing has been dispatched
-     * since the timeout was setup, then send the NULL message.
+     * Occurs when a pull times out. If nothing has been dispatched since the timeout was
setup, then send the NULL
+     * message.
      */
-    private void pullTimeout(long dispatchCounterBeforePull) {    	
-    	if( dispatchCounterBeforePull == dispatchCounter ) {
-        	try {
-				add(QueueMessageReference.NULL_MESSAGE);
-				dispatchMatched();
-			} catch (Exception e) {
-				context.getConnection().serviceException(e);
-			}
-    	}
-	}
-        
-    public void add(MessageReference node) throws Exception{
-        boolean pendingEmpty=false;
-        
-        synchronized(pending){
-            pendingEmpty=pending.isEmpty();
-            enqueueCounter++;
+    private void pullTimeout(long dispatchCounterBeforePull){
+        if(dispatchCounterBeforePull==dispatchCounter){
+            try{
+                add(QueueMessageReference.NULL_MESSAGE);
+                dispatchMatched();
+            }catch(Exception e){
+                context.getConnection().serviceException(e);
+            }
         }
+    }
+
+    public synchronized void add(MessageReference node) throws Exception{
+        boolean pendingEmpty=false;
+        pendingEmpty=pending.isEmpty();
+        enqueueCounter++;
+       
         if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
             dispatch(node);
         }else{
@@ -137,142 +127,133 @@
             synchronized(pending){
                 if(pending.isEmpty()&&log.isDebugEnabled()){
                     log.debug("Prefetch limit.");
-                }       
+                }
                 pending.addMessageLast(node);
             }
-            //we might be able to dispatch messages (i.e. not full() anymore)
-            dispatchMatched();
         }
     }
-    
 
-    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws
Exception{
-        synchronized(pending){
-            try{
-                pending.reset();
-                while(pending.hasNext()){
-                    MessageReference node=pending.next();
-                    if(node.getMessageId().equals(mdn.getMessageId())){
-                        pending.remove();
-                        createMessageDispatch(node,node.getMessage());
-                        dispatched.addLast(node);
-                        
-                        return;
-                    }
+    public synchronized void processMessageDispatchNotification(MessageDispatchNotification
mdn) throws Exception{
+        try{
+            pending.reset();
+            while(pending.hasNext()){
+                MessageReference node=pending.next();
+                if(node.getMessageId().equals(mdn.getMessageId())){
+                    pending.remove();
+                    createMessageDispatch(node,node.getMessage());
+                    dispatched.addLast(node);
+                    return;
                 }
-            }finally{
-                pending.release();
             }
-            throw new JMSException("Slave broker out of sync with master: Dispatched message
("+mdn.getMessageId()
-                    +") was not in the pending list");
+        }finally{
+            pending.release();
         }
+        throw new JMSException("Slave broker out of sync with master: Dispatched message
("+mdn.getMessageId()
+                +") was not in the pending list");
     }
 
-    public void acknowledge(final ConnectionContext context,final MessageAck ack) throws
Exception{
+    public synchronized void acknowledge(final ConnectionContext context,final MessageAck
ack) throws Exception{
         // Handle the standard acknowledgment case.
         boolean callDispatchMatched=false;
-        synchronized(dispatched){
-            if(ack.isStandardAck()){
-                // Acknowledge all dispatched messages up till the message id of the acknowledgment.
-                int index=0;
-                boolean inAckRange=false;
-                for(Iterator iter=dispatched.iterator();iter.hasNext();){
-                    final MessageReference node=(MessageReference)iter.next();
-                    MessageId messageId=node.getMessageId();
-                    if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
-                        inAckRange=true;
-                    }
-                    if(inAckRange){
-                        // Don't remove the nodes until we are committed.
-                        if(!context.isInTransaction()){
-                            dequeueCounter++;
-                            node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                            iter.remove();
-                        }else{
-                            // setup a Synchronization to remove nodes from the dispatched
list.
-                            context.getTransaction().addSynchronization(new Synchronization(){
-
-                                public void afterCommit() throws Exception{
-                                    synchronized(PrefetchSubscription.this){
-                                        dequeueCounter++;
-                                        dispatched.remove(node);
-                                        node.getRegionDestination().getDestinationStatistics().getDequeues()
-                                                .increment();
-                                        prefetchExtension--;
-                                    }
+        if(ack.isStandardAck()){
+            // Acknowledge all dispatched messages up till the message id of the acknowledgment.
+            int index=0;
+            boolean inAckRange=false;
+            for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                final MessageReference node=(MessageReference)iter.next();
+                MessageId messageId=node.getMessageId();
+                if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                    inAckRange=true;
+                }
+                if(inAckRange){
+                    // Don't remove the nodes until we are committed.
+                    if(!context.isInTransaction()){
+                        dequeueCounter++;
+                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                        iter.remove();
+                    }else{
+                        // setup a Synchronization to remove nodes from the dispatched list.
+                        context.getTransaction().addSynchronization(new Synchronization(){
+
+                            public void afterCommit() throws Exception{
+                                synchronized(PrefetchSubscription.this){
+                                    dequeueCounter++;
+                                    dispatched.remove(node);
+                                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                                    prefetchExtension--;
                                 }
+                            }
 
-                                public void afterRollback() throws Exception{
-                                    super.afterRollback();
-                                }
-                            });
-                        }
-                        index++;
-                        acknowledge(context,ack,node);
-                        if(ack.getLastMessageId().equals(messageId)){
-                            if(context.isInTransaction()){
-                                // extend prefetch window only if not a pulling consumer
-                                if(getPrefetchSize()!=0){
-                                    prefetchExtension=Math.max(prefetchExtension,index+1);
-                                }
-                            }else{
-                                prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                            public void afterRollback() throws Exception{
+                                super.afterRollback();
                             }
-                            callDispatchMatched=true;
-                            break;
-                        }
+                        });
                     }
-                }
-                // this only happens after a reconnect - get an ack which is not valid
-                if(!callDispatchMatched){
-                    log.info("Could not correlate acknowledgment with dispatched message:
"+ack);
-                }
-            }else if(ack.isDeliveredAck()){
-                // Message was delivered but not acknowledged: update pre-fetch counters.
-                // Acknowledge all dispatched messages up till the message id of the acknowledgment.
-                int index=0;
-                for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
-                    final MessageReference node=(MessageReference)iter.next();
-                    if(ack.getLastMessageId().equals(node.getMessageId())){
-                        prefetchExtension=Math.max(prefetchExtension,index+1);
+                    index++;
+                    acknowledge(context,ack,node);
+                    if(ack.getLastMessageId().equals(messageId)){
+                        if(context.isInTransaction()){
+                            // extend prefetch window only if not a pulling consumer
+                            if(getPrefetchSize()!=0){
+                                prefetchExtension=Math.max(prefetchExtension,index+1);
+                            }
+                        }else{
+                            prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                        }
                         callDispatchMatched=true;
                         break;
                     }
                 }
-                if(!callDispatchMatched){
-                    throw new JMSException("Could not correlate acknowledgment with dispatched
message: "+ack);
+            }
+            // this only happens after a reconnect - get an ack which is not valid
+            if(!callDispatchMatched){
+                log.info("Could not correlate acknowledgment with dispatched message: "+ack);
+            }
+        }else if(ack.isDeliveredAck()){
+            // Message was delivered but not acknowledged: update pre-fetch counters.
+            // Acknowledge all dispatched messages up till the message id of the acknowledgment.
+            int index=0;
+            for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
+                final MessageReference node=(MessageReference)iter.next();
+                if(ack.getLastMessageId().equals(node.getMessageId())){
+                    prefetchExtension=Math.max(prefetchExtension,index+1);
+                    callDispatchMatched=true;
+                    break;
                 }
-            }else if(ack.isPoisonAck()){
-                // TODO: what if the message is already in a DLQ???
-                // Handle the poison ACK case: we need to send the message to a DLQ
-                if(ack.isInTransaction())
-                    throw new JMSException("Poison ack cannot be transacted: "+ack);
-                // Acknowledge all dispatched messages up till the message id of the acknowledgment.
-                int index=0;
-                boolean inAckRange=false;
-                for(Iterator iter=dispatched.iterator();iter.hasNext();){
-                    final MessageReference node=(MessageReference)iter.next();
-                    MessageId messageId=node.getMessageId();
-                    if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
-                        inAckRange=true;
-                    }
-                    if(inAckRange){
-                        sendToDLQ(context,node);
-                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                        iter.remove();
-                        dequeueCounter++;
-                        index++;
-                        acknowledge(context,ack,node);
-                        if(ack.getLastMessageId().equals(messageId)){
-                            prefetchExtension=Math.max(0,prefetchExtension-(index+1));
-                            callDispatchMatched=true;
-                            break;
-                        }
+            }
+            if(!callDispatchMatched){
+                throw new JMSException("Could not correlate acknowledgment with dispatched
message: "+ack);
+            }
+        }else if(ack.isPoisonAck()){
+            // TODO: what if the message is already in a DLQ???
+            // Handle the poison ACK case: we need to send the message to a DLQ
+            if(ack.isInTransaction())
+                throw new JMSException("Poison ack cannot be transacted: "+ack);
+            // Acknowledge all dispatched messages up till the message id of the acknowledgment.
+            int index=0;
+            boolean inAckRange=false;
+            for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                final MessageReference node=(MessageReference)iter.next();
+                MessageId messageId=node.getMessageId();
+                if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                    inAckRange=true;
+                }
+                if(inAckRange){
+                    sendToDLQ(context,node);
+                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                    iter.remove();
+                    dequeueCounter++;
+                    index++;
+                    acknowledge(context,ack,node);
+                    if(ack.getLastMessageId().equals(messageId)){
+                        prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                        callDispatchMatched=true;
+                        break;
                     }
                 }
-                if(!callDispatchMatched){
-                    throw new JMSException("Could not correlate acknowledgment with dispatched
message: "+ack);
-                }
+            }
+            if(!callDispatchMatched){
+                throw new JMSException("Could not correlate acknowledgment with dispatched
message: "+ack);
             }
         }
         if(callDispatchMatched){
@@ -293,7 +274,7 @@
      * @throws IOException
      * @throws Exception
      */
-    protected void sendToDLQ(final ConnectionContext context, final MessageReference node)
throws IOException, Exception {
+    protected void sendToDLQ(final ConnectionContext context,final MessageReference node)
throws IOException,Exception{
         // Send the message to the DLQ
         Message message=node.getMessage();
         if(message!=null){
@@ -301,142 +282,118 @@
             // sent,
             // it is only populated if the message is routed to another destination like
the DLQ
             DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
-            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
-            BrokerSupport.resend(context, message, deadLetterDestination);
-
+            ActiveMQDestination deadLetterDestination=deadLetterStrategy
+                    .getDeadLetterQueueFor(message.getDestination());
+            BrokerSupport.resend(context,message,deadLetterDestination);
         }
     }
 
     /**
      * Used to determine if the broker can dispatch to the consumer.
+     * 
      * @return
      */
-    protected boolean isFull(){
-        return isSlaveBroker() || dispatched.size()-prefetchExtension>=info.getPrefetchSize();
+    protected synchronized boolean isFull(){
+        return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
-    
+
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     public boolean isLowWaterMark(){
-        return (dispatched.size()-prefetchExtension) <= (info.getPrefetchSize() *.4);
+        return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
     }
-    
+
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     public boolean isHighWaterMark(){
-        return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
+        return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
     }
-    
-    public int countBeforeFull() {
-        return info.getPrefetchSize() + prefetchExtension - dispatched.size();
+
+    public synchronized int countBeforeFull(){
+        return info.getPrefetchSize()+prefetchExtension-dispatched.size();
     }
-    
+
     public int getPendingQueueSize(){
-    	synchronized(pending) {
-    		return pending.size();
-    	}
+        synchronized(pending){
+            return pending.size();
+        }
     }
-    
+
     public int getDispatchedQueueSize(){
         synchronized(dispatched){
             return dispatched.size();
         }
     }
-    
+
     synchronized public long getDequeueCounter(){
         return dequeueCounter;
     }
-    
-    synchronized public long getDispatchedCounter() {
+
+    synchronized public long getDispatchedCounter(){
         return dispatchCounter;
     }
-    
-    synchronized public long getEnqueueCounter() {
+
+    synchronized public long getEnqueueCounter(){
         return enqueueCounter;
     }
-    
+
     public boolean isRecoveryRequired(){
         return pending.isRecoveryRequired();
     }
-    
+
     /**
      * optimize message consumer prefetch if the consumer supports it
-     *
+     * 
      */
     public void optimizePrefetch(){
-    	/*
-        if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
-                        &&context.getConnection().isManageable()){
-            if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
-                info.setCurrentPrefetchSize(info.getPrefetchSize());
-                updateConsumerPrefetch(info.getPrefetchSize());
-            }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
-                // want to purge any outstanding acks held by the consumer
-                info.setCurrentPrefetchSize(1);
-                updateConsumerPrefetch(1);
-            }
-        }
-        */
+        /*
+         * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+         * &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize()
&&
+         * isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
+         * updateConsumerPrefetch(info.getPrefetchSize()); }else
+         * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
// want to purge any
+         * outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1);
} }
+         */
     }
-    
-    public void add(ConnectionContext context,Destination destination) throws Exception{
+
+    public synchronized void add(ConnectionContext context,Destination destination) throws
Exception{
         super.add(context,destination);
-        synchronized(pending){
-            pending.add(context,destination);
-        }
+        pending.add(context,destination);
     }
 
-    public void remove(ConnectionContext context,Destination destination) throws Exception{
+    public synchronized void remove(ConnectionContext context,Destination destination) throws
Exception{
         super.remove(context,destination);
-        synchronized(pending){
-            pending.remove(context,destination);
-        }
+        pending.remove(context,destination);
     }
 
-
     protected void dispatchMatched() throws IOException{
-        if(!broker.isSlaveBroker() && dispatching.compareAndSet(false,true)){
+        if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){
             try{
-                List toDispatch=null;
-                synchronized(pending){
-                    try{
-                        int numberToDispatch=countBeforeFull();
-                        if(numberToDispatch>0){
-                            int count=0;
-                            pending.reset();
-                            while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
-                                MessageReference node=pending.next();
-                                if ( node == null )
-                                	break;
-                                
-                                if(canDispatch(node)){
-                                    pending.remove();
-                                    // Message may have been sitting in the pending list
a while
-                                    // waiting for the consumer to ak the message.
-                                    if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
-                                        continue; // just drop it.
-                                    }
-                                    if(toDispatch==null){
-                                        toDispatch=new ArrayList();
-                                    }
-                                    toDispatch.add(node);
-                                    count++;
+                try{
+                    int numberToDispatch=countBeforeFull();
+                    if(numberToDispatch>0){
+                        int count=0;
+                        pending.reset();
+                        while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
+                            MessageReference node=pending.next();
+                            if(node==null)
+                                break;
+                            if(canDispatch(node)){
+                                pending.remove();
+                                // Message may have been sitting in the pending list a while
+                                // waiting for the consumer to ak the message.
+                                if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+                                    continue; // just drop it.
                                 }
+                                dispatch(node);
+                                count++;
                             }
                         }
-                    }finally{
-                        pending.release();
-                    }
-                }
-                if(toDispatch!=null){
-                    synchronized(dispatched){
-                        for(int i=0;i<toDispatch.size();i++){
-                            MessageReference node=(MessageReference)toDispatch.get(i);
-                            dispatch(node);
-                        }
                     }
+                }finally{
+                    pending.release();
                 }
             }finally{
                 dispatching.set(false);
@@ -449,45 +406,44 @@
         if(message==null){
             return false;
         }
-        synchronized(dispatched){
-            // Make sure we can dispatch a message.
-            if(canDispatch(node)&&!isSlaveBroker()){
-                MessageDispatch md=createMessageDispatch(node,message);
-                // NULL messages don't count... they don't get Acked.
-                if(node!=QueueMessageReference.NULL_MESSAGE){
-                    dispatchCounter++;
-                    dispatched.addLast(node);
-                }else{
-                    prefetchExtension=Math.max(0,prefetchExtension-1);
-                }
-                if(info.isDispatchAsync()){
-                    md.setConsumer(new Runnable(){
+        // Make sure we can dispatch a message.
+        if(canDispatch(node)&&!isSlaveBroker()){
+            MessageDispatch md=createMessageDispatch(node,message);
+            // NULL messages don't count... they don't get Acked.
+            if(node!=QueueMessageReference.NULL_MESSAGE){
+                dispatchCounter++;
+                dispatched.addLast(node);
+            }else{
+                prefetchExtension=Math.max(0,prefetchExtension-1);
+            }
+            if(info.isDispatchAsync()){
+                md.setConsumer(new Runnable(){
 
-                        public void run(){
-                            // Since the message gets queued up in async dispatch, we don't
want to
-                            // decrease the reference count until it gets put on the wire.
-                            onDispatch(node,message);
-                        }
-                    });
-                    context.getConnection().dispatchAsync(md);
-                }else{
-                    context.getConnection().dispatchSync(md);
-                    onDispatch(node,message);
-                }
-                return true;
+                    public void run(){
+                        // Since the message gets queued up in async dispatch, we don't want
to
+                        // decrease the reference count until it gets put on the wire.
+                        onDispatch(node,message);
+                    }
+                });
+                context.getConnection().dispatchAsync(md);
             }else{
-                QueueMessageReference n = (QueueMessageReference) node;
-                return false;
+                context.getConnection().dispatchSync(md);
+                onDispatch(node,message);
             }
+            //System.err.println(broker.getBrokerName() + " " + this + " (" + enqueueCounter
+ ", " + dispatchCounter +") " + node);
+            return true;
+        }else{
+            QueueMessageReference n=(QueueMessageReference)node;
+            return false;
         }
     }
 
     protected void onDispatch(final MessageReference node,final Message message){
         if(node.getRegionDestination()!=null){
-        	if( node != QueueMessageReference.NULL_MESSAGE ) {
-            node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-            context.getConnection().getStatistics().onMessageDequeue(message);
-        	}
+            if(node!=QueueMessageReference.NULL_MESSAGE){
+                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
+                context.getConnection().getStatistics().onMessageDequeue(message);
+            }
             try{
                 dispatchMatched();
             }catch(IOException e){
@@ -495,14 +451,15 @@
             }
         }
     }
-    
+
     /**
      * inform the MessageConsumer on the client to change it's prefetch
+     * 
      * @param newPrefetch
      */
     public void updateConsumerPrefetch(int newPrefetch){
-        if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
-            ConsumerControl cc = new ConsumerControl();
+        if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
+            ConsumerControl cc=new ConsumerControl();
             cc.setConsumerId(info.getConsumerId());
             cc.setPrefetch(newPrefetch);
             context.getConnection().dispatchAsync(cc);
@@ -515,20 +472,20 @@
      * @return MessageDispatch
      */
     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
-        if( node == QueueMessageReference.NULL_MESSAGE ) {
-        MessageDispatch md=new MessageDispatch();
+        if(node==QueueMessageReference.NULL_MESSAGE){
+            MessageDispatch md=new MessageDispatch();
             md.setMessage(null);
-        md.setConsumerId(info.getConsumerId());
-            md.setDestination( null );
+            md.setConsumerId(info.getConsumerId());
+            md.setDestination(null);
             return md;
-        } else {
+        }else{
             MessageDispatch md=new MessageDispatch();
             md.setConsumerId(info.getConsumerId());
-        md.setDestination(node.getRegionDestination().getActiveMQDestination());
-        md.setMessage(message);
-        md.setRedeliveryCounter(node.getRedeliveryCounter());
-        return md;
-    }
+            md.setDestination(node.getRegionDestination().getActiveMQDestination());
+            md.setMessage(message);
+            md.setRedeliveryCounter(node.getRedeliveryCounter());
+            return md;
+        }
     }
 
     /**
@@ -537,7 +494,7 @@
      * @param node
      * @return false if the message should not be dispatched to the client (another sub may
have already dispatched it
      *         for example).
-     * @throws IOException 
+     * @throws IOException
      */
     abstract protected boolean canDispatch(MessageReference node) throws IOException;
 
@@ -547,6 +504,6 @@
      * @throws IOException
      */
     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference
node)
-                    throws IOException{}
-
+            throws IOException{
+    }
 }



Mime
View raw message