activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r628667 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ policy/
Date Mon, 18 Feb 2008 09:38:17 GMT
Author: rajdavies
Date: Mon Feb 18 01:38:10 2008
New Revision: 628667

URL: http://svn.apache.org/viewvc?rev=628667&view=rev
Log:
Change the Queue dispatch model to reduce contention when there are
many consumers

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Mon Feb 18 01:38:10 2008
@@ -179,6 +179,17 @@
     public ActiveMQDestination getActiveMQDestination() {
         return info != null ? info.getDestination() : null;
     }
+    
+    public boolean isBrowser() {
+        return info != null && info.isBrowser();
+    }
+    
+    public int getInFlightUsage() {
+        if (info.getPrefetchSize() > 0) {
+        return (getInFlightSize() * 100)/info.getPrefetchSize();
+        }
+        return Integer.MAX_VALUE;
+    }
 
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         add(message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Feb 18 01:38:10 2008
@@ -28,7 +28,6 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 
 /**
  * @version $Revision: 1.12 $
@@ -44,8 +43,6 @@
     void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
 
     void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
-
-    boolean lock(MessageReference node, LockOwner lockOwner);
 
     void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Feb 18 01:38:10 2008
@@ -85,10 +85,6 @@
         return next.getMemoryUsage();
     }
 
-    public boolean lock(MessageReference node, LockOwner lockOwner) {
-        return next.lock(node, lockOwner);
-    }
-
     public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
         next.removeSubscription(context, sub);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Mon Feb 18 01:38:10 2008
@@ -36,6 +36,7 @@
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
+    protected CountStatisticImpl inflight;
     protected TimeStatisticImpl processTime;
 
     public DestinationStatistics() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Feb 18 01:38:10 2008
@@ -72,7 +72,7 @@
         return active;
     }
 
-    protected boolean isFull() {
+    public boolean isFull() {
         return !active || super.isFull();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Mon Feb 18 01:38:10 2008
@@ -140,9 +140,6 @@
     }
 
     public boolean lock(LockOwner subscription) {
-        if (!regionDestination.lock(this, subscription)) {
-            return false;
-        }
         synchronized (this) {
             if (dropped || (lockOwner != null && lockOwner != subscription)) {
                 return false;
@@ -152,8 +149,10 @@
         }
     }
 
-    public synchronized void unlock() {
+    public synchronized boolean unlock() {
+        boolean result = lockOwner != null;
         lockOwner = null;
+        return result;
     }
 
     public synchronized LockOwner getLockOwner() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Mon Feb 18 01:38:10 2008
@@ -44,7 +44,7 @@
     }
 
     public boolean isDropped() {
-        throw new RuntimeException("not implemented");
+        return false;
     }
 
     public boolean lock(LockOwner subscription) {
@@ -55,7 +55,8 @@
         throw new RuntimeException("not implemented");
     }
 
-    public void unlock() {
+    public boolean unlock() {
+        return true;
     }
 
     public int decrementReferenceCount() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Feb 18 01:38:10 2008
@@ -360,13 +360,17 @@
     protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
         broker.sendToDeadLetterQueue(context, node);
     }
-
+    
+    public int getInFlightSize() {
+        return dispatched.size();
+    }
+    
     /**
      * Used to determine if the broker can dispatch to the consumer.
      * 
      * @return
      */
-    protected boolean isFull() {
+    public boolean isFull() {
         return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
     }
 
@@ -603,6 +607,16 @@
 
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
+    }
+    
+    
+    public List<MessageReference> getInFlightMessages(){
+        List<MessageReference> result = new ArrayList<MessageReference>();
+        synchronized(pendingLock) {
+            result.addAll(dispatched);
+            result.addAll(pending.pageInList(1000));
+        }
+        return result;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Feb 18 01:38:10 2008
@@ -22,6 +22,9 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.InvalidSelectorException;
@@ -55,6 +58,7 @@
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.DeterministicTaskRunner;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -75,23 +79,23 @@
     private final List<Subscription> consumers = new ArrayList<Subscription>(50);
     private PendingMessageCursor messages;
     private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>();
-    private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
-    private final Object exclusiveLockMutex = new Object();
     private final Object sendLock = new Object();
+    private final ExecutorService executor;
     private final TaskRunner taskRunner;    
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final ReentrantLock dispatchLock = new ReentrantLock();
+    private QueueDispatchSelector  dispatchSelector;
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             wakeup();
         }
     };
-           
-    public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+               
+    public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) throws Exception {
         super(broker, store, destination,systemUsage, parentStats);
         
@@ -100,8 +104,31 @@
         } else {
             this.messages = new StoreQueueCursor(broker,this);
         }
-        this.taskRunner = taskFactory.createTaskRunner(this, "Queue  " + destination.getPhysicalName());
+       
+        this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "QueueThread:"+destination);
+                thread.setDaemon(true);
+                thread.setPriority(Thread.NORM_PRIORITY);
+                return thread;
+            }
+        });
+           
+        this.taskRunner = new DeterministicTaskRunner(this.executor,this);
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
+        this.dispatchSelector=new QueueDispatchSelector(destination);
+       
+    }
+
+    /**
+     * @param queue
+     * @param string
+     * @param b
+     * @return
+     */
+    private TaskRunner DedicatedTaskRunner(Queue queue, String string, boolean b) {
+        // TODO Auto-generated method stub
+        return null;
     }
 
     public void initialize() throws Exception {
@@ -153,26 +180,7 @@
         }
     }
 
-    /**
-     * Lock a node
-     * 
-     * @param node
-     * @param lockOwner
-     * @return true if can be locked
-     * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
-     *      org.apache.activemq.broker.region.LockOwner)
-     */
-    public boolean lock(MessageReference node, LockOwner lockOwner) {
-        synchronized (exclusiveLockMutex) {
-            if (exclusiveOwner == lockOwner) {
-                return true;
-            }
-            if (exclusiveOwner != null) {
-                return false;
-            }
-        }
-        return true;
-    }
+   
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         dispatchLock.lock();
@@ -185,54 +193,41 @@
             synchronized (consumers) {
                 consumers.add(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
-                    LockOwner owner = (LockOwner) sub;
-                    if (exclusiveOwner == null) {
-                        exclusiveOwner = owner;
-                    } else {
-                        // switch the owner if the priority is higher.
-                        if (owner.getLockPriority() > exclusiveOwner
-                                .getLockPriority()) {
-                            exclusiveOwner = owner;
-                        }
+                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
+                    if(exclusiveConsumer==null) {
+                        exclusiveConsumer=sub;
+                    }else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){
+                        exclusiveConsumer=sub;
                     }
+                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                 }
             }
-
-            // we hold the lock on the dispatchValue - so lets build the paged
-            // in
-            // list directly;
-            doPageIn(false);
-
             // synchronize with dispatch method so that no new messages are sent
             // while
             // setting up a subscription. avoid out of order messages,
             // duplicates
             // etc.
-
+            doPageIn(false);
             msgContext.setDestination(destination);
             synchronized (pagedInMessages) {
                 // Add all the matching messages in the queue to the
                 // subscription.
+                
                 for (Iterator<MessageReference> i = pagedInMessages.values()
                         .iterator(); i.hasNext();) {
                     QueueMessageReference node = (QueueMessageReference) i
                             .next();
-                    if (node.isDropped()
-                            || (!sub.getConsumerInfo().isBrowser() && node
-                                    .getLockOwner() != null)) {
-                        continue;
-                    }
-                    try {
+                    if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
                         msgContext.setMessageReference(node);
                         if (sub.matches(node, msgContext)) {
                             sub.add(node);
                         }
-                    } catch (IOException e) {
-                        log.warn("Could not load message: " + e, e);
                     }
                 }
+                
             }
-        } finally {
+            wakeup();
+        }finally {
             dispatchLock.unlock();
         }
     }
@@ -240,79 +235,60 @@
     public void removeSubscription(ConnectionContext context, Subscription sub)
             throws Exception {
         destinationStatistics.getConsumers().decrement();
-        // synchronize with dispatch method so that no new messages are sent
-        // while
-        // removing up a subscription.
-        synchronized (consumers) {
-            consumers.remove(sub);
-            if (sub.getConsumerInfo().isExclusive()) {
-                LockOwner owner = (LockOwner) sub;
-                // Did we loose the exclusive owner??
-                if (exclusiveOwner == owner) {
-                    // Find the exclusive consumer with the higest Lock
-                    // Priority.
-                    exclusiveOwner = null;
-                    for (Iterator<Subscription> iter = consumers.iterator(); iter
-                            .hasNext();) {
-                        Subscription s = iter.next();
-                        LockOwner so = (LockOwner) s;
-                        if (s.getConsumerInfo().isExclusive()
-                                && (exclusiveOwner == null || so
-                                        .getLockPriority() > exclusiveOwner
-                                        .getLockPriority())) {
-                            exclusiveOwner = so;
+        dispatchLock.lock();
+        try {
+            // synchronize with dispatch method so that no new messages are sent
+            // while
+            // removing up a subscription.
+            synchronized (consumers) {
+                consumers.remove(sub);
+                if (sub.getConsumerInfo().isExclusive()) {
+                    Subscription exclusiveConsumer = dispatchSelector
+                            .getExclusiveConsumer();
+                    if (exclusiveConsumer == sub) {
+                        exclusiveConsumer = null;
+                        for (Subscription s : consumers) {
+                            if (s.getConsumerInfo().isExclusive()
+                                    && (exclusiveConsumer == null
+                                    || s.getConsumerInfo().getPriority() > exclusiveConsumer
+                                            .getConsumerInfo().getPriority())) {
+                                exclusiveConsumer = s;
+
+                            }
                         }
+                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
                     }
                 }
-            }
-            if (consumers.isEmpty()) {
-                messages.gc();
-            }
-        }
-        sub.remove(context, this);
-        boolean wasExclusiveOwner = false;
-        if (exclusiveOwner == sub) {
-            exclusiveOwner = null;
-            wasExclusiveOwner = true;
-        }
-        ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
-        MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
-                consumerId);
-        if (!sub.getConsumerInfo().isBrowser()) {
-            MessageEvaluationContext msgContext = new MessageEvaluationContext();
+                ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
+                MessageGroupSet ownedGroups = getMessageGroupOwners()
+                        .removeConsumer(consumerId);
+                // redeliver inflight messages
+                sub.remove(context, this);
 
-            msgContext.setDestination(destination);
-            // lets copy the messages to dispatch to avoid deadlock
-            List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
-            synchronized (pagedInMessages) {
-                for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
-                        .hasNext();) {
+                List<MessageReference> list = new ArrayList<MessageReference>();
+                for (Iterator<MessageReference> i = pagedInMessages.values()
+                        .iterator(); i.hasNext();) {
                     QueueMessageReference node = (QueueMessageReference) i
                             .next();
-                    if (node.isDropped()) {
-                        continue;
-                    }
-                    String groupID = node.getGroupID();
-                    // Re-deliver all messages that the sub locked
-                    if (node.getLockOwner() == sub
-                            || wasExclusiveOwner
-                            || (groupID != null && ownedGroups
-                                    .contains(groupID))) {
-                        messagesToDispatch.add(node);
+                    if (!node.isDropped() && !node.isAcked()
+                            && node.getLockOwner() == sub) {
+                        if (node.unlock()) {
+                            node.incrementRedeliveryCounter();
+                            list.add(node);
+                        }
                     }
                 }
-            }
-            // now lets dispatch from the copy of the collection to
-            // avoid deadlocks
-            for (Iterator<QueueMessageReference> iter = messagesToDispatch
-                    .iterator(); iter.hasNext();) {
-                QueueMessageReference node = iter.next();
-                node.incrementRedeliveryCounter();
-                node.unlock();
-                msgContext.setMessageReference(node);
-                dispatchPolicy.dispatch(node, msgContext, consumers);
+                if (list != null && !consumers.isEmpty()) {
+                    doDispatch(list);
+                }
             }
 
+            if (consumers.isEmpty()) {
+                messages.gc();
+            }
+            wakeup();
+        }finally {
+            dispatchLock.unlock();
         }
     }
 
@@ -523,6 +499,9 @@
         if (taskRunner != null) {
             taskRunner.shutdown();
         }
+        if (this.executor != null) {
+            this.executor.shutdownNow();
+        }
         if (messages != null) {
             messages.stop();
         }
@@ -677,11 +656,7 @@
             for (MessageReference ref : list) {
                 try {
                     QueueMessageReference r = (QueueMessageReference) ref;
-
-                    // We should only delete messages that can be locked.
-                    if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
                         removeMessage(c,(IndirectMessageReference) r);
-                    }
                 } catch (IOException e) {
                 }
             }
@@ -791,19 +766,16 @@
             for (MessageReference ref : list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
-                    // We should only copy messages that can be locked.
-                    if (lockMessage(r)) {
-                        r.incrementReferenceCount();
-                        try {
-                            Message m = r.getMessage();
-                            BrokerSupport.resend(context, m, dest);
-                            if (++movedCounter >= maximumMessages
-                                    && maximumMessages > 0) {
-                                return movedCounter;
-                            }
-                        } finally {
-                            r.decrementReferenceCount();
+                    r.incrementReferenceCount();
+                    try {
+                        Message m = r.getMessage();
+                        BrokerSupport.resend(context, m, dest);
+                        if (++movedCounter >= maximumMessages
+                                && maximumMessages > 0) {
+                            return movedCounter;
                         }
+                    } finally {
+                        r.decrementReferenceCount();
                     }
                 }
                 count++;
@@ -853,19 +825,17 @@
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
-                    if (lockMessage(r)) {
-                        r.incrementReferenceCount();
-                        try {
-                            Message m = r.getMessage();
-                            BrokerSupport.resend(context, m, dest);
-                            removeMessage(context, r);
-                            if (++movedCounter >= maximumMessages
-                                    && maximumMessages > 0) {
-                                return movedCounter;
-                            }
-                        } finally {
-                            r.decrementReferenceCount();
+                    r.incrementReferenceCount();
+                    try {
+                        Message m = r.getMessage();
+                        BrokerSupport.resend(context, m, dest);
+                        removeMessage(context, r);
+                        if (++movedCounter >= maximumMessages
+                                && maximumMessages > 0) {
+                            return movedCounter;
                         }
+                    } finally {
+                        r.decrementReferenceCount();
                     }
                 }
                 count++;
@@ -885,7 +855,7 @@
         }
         if (result) {
             try {
-                pageInMessages(false);
+               pageInMessages(false);
                
             } catch (Throwable e) {
                 log.error("Failed to page in more queue messages ", e);
@@ -895,7 +865,6 @@
             Runnable op = messagesWaitingForSpace.removeFirst();
             op.run();
         }
-        //must return false  to prevent spinning
         return false;
     }
 
@@ -942,10 +911,7 @@
         wakeup();
     }
 
-    protected boolean lockMessage(IndirectMessageReference r) {
-        return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
-    }
-
+    
     protected ConnectionContext createConnectionContext() {
         ConnectionContext answer = new ConnectionContext();
         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
@@ -972,7 +938,8 @@
     private List<MessageReference> doPageIn(boolean force) throws Exception {
         List<MessageReference> result = null;
         dispatchLock.lock();
-        try {
+        try{
+        
             final int toPageIn = getMaxPageSize() - pagedInMessages.size();
             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
                 messages.setMaxBatchSize(toPageIn);
@@ -1009,16 +976,48 @@
         }
         return result;
     }
-
+    
     private void doDispatch(List<MessageReference> list) throws Exception {
-       
-        if (list != null && !list.isEmpty()) {
-            MessageEvaluationContext msgContext = new MessageEvaluationContext();
-            for (int i = 0; i < list.size(); i++) {
-                MessageReference node = list.get(i);         
-                msgContext.setDestination(destination);
-                msgContext.setMessageReference(node);
-                dispatchPolicy.dispatch(node, msgContext, consumers);
+        
+        if (list != null) {
+            synchronized (consumers) {      
+                for (MessageReference node : list) {
+                    Subscription target = null;
+                    List<Subscription> targets = null;
+                    for (Subscription s : consumers) {
+                        if (dispatchSelector.canSelect(s, node)) {
+                            if (!s.isFull()) {
+                                s.add(node);
+                                target = s;
+                                break;
+                            } else {
+                                if (targets == null) {
+                                    targets = new ArrayList<Subscription>();
+                                }
+                                targets.add(s);
+                            }
+                        }
+                    }
+                    if (targets != null) {
+                        // pick the least loaded to add the message to
+    
+                        for (Subscription s : targets) {
+                            if (target == null
+                                    || target.getInFlightUsage() > s
+                                            .getInFlightUsage()) {
+                                target = s;
+                            }
+                        }
+                        if (target != null) {
+                            target.add(node);
+                        }
+                    }
+                    if (target != null && !dispatchSelector.isExclusiveConsumer(target)) {
+                        consumers.remove(target);
+                        consumers.add(target);
+                    }
+    
+                }
             }
         }
     }
@@ -1030,7 +1029,4 @@
     private void pageInMessages(boolean force) throws Exception {
             doDispatch(doPageIn(force));
     }
-    
-    
-
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,115 @@
+/**
+ * 
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.region.group.MessageGroupMap;
+import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Queue dispatch policy that determines if a message can be sent to a subscription
+ * 
+ * @org.apache.xbean.XBean
+ * @version $Revision$
+ */
+public class QueueDispatchSelector extends SimpleDispatchSelector {
+    private static final Log LOG = LogFactory.getLog(QueueDispatchSelector.class);
+    private Subscription exclusiveConsumer;
+   
+   
+    /**
+     * @param destination
+     */
+    public QueueDispatchSelector(ActiveMQDestination destination) {
+        super(destination);
+    }
+    
+    public Subscription getExclusiveConsumer() {
+        return exclusiveConsumer;
+    }
+    public void setExclusiveConsumer(Subscription exclusiveConsumer) {
+        this.exclusiveConsumer = exclusiveConsumer;
+    }
+    
+    public boolean isExclusiveConsumer(Subscription s) {
+        return s == this.exclusiveConsumer;
+    }
+    
+       
+    public boolean canSelect(Subscription subscription,
+            MessageReference m) throws Exception {
+        if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
+            return true;
+        }
+       
+        boolean result =  super.canDispatch(subscription, m) ;
+        if (result) {
+            result = exclusiveConsumer == null
+                    || exclusiveConsumer == subscription;
+            if (result) {
+                QueueMessageReference node = (QueueMessageReference) m;
+                // Keep message groups together.
+                String groupId = node.getGroupID();
+                int sequence = node.getGroupSequence();
+                if (groupId != null) {
+                    MessageGroupMap messageGroupOwners = ((Queue) node
+                            .getRegionDestination()).getMessageGroupOwners();
+
+                    // If we can own the first, then no-one else should own the
+                    // rest.
+                    if (sequence == 1) {
+                        assignGroup(subscription, messageGroupOwners, node,groupId);
+                    }else {
+    
+                        // Make sure that the previous owner is still valid, we may
+                        // need to become the new owner.
+                        ConsumerId groupOwner;
+    
+                        groupOwner = messageGroupOwners.get(groupId);
+                        if (groupOwner == null) {
+                            assignGroup(subscription, messageGroupOwners, node,groupId);
+                        } else {
+                            if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
+                                // A group sequence < 0 is an end of group signal.
+                                if (sequence < 0) {
+                                    messageGroupOwners.removeGroup(groupId);
+                                }
+                            } else {
+                                result = false;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return result;
+    }
+    
+    protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
+        messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
+        Message message = n.getMessage();
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+            try {
+                activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
+            } catch (JMSException e) {
+                LOG.warn("Failed to set boolean header: " + e, e);
+            }
+        }
+    }
+    
+    
+    
+    
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java Mon Feb 18 01:38:10 2008
@@ -36,7 +36,7 @@
         
     boolean lock(LockOwner subscription);
     
-    void unlock();
+    boolean unlock();
     
     LockOwner getLockOwner();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Mon Feb 18 01:38:10 2008
@@ -17,13 +17,14 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -67,54 +68,13 @@
     }
 
     protected boolean canDispatch(MessageReference n) throws IOException {
+        boolean result = true;
         QueueMessageReference node = (QueueMessageReference)n;
-        if (node.isAcked()) {
-            return false;
-        }
-        // Keep message groups together.
-        String groupId = node.getGroupID();
-        int sequence = node.getGroupSequence();
-        if (groupId != null) {
-            MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
-
-            // If we can own the first, then no-one else should own the rest.
-            if (sequence == 1) {
-                if (node.lock(this)) {
-                    assignGroupToMe(messageGroupOwners, n, groupId);
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-
-            // Make sure that the previous owner is still valid, we may
-            // need to become the new owner.
-            ConsumerId groupOwner;
-            synchronized (node) {
-                groupOwner = messageGroupOwners.get(groupId);
-                if (groupOwner == null) {
-                    if (node.lock(this)) {
-                        assignGroupToMe(messageGroupOwners, n, groupId);
-                        return true;
-                    } else {
-                        return false;
-                    }
-                }
-            }
-
-            if (groupOwner.equals(info.getConsumerId())) {
-                // A group sequence < 1 is an end of group signal.
-                if (sequence < 0) {
-                    messageGroupOwners.removeGroup(groupId);
-                }
-                return true;
-            }
-
-            return false;
-
-        } else {
-            return node.lock(this);
+        if (node.isAcked() || node.isDropped()) {
+            result = false;
         }
+        result = result && (isBrowser() || node.lock(this));
+        return result;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Feb 18 01:38:10 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.List;
 
 import javax.jms.InvalidSelectorException;
 import javax.management.ObjectName;
@@ -38,6 +39,7 @@
     /**
      * Used to add messages that match the subscription.
      * @param node
+     * @throws Exception 
      * @throws InterruptedException 
      * @throws IOException 
      */
@@ -169,6 +171,11 @@
     boolean isHighWaterMark();
     
     /**
+     * @return true if there is no space to dispatch messages
+     */
+    boolean isFull();
+    
+    /**
      * inform the MessageConsumer on the client to change it's prefetch
      * @param newPrefetch
      */
@@ -186,11 +193,33 @@
     int getPrefetchSize();
     
     /**
+     * @return the number of messages awaiting acknowledgement
+     */
+    int getInFlightSize();
+    
+    /**
+     * @return the in flight messages as a percentage of the prefetch size
+     */
+    int getInFlightUsage();
+    
+    /**
      * Informs the Broker if the subscription needs to intervention to recover it's state
      * e.g. DurableTopicSubscriber may do
      * @see org.apache.activemq.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
     boolean isRecoveryRequired();
+    
+    
+    /**
+     * @return true if a browser
+     */
+    boolean isBrowser();
+    
+    /**
+     * Get the list of in flight messages
+     * @return list
+     */
+    List<MessageReference> getInFlightMessages();
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Feb 18 01:38:10 2008
@@ -33,6 +33,7 @@
 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -555,8 +556,7 @@
     protected void dispatch(final ConnectionContext context, Message message) throws Exception {
         destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
-        dispatchValve.increment();
-        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
+        dispatchValve.increment();      
         try {
             if (!subscriptionRecoveryPolicy.add(context, message)) {
                 return;
@@ -567,7 +567,7 @@
                     return;
                 }
             }
-
+            MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
             msgContext.setDestination(destination);
             msgContext.setMessageReference(message);
 
@@ -575,7 +575,6 @@
                 onMessageWithNoConsumers(context, message);
             }
         } finally {
-            msgContext.clear();
             dispatchValve.decrement();
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Feb 18 01:38:10 2008
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
@@ -37,7 +39,6 @@
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
@@ -51,8 +52,7 @@
     protected PendingMessageCursor matched;
     protected final SystemUsage usageManager;
     protected AtomicLong dispatchedCounter = new AtomicLong();
-    protected AtomicLong prefetchExtension = new AtomicLong();
-    
+       
     boolean singleDestination = true;
     Destination destination;
 
@@ -83,8 +83,7 @@
     public void add(MessageReference node) throws Exception {
         enqueueCounter.incrementAndGet();
         node.incrementReferenceCount();
-        if (!isFull() && !isSlave()) {
-            optimizePrefetch();
+        if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages
             // which
             // have not been dispatched (i.e. we allow the prefetch buffer to be
@@ -128,6 +127,7 @@
                         }
                     }
                 }
+                dispatchMatched();
             }
         }
     }
@@ -177,20 +177,18 @@
 
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
         // Handle the standard acknowledgment case.
-        boolean wasFull = isFull();
         if (ack.isStandardAck() || ack.isPoisonAck()) {
             if (context.isInTransaction()) {
-                prefetchExtension.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization() {
 
                     public void afterCommit() throws Exception {
-                        synchronized (TopicSubscription.this) {
+                       synchronized (TopicSubscription.this) {
                             if (singleDestination && destination != null) {
                                 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                             }
                         }
                         dequeueCounter.addAndGet(ack.getMessageCount());
-                        prefetchExtension.addAndGet(ack.getMessageCount());
+                        dispatchMatched();
                     }
                 });
             } else {
@@ -198,19 +196,14 @@
                     destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
                 }
                 dequeueCounter.addAndGet(ack.getMessageCount());
-                prefetchExtension.addAndGet(ack.getMessageCount());
-            }
-            if (wasFull && !isFull()) {
-                dispatchMatched();
             }
+            dispatchMatched();
             return;
         } else if (ack.isDeliveredAck()) {
             // Message was delivered but not acknowledged: update pre-fetch
             // counters.
-            prefetchExtension.addAndGet(ack.getMessageCount());
-            if (wasFull && !isFull()) {
-                dispatchMatched();
-            }
+            dequeueCounter.addAndGet(ack.getMessageCount());
+            dispatchMatched();
             return;
         }
         throw new JMSException("Invalid acknowledgment: " + ack);
@@ -287,22 +280,27 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    private boolean isFull() {
-        return getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
+    public boolean isFull() {
+        return getDispatchedQueueSize()  >= info.getPrefetchSize();
     }
-
+    
+    public int getInFlightSize() {
+        return getDispatchedQueueSize();
+    }
+    
+    
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     public boolean isLowWaterMark() {
-        return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
+        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
     }
 
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     public boolean isHighWaterMark() {
-        return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
+        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
     }
 
     /**
@@ -354,42 +352,30 @@
         }
     }
 
-    /**
-     * optimize message consumer prefetch if the consumer supports it
-     */
-    public void optimizePrefetch() {
-        /*
-         * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
-         * &&context.getConnection().isManageable()){
-         * if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
-         * isLowWaterMark()){
-         * info.setCurrentPrefetchSize(info.getPrefetchSize());
-         * updateConsumerPrefetch(info.getPrefetchSize()); }else
-         * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() &&
-         * isHighWaterMark()){ // want to purge any outstanding acks held by the
-         * consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
-         */
-    }
-
-    private void dispatchMatched() throws IOException {
+    private void dispatchMatched() throws IOException {       
         synchronized (matchedListMutex) {
-            try {
-                matched.reset();
-                while (matched.hasNext() && !isFull()) {
-                    MessageReference message = (MessageReference)matched.next();
-                    matched.remove();
-                    // Message may have been sitting in the matched list a while
-                    // waiting for the consumer to ak the message.
-                    if (broker.isExpired(message)) {
-                        message.decrementReferenceCount();
-                        broker.messageExpired(getContext(), message);
-                        dequeueCounter.incrementAndGet();
-                        continue; // just drop it.
+            if (!matched.isEmpty() && !isFull()) {
+                try {
+                    matched.reset();
+                   
+                    while (matched.hasNext() && !isFull()) {
+                        MessageReference message = (MessageReference) matched
+                                .next();
+                        matched.remove();
+                        // Message may have been sitting in the matched list a
+                        // while
+                        // waiting for the consumer to ack the message.
+                        if (broker.isExpired(message)) {
+                            message.decrementReferenceCount();
+                            broker.messageExpired(getContext(), message);
+                            dequeueCounter.incrementAndGet();
+                            continue; // just drop it.
+                        }
+                        dispatch(message);
                     }
-                    dispatch(message);
+                } finally {
+                    matched.release();
                 }
-            } finally {
-                matched.release();
             }
         }
     }
@@ -456,7 +442,15 @@
     }
 
     public int getPrefetchSize() {
-        return (int)(info.getPrefetchSize() + prefetchExtension.get());
+        return (int)info.getPrefetchSize();
+    }
+    
+    /**
+     * Get the list of inflight messages
+     * @return the list
+     */
+    public synchronized List<MessageReference> getInFlightMessages(){
+        return matched.pageInList(1000);
     }
 
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+
+/**
+ * Determines if a subscription can dispatch a message reference
+ *
+ */
+public interface DispatchSelector {
+   
+   
+    /**
+     * return true if a subscription can dispatch a message reference
+     * @param subscription
+     * @param node
+     * @return true if can dispatch
+     * @throws Exception 
+     */
+    
+    boolean canDispatch(Subscription subscription, MessageReference node) throws Exception;
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,34 @@
+/**
+ * 
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * Simple dispatch policy that determines if a message can be sent to a subscription
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision$
+ */
+public class SimpleDispatchSelector implements DispatchSelector {
+
+    private final ActiveMQDestination destination;
+
+    /**
+     * @param destination
+     */
+    public SimpleDispatchSelector(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception {
+        MessageEvaluationContext msgContext = new MessageEvaluationContext();
+        msgContext.setDestination(this.destination);
+        msgContext.setMessageReference(node);
+        return subscription.matches(node, msgContext);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message