activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r398015 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java Subscription.java
Date Fri, 28 Apr 2006 21:24:08 GMT
Author: chirino
Date: Fri Apr 28 14:24:06 2006
New Revision: 398015

URL: http://svn.apache.org/viewcvs?rev=398015&view=rev
Log:
- Gaurd access to dispatched list ( a sync was missing).
- Added better exception messages to know what happened when a slave subscription gets out
of sync with the master.
- Implemented a simpler isFull()

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=398015&r1=398014&r2=398015&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Apr 28 14:24:06 2006
@@ -60,35 +60,31 @@
 
     synchronized public void add(MessageReference node) throws Exception{
         enqueueCounter++;
-        if(!isFull()&&!isSlaveBroker()){
+        if(!isFull()){
             dispatch(node);
         }else{
             optimizePrefetch();
             synchronized(pending){
-                if( pending.isEmpty() )
-                    if (log.isDebugEnabled()){
-                        log.debug("Prefetch limit.");
-                    }
+                if( pending.isEmpty() ) {
+                    log.debug("Prefetch limit.");
+                }
                 pending.addLast(node);
             }
         }
     }
 
-    public void processMessageDispatchNotification(MessageDispatchNotification mdn){
+    synchronized public void processMessageDispatchNotification(MessageDispatchNotification
mdn) throws Exception {
         synchronized(pending){
             for(Iterator i=pending.iterator();i.hasNext();){
                 MessageReference node=(MessageReference) i.next();
                 if(node.getMessageId().equals(mdn.getMessageId())){
                     i.remove();
-                    try{
-                        MessageDispatch md=createMessageDispatch(node,node.getMessage());
-                        dispatched.addLast(node);
-                    }catch(Exception e){
-                        log.error("Problem processing MessageDispatchNotification: "+mdn,e);
-                    }
-                    break;
+                    createMessageDispatch(node,node.getMessage());
+                    dispatched.addLast(node);
+                    return;
                 }
             }
+            throw new JMSException("Slave broker out of sync with master: Dispatched message
("+mdn.getMessageId()+") was not in the pending list: "+pending);
         }
     }
 
@@ -178,7 +174,12 @@
             }
             throw new JMSException("Could not correlate acknowledgment with dispatched message:
"+ack);
         }
-        throw new JMSException("Invalid acknowledgment: "+ack);
+        
+        if( isSlaveBroker() ) {
+        	throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+")
was not in the dispatch list: "+dispatched);
+        } else {
+        	throw new JMSException("Invalid acknowledgment: "+ack);
+        }
     }
 
     /**
@@ -201,8 +202,12 @@
         }
     }
 
+    /**
+     * Used to determine if the broker can dispatch to the consumer.
+     * @return
+     */
     protected boolean isFull(){
-        return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
+        return isSlaveBroker() || dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
     
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=398015&r1=398014&r2=398015&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Fri Apr 28 14:24:06 2006
@@ -95,8 +95,9 @@
     /**
      * Used by a Slave Broker to update dispatch infomation
      * @param mdn
+     * @throws Exception 
      */
-    void processMessageDispatchNotification(MessageDispatchNotification  mdn);
+    void processMessageDispatchNotification(MessageDispatchNotification  mdn) throws Exception;
     
     /**
      * @return true if the broker is currently in slave mode



Mime
View raw message