activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r490007 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Date Sun, 24 Dec 2006 08:57:34 GMT
Author: rajdavies
Date: Sun Dec 24 00:57:34 2006
New Revision: 490007

URL: http://svn.apache.org/viewvc?view=rev&rev=490007
Log:
fix for http://issues.apache.org/activemq/browse/AMQ-1108

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490007&r1=490006&r2=490007
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sun Dec 24 00:57:34 2006
@@ -18,12 +18,12 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
-
+import java.util.List;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -56,11 +56,10 @@
     final protected LinkedList dispatched=new LinkedList();
     
     protected int prefetchExtension=0;
-    boolean dispatching=false;
-    
-    long enqueueCounter;
-    long dispatchCounter;
-    long dequeueCounter;
+        
+    protected long enqueueCounter;
+    protected long dispatchCounter;
+    protected long dequeueCounter;
     
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
                     throws  InvalidSelectorException{
@@ -123,8 +122,7 @@
 	}
         
     public void add(MessageReference node) throws Exception{
-        try {
-        boolean pendingEmpty = false;
+        boolean pendingEmpty=false;
         synchronized(pending){
             pendingEmpty=pending.isEmpty();
             enqueueCounter++;
@@ -134,17 +132,16 @@
         }else{
             optimizePrefetch();
             synchronized(pending){
-                if(log.isDebugEnabled() && pending.isEmpty()){
+                if(pending.isEmpty()&&log.isDebugEnabled()){
                     log.debug("Prefetch limit.");
                 }
                 pending.addMessageLast(node);
             }
-        }
-        }catch(Throwable e) {
-            e.printStackTrace();
-            
+            //we might be able to dispatch messages (i.e. not full() anymore)
+            dispatchMatched();
         }
     }
+    
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{
         synchronized(pending){
@@ -169,6 +166,7 @@
 
     public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
         // Handle the standard acknowledgment case.
+        boolean callDispatchMatched=false;
         synchronized(dispatched){
             if(ack.isStandardAck()){
                 // Acknowledge all dispatched messages up till the message id of the acknowledgment.
@@ -216,13 +214,15 @@
                             }else{
                                 prefetchExtension=Math.max(0,prefetchExtension-(index+1));
                             }
-                            dispatchMatched();
-                            return;
+                            callDispatchMatched=true;
+                            break;
                         }
                     }
                 }
-                //this only happens after a reconnect - get an ack which is not valid
-                log.info("Could not correlate acknowledgment with dispatched message: "+ack);
+                // this only happens after a reconnect - get an ack which is not valid
+                if(!callDispatchMatched){
+                    log.info("Could not correlate acknowledgment with dispatched message: "+ack);
+                }
             }else if(ack.isDeliveredAck()){
                 // Message was delivered but not acknowledged: update pre-fetch counters.
                 // Acknowledge all dispatched messages up till the message id of the acknowledgment.
@@ -231,11 +231,13 @@
                     final MessageReference node=(MessageReference)iter.next();
                     if(ack.getLastMessageId().equals(node.getMessageId())){
                         prefetchExtension=Math.max(prefetchExtension,index+1);
-                        dispatchMatched();
-                        return;
+                        callDispatchMatched=true;
+                        break;
                     }
                 }
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
+                if(!callDispatchMatched){
+                    throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
+                }
             }else if(ack.isPoisonAck()){
                 // TODO: what if the message is already in a DLQ???
                 // Handle the poison ACK case: we need to send the message to a DLQ
@@ -259,14 +261,19 @@
                         acknowledge(context,ack,node);
                         if(ack.getLastMessageId().equals(messageId)){
                             prefetchExtension=Math.max(0,prefetchExtension-(index+1));
-                            dispatchMatched();
-                            return;
+                            callDispatchMatched=true;
+                            break;
                         }
                     }
                 }
-                
-                throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
+                if(!callDispatchMatched){
+                    throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
+                }
             }
+        }
+        if(callDispatchMatched){
+            dispatchMatched();
+        }else{
             if(isSlaveBroker()){
                 throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack
                         +") was not in the dispatch list: "+dispatched);
@@ -366,40 +373,47 @@
         */
     }
     
-    public void add(ConnectionContext context, Destination destination) throws Exception {
+    public void add(ConnectionContext context,Destination destination) throws Exception{
         super.add(context,destination);
-        pending.add(context,destination);
+        synchronized(pending){
+            pending.add(context,destination);
+        }
     }
 
-    public void remove(ConnectionContext context, Destination destination) throws Exception {
+    public void remove(ConnectionContext context,Destination destination) throws Exception{
         super.remove(context,destination);
-        pending.remove(context,destination);
-       
+        synchronized(pending){
+            pending.remove(context,destination);
+        }
     }
 
 
     protected void dispatchMatched() throws IOException{
+        List toDispatch=null;
         synchronized(pending){
-            if(!dispatching){
-                dispatching=true;
-                try{
-                    pending.reset();
-                    while(pending.hasNext()&&!isFull()){
-                        MessageReference node=pending.next();
-                        pending.remove();
-                        
-                        // Message may have been sitting in the pending list a while
-                        // waiting for the consumer to ak the message.
-                		if( node != QueueMessageReference.NULL_MESSAGE && node.isExpired() ) {
-                			continue; // just drop it.
-                		}
-
-                        dispatch(node);
+            try{
+                pending.reset();
+                while(pending.hasNext()&&!isFull()){
+                    MessageReference node=pending.next();
+                    pending.remove();
+                    // Message may have been sitting in the pending list a while
+                    // waiting for the consumer to ak the message.
+                    if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+                        continue; // just drop it.
                     }
-                }finally{
-                    pending.release();
-                    dispatching=false;
+                    if(toDispatch==null){
+                        toDispatch=new ArrayList();
+                    }
+                    toDispatch.add(node);
                 }
+            }finally{
+                pending.release();
+            }
+        }
+        if(toDispatch!=null){
+            for(int i=0;i<toDispatch.size();i++){
+                MessageReference node=(MessageReference)toDispatch.get(i);
+                dispatch(node);
             }
         }
     }



Mime
View raw message