activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r474475 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ cursors/ policy/
Date Mon, 13 Nov 2006 20:06:50 GMT
Author: rajdavies
Date: Mon Nov 13 12:06:49 2006
New Revision: 474475

URL: http://svn.apache.org/viewvc?view=rev&rev=474475
Log:
Split the messages held internally by a Queue destination into two sets: messages that are
accessed through the PendingMessageCursor, and work-in-progress messages (pagedInMessages),
which are instances of QueueMessageReference. This keeps the semantics for Message Groups etc.
and leaves the majority of the 'fiddly bits' intact.


Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Nov 13 12:06:49 2006
@@ -58,7 +58,7 @@
     synchronized public void gc() {
     }
 
-    synchronized public void add(ConnectionContext context, Destination destination) throws Exception {
+    public void add(ConnectionContext context, Destination destination) throws Exception {
         super.add(context, destination);
         destinations.put(destination.getActiveMQDestination(), destination);
         if( active || keepDurableSubsActive ) {
@@ -68,7 +68,7 @@
         dispatchMatched();
     }
    
-    synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
+    public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
         if( !active ) {
             this.active = true;
             this.context = context;
@@ -79,38 +79,43 @@
                     topic.activate(context, this);
                 }
             }
-            pending.start();
+            synchronized(pending) {
+                pending.start();
+            }
             dispatchMatched();
         }
     }
 
     synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {        
         active=false;
-        pending.stop();
+        synchronized(pending){
+            pending.stop();
+        }
         if( !keepDurableSubsActive ) {
             for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
                 Topic topic = (Topic) iter.next();
                 topic.deactivate(context, this);
             }
         }
-        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-
-            // Mark the dispatched messages as redelivered for next time.
-            MessageReference node = (MessageReference) iter.next();
-            Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
-            if( count !=null ) {
-                redeliveredMessages.put(node.getMessageId(), new Integer(count.intValue()+1));
-            } else {
-                redeliveredMessages.put(node.getMessageId(), new Integer(1));
-            }
-            if( keepDurableSubsActive ) {
-            	synchronized(pending) {
-            		pending.addMessageFirst(node);
-            	}
-            } else {
-                node.decrementReferenceCount();
+        synchronized(dispatched){
+            for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                // Mark the dispatched messages as redelivered for next time.
+                MessageReference node=(MessageReference)iter.next();
+                Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
+                if(count!=null){
+                    redeliveredMessages.put(node.getMessageId(),new Integer(count.intValue()+1));
+                }else{
+                    redeliveredMessages.put(node.getMessageId(),new Integer(1));
+                }
+                if(keepDurableSubsActive){
+                    synchronized(pending){
+                        pending.addMessageFirst(node);
+                    }
+                }else{
+                    node.decrementReferenceCount();
+                }
+                iter.remove();
             }
-            iter.remove();
         }
         
         if( !keepDurableSubsActive ) {
@@ -135,7 +140,7 @@
         return md;
     }
 
-    synchronized public void add(MessageReference node) throws Exception {
+    public void add(MessageReference node) throws Exception {
         if( !active && !keepDurableSubsActive ) {
             return;
         }
@@ -190,7 +195,7 @@
     /**
      * Release any references that we are holding.
      */
-    synchronized public void destroy() {
+    public void destroy() {
     	synchronized(pending) {
             pending.reset();
 	        while(pending.hasNext()) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Nov 13 12:06:49 2006
@@ -77,7 +77,7 @@
     /**
      * Allows a message to be pulled on demand by a client
      */
-    synchronized public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
     	// The slave should not deliver pull messages.  TODO: when the slave becomes a master,
     	// He should send a NULL message to all the consumers to 'wake them up' in case 
     	// they were waiting for a message.
@@ -111,7 +111,7 @@
      * Occurs when a pull times out.  If nothing has been dispatched
      * since the timeout was setup, then send the NULL message.
      */
-    synchronized private void pullTimeout(long dispatchCounterBeforePull) {    	
+    private void pullTimeout(long dispatchCounterBeforePull) {    	
     	if( dispatchCounterBeforePull == dispatchCounter ) {
         	try {
 				add(QueueMessageReference.NULL_MESSAGE);
@@ -122,15 +122,18 @@
     	}
 	}
         
-    synchronized public void add(MessageReference node) throws Exception{
-        enqueueCounter++;
-      
-        if(!isFull() && pending.isEmpty() ){
+    public void add(MessageReference node) throws Exception{
+        boolean pendingEmpty = false;
+        synchronized(pending){
+            pendingEmpty=pending.isEmpty();
+            enqueueCounter++;
+        }
+        if(!isFull()&&pendingEmpty){
             dispatch(node);
         }else{
             optimizePrefetch();
             synchronized(pending){
-                if( pending.isEmpty() ) {
+                if(log.isDebugEnabled() && pending.isEmpty()){
                     log.debug("Prefetch limit.");
                 }
                 pending.addMessageLast(node);
@@ -138,7 +141,7 @@
         }
     }
 
-    synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
         synchronized(pending){
             pending.reset();
             while(pending.hasNext()){
@@ -154,107 +157,112 @@
         }
     }
 
-    synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
+    public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
         // Handle the standard acknowledgment case.
-        if(ack.isStandardAck()){
-            // Acknowledge all dispatched messages up till the message id of the acknowledgment.
-            int index=0;
-            boolean inAckRange=false;
-            for(Iterator iter=dispatched.iterator();iter.hasNext();){
-                final MessageReference node=(MessageReference) iter.next();
-                MessageId messageId=node.getMessageId();
-                if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
-                    inAckRange=true;
-                }
-                if(inAckRange){
-                    // Don't remove the nodes until we are committed.
-                    if(!context.isInTransaction()){
-                    	dequeueCounter++;
-                    	node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                        iter.remove();
-                    }else{
-                        // setup a Synchronization to remove nodes from the dispatched list.
-                        context.getTransaction().addSynchronization(new Synchronization(){
-                            public void afterCommit() throws Exception{
-                                synchronized(PrefetchSubscription.this){
-                                	dequeueCounter++;
-                                    dispatched.remove(node);
-                                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                                    prefetchExtension--;
+        synchronized(dispatched){
+            if(ack.isStandardAck()){
+                // Acknowledge all dispatched messages up till the message id of the acknowledgment.
+                int index=0;
+                boolean inAckRange=false;
+                for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                    final MessageReference node=(MessageReference)iter.next();
+                    MessageId messageId=node.getMessageId();
+                    if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                        inAckRange=true;
+                    }
+                    if(inAckRange){
+                        // Don't remove the nodes until we are committed.
+                        if(!context.isInTransaction()){
+                            dequeueCounter++;
+                            node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                            iter.remove();
+                        }else{
+                            // setup a Synchronization to remove nodes from the dispatched list.
+                            context.getTransaction().addSynchronization(new Synchronization(){
+
+                                public void afterCommit() throws Exception{
+                                    synchronized(PrefetchSubscription.this){
+                                        dequeueCounter++;
+                                        dispatched.remove(node);
+                                        node.getRegionDestination().getDestinationStatistics().getDequeues()
+                                                .increment();
+                                        prefetchExtension--;
+                                    }
                                 }
-                            }
 
-                            public void afterRollback() throws Exception {
-                            	super.afterRollback();
-                            }
-                        });
-                    }
-                    index++;
-                    acknowledge(context,ack,node);
-                    if(ack.getLastMessageId().equals(messageId)){
-                        if(context.isInTransaction()) {
-                            // extend prefetch window only if not a pulling consumer
-                            if (getPrefetchSize() != 0) {
-                            prefetchExtension=Math.max(prefetchExtension,index+1);
-                            }
+                                public void afterRollback() throws Exception{
+                                    super.afterRollback();
+                                }
+                            });
                         }
-                        else {
-                            prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                        index++;
+                        acknowledge(context,ack,node);
+                        if(ack.getLastMessageId().equals(messageId)){
+                            if(context.isInTransaction()){
+                                // extend prefetch window only if not a pulling consumer
+                                if(getPrefetchSize()!=0){
+                                    prefetchExtension=Math.max(prefetchExtension,index+1);
+                                }
+                            }else{
+                                prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                            }
+                            dispatchMatched();
+                            return;
                         }
-                        dispatchMatched();
-                        return;
                     }
                 }
-            }
-            log.info("Could not correlate acknowledgment with dispatched message: "+ack);
-        }else if(ack.isDeliveredAck()){
-            // Message was delivered but not acknowledged: update pre-fetch counters.
-            // Acknowledge all dispatched messages up till the message id of the acknowledgment.
-            int index=0;
-            for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
-                final MessageReference node=(MessageReference) iter.next();
-                if(ack.getLastMessageId().equals(node.getMessageId())){
-                    prefetchExtension=Math.max(prefetchExtension,index+1);
-                    dispatchMatched();
-                    return;
-                }
-            }
-            throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
-        }else if(ack.isPoisonAck()){
-            // TODO: what if the message is already in a DLQ???
-            // Handle the poison ACK case: we need to send the message to a DLQ
-            if(ack.isInTransaction())
-                throw new JMSException("Poison ack cannot be transacted: "+ack);
-            // Acknowledge all dispatched messages up till the message id of the acknowledgment.
-            int index=0;
-            boolean inAckRange=false;
-            for(Iterator iter=dispatched.iterator();iter.hasNext();){
-                final MessageReference node=(MessageReference) iter.next();
-                MessageId messageId=node.getMessageId();
-                if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
-                    inAckRange=true;
-                }
-                if(inAckRange){
-                    sendToDLQ(context, node);
-                    node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                    iter.remove();
-                    dequeueCounter++;
-                    index++;
-                    acknowledge(context,ack,node);
-                    if(ack.getLastMessageId().equals(messageId)){
-                        prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                //this only happens after a reconnect - get an ack which is not valid
+                log.info("Could not correlate acknowledgment with dispatched message: "+ack);
+            }else if(ack.isDeliveredAck()){
+                // Message was delivered but not acknowledged: update pre-fetch counters.
+                // Acknowledge all dispatched messages up till the message id of the acknowledgment.
+                int index=0;
+                for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
+                    final MessageReference node=(MessageReference)iter.next();
+                    if(ack.getLastMessageId().equals(node.getMessageId())){
+                        prefetchExtension=Math.max(prefetchExtension,index+1);
                         dispatchMatched();
                         return;
                     }
                 }
+                throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
+            }else if(ack.isPoisonAck()){
+                // TODO: what if the message is already in a DLQ???
+                // Handle the poison ACK case: we need to send the message to a DLQ
+                if(ack.isInTransaction())
+                    throw new JMSException("Poison ack cannot be transacted: "+ack);
+                // Acknowledge all dispatched messages up till the message id of the acknowledgment.
+                int index=0;
+                boolean inAckRange=false;
+                for(Iterator iter=dispatched.iterator();iter.hasNext();){
+                    final MessageReference node=(MessageReference)iter.next();
+                    MessageId messageId=node.getMessageId();
+                    if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+                        inAckRange=true;
+                    }
+                    if(inAckRange){
+                        sendToDLQ(context,node);
+                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                        iter.remove();
+                        dequeueCounter++;
+                        index++;
+                        acknowledge(context,ack,node);
+                        if(ack.getLastMessageId().equals(messageId)){
+                            prefetchExtension=Math.max(0,prefetchExtension-(index+1));
+                            dispatchMatched();
+                            return;
+                        }
+                    }
+                }
+                
+                throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
+            }
+            if(isSlaveBroker()){
+                throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack
+                        +") was not in the dispatch list: "+dispatched);
+            }else{
+                log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
             }
-            throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
-        }
-        
-        if( isSlaveBroker() ) {
-        	throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched);
-        } else {
-        	log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
         }
     }
 
@@ -306,8 +314,10 @@
     	}
     }
     
-    synchronized public int getDispatchedQueueSize(){
-        return dispatched.size();
+    public int getDispatchedQueueSize(){
+        synchronized(dispatched){
+            return dispatched.size();
+        }
     }
     
     synchronized public long getDequeueCounter(){
@@ -359,17 +369,19 @@
 
 
     protected void dispatchMatched() throws IOException{
-        if(!dispatching){
-            dispatching=true;
-            try{
-                pending.reset();
-                while(pending.hasNext()&&!isFull()){
-                    MessageReference node=pending.next();
-                    pending.remove();
-                    dispatch(node);
+        synchronized(pending){
+            if(!dispatching){
+                dispatching=true;
+                try{
+                    pending.reset();
+                    while(pending.hasNext()&&!isFull()){
+                        MessageReference node=pending.next();
+                        pending.remove();
+                        dispatch(node);
+                    }
+                }finally{
+                    dispatching=false;
                 }
-            }finally{
-                dispatching=false;
             }
         }
     }
@@ -379,38 +391,39 @@
         if(message==null){
             return false;
         }
-        // Make sure we can dispatch a message.
-        if(canDispatch(node)&&!isSlaveBroker()){
-        	
-            MessageDispatch md=createMessageDispatch(node,message);
-            // NULL messages don't count... they don't get Acked.
-            if( node != QueueMessageReference.NULL_MESSAGE ) {
-            dispatchCounter++;
-            dispatched.addLast(node);            
-            } else {
-            	prefetchExtension=Math.max(0,prefetchExtension-1);
-            }
-            
-            if(info.isDispatchAsync()){
-                md.setConsumer(new Runnable(){
-                    public void run(){
-                        // Since the message gets queued up in async dispatch, we don't want to
-                        // decrease the reference count until it gets put on the wire.
-                        onDispatch(node,message);
-                    }
-                });
-                context.getConnection().dispatchAsync(md);
+        synchronized(dispatched){
+            // Make sure we can dispatch a message.
+            if(canDispatch(node)&&!isSlaveBroker()){
+                MessageDispatch md=createMessageDispatch(node,message);
+                // NULL messages don't count... they don't get Acked.
+                if(node!=QueueMessageReference.NULL_MESSAGE){
+                    dispatchCounter++;
+                    dispatched.addLast(node);
+                }else{
+                    prefetchExtension=Math.max(0,prefetchExtension-1);
+                }
+                if(info.isDispatchAsync()){
+                    md.setConsumer(new Runnable(){
+
+                        public void run(){
+                            // Since the message gets queued up in async dispatch, we don't want to
+                            // decrease the reference count until it gets put on the wire.
+                            onDispatch(node,message);
+                        }
+                    });
+                    context.getConnection().dispatchAsync(md);
+                }else{
+                    context.getConnection().dispatchSync(md);
+                    onDispatch(node,message);
+                }
+                return true;
             }else{
-                context.getConnection().dispatchSync(md);
-                onDispatch(node,message);
+                return false;
             }
-            return true;
-        } else {
-            return false;
         }
     }
 
-    synchronized protected void onDispatch(final MessageReference node,final Message message){
+    protected void onDispatch(final MessageReference node,final Message message){
         if(node.getRegionDestination()!=null){
         	if( node != QueueMessageReference.NULL_MESSAGE ) {
             node.getRegionDestination().getDestinationStatistics().getDispatched().increment();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Nov 13 12:06:49 2006
@@ -42,6 +42,8 @@
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
@@ -64,27 +66,32 @@
  * 
  * @version $Revision: 1.28 $
  */
-public class Queue implements Destination {
+public class Queue implements Destination, Task {
 
     private final Log log;
 
-    protected final ActiveMQDestination destination;
-    protected final List consumers = new CopyOnWriteArrayList();
-    protected final Valve dispatchValve = new Valve(true);
-    protected final UsageManager usageManager;
-    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
-    protected  PendingMessageCursor messages = new VMPendingMessageCursor();
+    private final ActiveMQDestination destination;
+    private final List consumers = new CopyOnWriteArrayList();
+    private final Valve dispatchValve = new Valve(true);
+    private final UsageManager usageManager;
+    private final DestinationStatistics destinationStatistics = new DestinationStatistics();
+    private  PendingMessageCursor messages = new VMPendingMessageCursor();
+    private final LinkedList pagedInMessages = new LinkedList();
 
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
 
-    protected long garbageSize = 0;
-    protected long garbageSizeBeforeCollection = 1000;
+    private int garbageSize = 0;
+    private int garbageSizeBeforeCollection = 1000;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
-    protected final MessageStore store;
-    protected int highestSubscriptionPriority = Integer.MIN_VALUE;
+    private final MessageStore store;
+    private int highestSubscriptionPriority = Integer.MIN_VALUE;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
+    private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
+    private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
+    private final Object exclusiveLockMutex = new Object();
+    private TaskRunner taskRunner;
 
     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory) throws Exception {
@@ -92,6 +99,7 @@
         this.usageManager = new UsageManager(memoryManager);
         this.usageManager.setLimit(Long.MAX_VALUE);
         this.store = store;
+        this.taskRunner = taskFactory.createTaskRunner(this, "Queue  "+destination.getPhysicalName());
 
         // Let the store know what usage manager we are using so that he can
         // flush messages to disk
@@ -106,43 +114,57 @@
         
     }
     
-    public void initialize() throws Exception {
-        if (store != null) {
+    public void initialize() throws Exception{
+        if(store!=null){
             // Restore the persistent messages.
-            store.recover(new MessageRecoveryListener() {
-                public void recoverMessage(Message message) {
-                    message.setRegionDestination(Queue.this);
-                    MessageReference reference = createMessageReference(message);
-                    synchronized (messages) {
-                        try{
-                            messages.addMessageLast(reference);
-                        }catch(Exception e){
-                           log.fatal("Failed to add message to cursor",e);
+            messages.start();
+            if(messages.isRecoveryRequired()){
+                store.recover(new MessageRecoveryListener(){
+
+                    public void recoverMessage(Message message){
+                        message.setRegionDestination(Queue.this);
+                        synchronized(messages){
+                            try{
+                                messages.addMessageLast(message);
+                            }catch(Exception e){
+                                log.fatal("Failed to add message to cursor",e);
+                            }
                         }
+                        destinationStatistics.getMessages().increment();
                     }
-                    reference.decrementReferenceCount();
-                    destinationStatistics.getMessages().increment();
-                }
 
-                public void recoverMessageReference(String messageReference) throws Exception {
-                    throw new RuntimeException("Should not be called.");
-                }
+                    public void recoverMessageReference(String messageReference) throws Exception{
+                        throw new RuntimeException("Should not be called.");
+                    }
 
-                public void finished() {
-                }
-            });
+                    public void finished(){
+                    }
+                });
+            }
         }
     }
 
-    public synchronized boolean lock(MessageReference node, LockOwner lockOwner) {
-        if (exclusiveOwner == lockOwner)
-            return true;
-        if (exclusiveOwner != null)
-            return false;
-        if (lockOwner.getLockPriority() < highestSubscriptionPriority)
-            return false;
-        if (lockOwner.isLockExclusive()) {
-            exclusiveOwner = lockOwner;
+    /**
+     * Lock a node
+     * @param node
+     * @param lockOwner
+     * @return true if can be locked
+     * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.broker.region.LockOwner)
+     */
+    public boolean lock(MessageReference node,LockOwner lockOwner){
+        synchronized(exclusiveLockMutex){
+            if(exclusiveOwner==lockOwner){
+                return true;
+            }
+            if(exclusiveOwner!=null){
+                return false;
+            }
+            if(lockOwner.getLockPriority()<highestSubscriptionPriority){
+                return false;
+            }
+            if(lockOwner.isLockExclusive()){
+                exclusiveOwner=lockOwner;
+            }
         }
         return true;
     }
@@ -150,16 +172,13 @@
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         sub.add(context, this);
         destinationStatistics.getConsumers().increment();
+        maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
 
-        // synchronize with dispatch method so that no new messages are sent
-        // while
-        // setting up a subscription. avoid out of order messages, duplicates
-        // etc.
-        dispatchValve.turnOff();
-
-        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-        try {
-            synchronized (consumers) {
+        
+        
+        MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
+        try{
+            synchronized(consumers){
                 if (sub.getConsumerInfo().isExclusive()) {
                     // Add to front of list to ensure that an exclusive consumer gets all messages
                     // before non-exclusive consumers
@@ -167,40 +186,37 @@
                 } else {
                     consumers.add(sub);
                 }
-
-                if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) {
-                    highestSubscriptionPriority = sub.getConsumerInfo().getPriority();
-                }
             }
-
-            //highestSubscriptionPriority = calcHighestSubscriptionPriority();
+            // page in messages
+            doPageIn();
+            // synchronize with dispatch method so that no new messages are sent
+            // while
+            // setting up a subscription. avoid out of order messages, duplicates
+            // etc.
+            dispatchValve.turnOff();
+            if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) {
+                highestSubscriptionPriority = sub.getConsumerInfo().getPriority();
+            }
             msgContext.setDestination(destination);
-
-            synchronized (messages) {
+            synchronized(pagedInMessages){
                 // Add all the matching messages in the queue to the
                 // subscription.
-                messages.reset();
-                while(messages.hasNext()) {
-
-                    QueueMessageReference node = (QueueMessageReference) messages.next();
-                    if (node.isDropped()) {
+                for(Iterator i=pagedInMessages.iterator();i.hasNext();){
+                    QueueMessageReference node=(QueueMessageReference)i.next();
+                    if(node.isDropped()){
                         continue;
                     }
-
-                    try {
+                    try{
                         msgContext.setMessageReference(node);
-                        if (sub.matches(node, msgContext)) {
+                        if(sub.matches(node,msgContext)){
                             sub.add(node);
                         }
-                    }
-                    catch (IOException e) {
-                        log.warn("Could not load message: " + e, e);
+                    }catch(IOException e){
+                        log.warn("Could not load message: "+e,e);
                     }
                 }
             }
-
-        }
-        finally {
+        }finally{
             msgContext.clear();
             dispatchValve.turnOn();
         }
@@ -209,6 +225,7 @@
     public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
 
         destinationStatistics.getConsumers().decrement();
+        maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
 
         // synchronize with dispatch method so that no new messages are sent
         // while
@@ -240,10 +257,9 @@
 
                     // lets copy the messages to dispatch to avoid deadlock
                     List messagesToDispatch = new ArrayList();
-                    synchronized (messages) {
-                        messages.reset();
-                        while(messages.hasNext()) {
-                            QueueMessageReference node = (QueueMessageReference) messages.next();
+                    synchronized (pagedInMessages) {
+                        for(Iterator i =  pagedInMessages.iterator();i.hasNext();) {
+                            QueueMessageReference node = (QueueMessageReference) i.next();
                             if (node.isDropped()) {
                                 continue;
                             }
@@ -264,7 +280,7 @@
                         node.incrementRedeliveryCounter();
                         node.unlock();
                         msgContext.setMessageReference(node);
-                        dispatchPolicy.dispatch(context, node, msgContext, consumers);
+                        dispatchPolicy.dispatch(node, msgContext, consumers);
                     }
                 }
                 finally {
@@ -278,40 +294,31 @@
 
     }
 
-    public void send(final ConnectionContext context, final Message message) throws Exception {
-
-        if (context.isProducerFlowControl()) {
-            if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
+    public void send(final ConnectionContext context,final Message message) throws Exception{
+        if(context.isProducerFlowControl()){
+            if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
-            }
-            else {
+            }else{
                 usageManager.waitForSpace();
             }
         }
-
         message.setRegionDestination(this);
-
-        if (store != null && message.isPersistent())
+        if (store != null && message.isPersistent()) {
             store.addMessage(context, message);
-
-        final MessageReference node = createMessageReference(message);
-        try {
-
-            if (context.isInTransaction()) {
-                context.getTransaction().addSynchronization(new Synchronization() {
-                    public void afterCommit() throws Exception {
-                        dispatch(context, node, message);
-                    }
-                });
-            }
-            else {
-                dispatch(context, node, message);
-            }
         }
-        finally {
-            node.decrementReferenceCount();
+        if(context.isInTransaction()){
+            context.getTransaction().addSynchronization(new Synchronization(){
+
+                public void afterCommit() throws Exception{
+                    sendMessage(context,message);
+                }
+            });
+        }else{
+            sendMessage(context,message);
         }
     }
+       
+    
 
     public void dispose(ConnectionContext context) throws IOException {
         if (store != null) {
@@ -324,30 +331,34 @@
         dropEvent(false);
     }
 
-    public void dropEvent(boolean skipGc) {
+    public void dropEvent(boolean skipGc){
         // TODO: need to also decrement when messages expire.
         destinationStatistics.getMessages().decrement();
-        synchronized (messages) {
+        synchronized(pagedInMessages){
             garbageSize++;
-            if (!skipGc && garbageSize > garbageSizeBeforeCollection) {
-                gc();
-            }
+        }
+        if(!skipGc&&garbageSize>garbageSizeBeforeCollection){
+            gc();
         }
     }
 
     public void gc() {
-        synchronized (messages) {
-            messages.resetForGC();
-            while(messages.hasNext()) {
+        synchronized (pagedInMessages) {
+            for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
                 // Remove dropped messages from the queue.
-                QueueMessageReference node = (QueueMessageReference) messages.next();
+                QueueMessageReference node = (QueueMessageReference) i.next();
                 if (node.isDropped()) {
                     garbageSize--;
-                    messages.remove();
+                    i.remove();
                     continue;
                 }
             }
         }
+        try{
+            taskRunner.wakeup();
+        }catch(InterruptedException e){
+            log.warn("Task Runner failed to wakeup ",e);
+        }
     }
 
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
@@ -390,6 +401,9 @@
     }
 
     public void stop() throws Exception {
+        if( taskRunner!=null ) {
+            taskRunner.shutdown();
+        }
     }
 
     // Properties
@@ -455,38 +469,12 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     private MessageReference createMessageReference(Message message) {
-        return new IndirectMessageReference(this, store, message);
-    }
-
-    private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Exception {
-
-        dispatchValve.increment();
-        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-        try {
-            destinationStatistics.getEnqueues().increment();
-	    destinationStatistics.getMessages().increment();
-            synchronized (messages) {
-                messages.addMessageLast(node);
-            }
-
-            synchronized (consumers) {
-                if (consumers.isEmpty()) {
-                    log.debug("No subscriptions registered, will not dispatch message at this time.");
-                    return;
-                }
-            }
-
-            msgContext.setDestination(destination);
-            msgContext.setMessageReference(node);
-
-            dispatchPolicy.dispatch(context, node, msgContext, consumers);
-        }
-        finally {
-            msgContext.clear();
-            dispatchValve.decrement();
-        }
+        MessageReference result =  new IndirectMessageReference(this, store, message);
+        result.decrementReferenceCount();
+        return result;
     }
 
+    
     private int calcHighestSubscriptionPriority() {
         int rc = Integer.MIN_VALUE;
         synchronized (consumers) {
@@ -506,11 +494,28 @@
 
     public Message[] browse() {
         ArrayList l = new ArrayList();
+        synchronized(pagedInMessages) {
+            for (Iterator i = pagedInMessages.iterator();i.hasNext();) {
+                MessageReference r = (MessageReference)i.next();
+                r.incrementReferenceCount();
+                try {
+                    Message m = r.getMessage();
+                    if (m != null) {
+                        l.add(m);
+                    }
+                }catch(IOException e){
+                    log.error("caught an exception brwsing " + this,e);
+                }
+                finally {
+                    r.decrementReferenceCount();
+                }
+            }
+        }
         synchronized (messages) {
             messages.reset();
             while(messages.hasNext()) {
                 try {
-                    MessageReference r = (MessageReference) messages.next();
+                    MessageReference r = messages.next();
                     r.incrementReferenceCount();
                     try {
                         Message m = r.getMessage();
@@ -523,6 +528,7 @@
                     }
                 }
                 catch (IOException e) {
+                    log.error("caught an exception brwsing " + this,e);
                 }
             }
         }
@@ -535,7 +541,7 @@
             messages.reset();
             while(messages.hasNext()) {
                 try {
-                    MessageReference r = (MessageReference) messages.next();
+                    MessageReference r = messages.next();
                     if (messageId.equals(r.getMessageId().toString())) {
                         r.incrementReferenceCount();
                         try {
@@ -551,19 +557,22 @@
                     }
                 }
                 catch (IOException e) {
+                    log.error("got an exception retrieving message " + messageId);
                 }
             }
         }
         return null;
     }
 
-    public void purge() {
-        synchronized (messages) {
+    public void purge() throws Exception {
+        
+            doDispatch(doPageIn());
+        
+        synchronized (pagedInMessages) {
             ConnectionContext c = createConnectionContext();
-            messages.reset();
-            while(messages.hasNext()) {
+            for(Iterator i = pagedInMessages.iterator(); i.hasNext();){
                 try {
-                    QueueMessageReference r = (QueueMessageReference) messages.next();
+                    QueueMessageReference r = (QueueMessageReference) i.next();
 
                     // We should only delete messages that can be locked.
                     if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@@ -618,20 +627,18 @@
      * @return the number of messages removed
      */
     public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
+        doDispatch(doPageIn());
         int counter = 0;
-        synchronized (messages) {
+        synchronized (pagedInMessages) {
             ConnectionContext c = createConnectionContext();
-            messages.reset();
-            while(messages.hasNext()) {
-                IndirectMessageReference r = (IndirectMessageReference) messages.next();
+           for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+               IndirectMessageReference r = (IndirectMessageReference) i.next();
                 if (filter.evaluate(c, r)) {
-                    // We should only delete messages that can be locked.
-                    if (lockMessage(r)) {
-                        removeMessage(c, r);
-                        if (++counter >= maximumMessages && maximumMessages > 0) {
-                            break;
-                        }
+                    removeMessage(c, r);
+                    if (++counter >= maximumMessages && maximumMessages > 0) {
+                        break;
                     }
+                    
                 }
             }
         }
@@ -669,11 +676,11 @@
      * @return the number of messages copied
      */
     public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
+        doDispatch(doPageIn());
         int counter = 0;
-        synchronized (messages) {
-            messages.reset();
-            while(messages.hasNext()) {
-                MessageReference r = (MessageReference) messages.next();
+        synchronized (pagedInMessages) {
+            for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+                MessageReference r = (MessageReference) i.next();
                 if (filter.evaluate(context, r)) {
                     r.incrementReferenceCount();
                     try {
@@ -719,11 +726,11 @@
      * Moves the messages matching the given filter up to the maximum number of matched messages
      */
     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
+        doDispatch(doPageIn());
         int counter = 0;
-        synchronized (messages) {
-            messages.reset();
-            while(messages.hasNext()) {
-                IndirectMessageReference r = (IndirectMessageReference) messages.next();
+        synchronized (pagedInMessages) {
+            for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+                IndirectMessageReference r = (IndirectMessageReference) i.next();
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
                     if (lockMessage(r)) {
@@ -745,6 +752,19 @@
         }
         return counter;
     }
+    
+    /**
+     * @return
+     * @see org.apache.activemq.thread.Task#iterate()
+     */
+    public boolean iterate(){
+        try{
+            doDispatch(doPageIn(false));
+         }catch(Exception e){
+             log.error("Failed to page in more queue messages ",e);
+         }
+        return false;
+    }
 
     protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
         return new MessageReferenceFilter() {
@@ -771,6 +791,7 @@
         };
     }
 
+        
     protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException {
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@@ -790,4 +811,65 @@
         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
         return answer;
     }
+    
+    private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
+        
+        synchronized(messages){
+            messages.addMessageLast(msg);
+        }
+        destinationStatistics.getEnqueues().increment();
+        destinationStatistics.getMessages().increment();
+        doDispatch(doPageIn(false));
+    }
+    
+    private List doPageIn() throws Exception{
+        return doPageIn(true);
+    }
+    private List doPageIn(boolean force) throws Exception{
+        final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
+        List result=null;
+        if((force || !consumers.isEmpty())&&toPageIn>0){
+            try{
+                dispatchValve.increment();
+                int count=0;
+                result=new ArrayList(toPageIn);
+                synchronized(messages){
+                    messages.reset();
+                    while(messages.hasNext()&&count<toPageIn){
+                        MessageReference node=messages.next();
+                        messages.remove();
+                        node=createMessageReference(node.getMessage());
+                        result.add(node);
+                        count++;
+                    }
+                }
+                synchronized(pagedInMessages){
+                    pagedInMessages.addAll(result);
+                }
+            }finally{
+                queueMsgConext.clear();
+                dispatchValve.decrement();
+            }
+        }
+        return result;
+    }
+
+    private void doDispatch(List list) throws Exception{
+        if(list!=null&&!list.isEmpty()){
+            try{
+                dispatchValve.increment();
+                for(int i=0;i<list.size();i++){
+                    MessageReference node=(MessageReference)list.get(i);
+                    queueMsgConext.setDestination(destination);
+                    queueMsgConext.setMessageReference(node);
+                    dispatchPolicy.dispatch(node,queueMsgConext,consumers);
+                }
+            }finally{
+                queueMsgConext.clear();
+                dispatchValve.decrement();
+            }
+        }
+    }
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Nov 13 12:06:49 2006
@@ -410,7 +410,7 @@
             msgContext.setDestination(destination);
             msgContext.setMessageReference(message);
 
-            if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) {
+            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                 onMessageWithNoConsumers(context, message);
             }
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Nov 13 12:06:49 2006
@@ -92,4 +92,11 @@
     public void resetForGC(){
         reset();
     }
+
+    /**
+     * @param node
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
+     */
+    public void remove(MessageReference node){
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Mon Nov 13 12:06:49 2006
@@ -117,7 +117,10 @@
     public void remove(){
         iter.remove();
     }
-
+    
+    public void remove(MessageReference node){
+        list.remove(node);
+    }
     /**
      * @return the number of pending messages
      */

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Nov 13 12:06:49 2006
@@ -119,4 +119,10 @@
      * messages from memory only
      */
     public void resetForGC();
+    
+    /**
+     * remove a node
+     * @param node
+     */
+    public void remove(MessageReference node);
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Nov 13 12:06:49 2006
@@ -181,6 +181,13 @@
         }
         pendingCount--;
     }
+    
+    public void remove(MessageReference node){
+        if(currentCursor!=null){
+            currentCursor.remove(node);
+        }
+        pendingCount--;
+    }
 
     public synchronized void reset(){
         for(Iterator i=storePrefetches.iterator();i.hasNext();){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Mon Nov 13 12:06:49 2006
@@ -93,4 +93,19 @@
     public void clear(){
         list.clear();
     }
+    
+    public void remove(MessageReference node) {
+        boolean matched = false;
+        int size = list.size();
+        for (Iterator i = list.iterator();i.hasNext();) {
+            MessageReference ref = (MessageReference)i.next();
+                System.err.println("MATCHIG " + node.getMessageId() + " AGAINST " + ref.getMessageId());
+                if(node.getMessageId().equals(ref.getMessageId())){
+                    i.remove();
+                    matched = true;
+                    break;
+                }
+        }
+        System.err.println("REMOVED " + node.getMessageId() + "  = " + matched + " presize = " + size + " size now = " + list.size());
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java Mon Nov 13 12:06:49 2006
@@ -45,6 +45,6 @@
      * 
      * @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched
      */
-    boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception;
+    boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java Mon Nov 13 12:06:49 2006
@@ -18,13 +18,13 @@
 package org.apache.activemq.broker.region.policy;
 
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.filter.MessageEvaluationContext;
-
-import java.util.Iterator;
-import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Simple dispatch policy that sends a message to every subscription that 
@@ -35,8 +35,17 @@
  * @version $Revision$
  */
 public class RoundRobinDispatchPolicy implements DispatchPolicy {
-
-    public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
+    static final Log log=LogFactory.getLog(RoundRobinDispatchPolicy.class);
+    
+    /**
+     * @param node
+     * @param msgContext
+     * @param consumers
+     * @return true if dispatched
+     * @throws Exception
+     * @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
+     */
+    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
         
         // Big synch here so that only 1 message gets dispatched at a time.  Ensures 
         // Everyone sees the same order and that the consumer list is not used while
@@ -59,6 +68,7 @@
             try {
                 consumers.add(consumers.remove(0));
             } catch (Throwable bestEffort) {
+                log.error("Caught error rotating consumers");
             }
             return count > 0;
         }        

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java Mon Nov 13 12:06:49 2006
@@ -18,14 +18,12 @@
 package org.apache.activemq.broker.region.policy;
 
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * Simple dispatch policy that sends a message to every subscription that 
  * matches the message.
@@ -36,7 +34,7 @@
  */
 public class SimpleDispatchPolicy implements DispatchPolicy {
 
-    public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
+    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
         int count = 0;
         for (Iterator iter = consumers.iterator(); iter.hasNext();) {
             Subscription sub = (Subscription) iter.next();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java?view=diff&rev=474475&r1=474474&r2=474475
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java Mon Nov 13 12:06:49 2006
@@ -18,14 +18,12 @@
 package org.apache.activemq.broker.region.policy;
 
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.Iterator;
+import java.util.List;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
-import java.util.Iterator;
-import java.util.List;
-
 /**
  * Dispatch policy that causes every subscription to see messages in the same order.
  * 
@@ -35,7 +33,15 @@
  */
 public class StrictOrderDispatchPolicy implements DispatchPolicy {
     
-    public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
+    /**
+     * @param node
+     * @param msgContext
+     * @param consumers
+     * @return true if dispatched
+     * @throws Exception
+     * @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
+     */
+    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
         // Big synch here so that only 1 message gets dispatched at a time.  Ensures 
         // Everyone sees the same order.
         synchronized(consumers) {



Mime
View raw message