Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 31094 invoked from network); 19 Jun 2009 04:02:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 19 Jun 2009 04:02:52 -0000 Received: (qmail 10853 invoked by uid 500); 19 Jun 2009 04:03:03 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 10797 invoked by uid 500); 19 Jun 2009 04:03:03 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 10788 invoked by uid 99); 19 Jun 2009 04:03:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jun 2009 04:03:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jun 2009 04:03:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 51723238889D; Fri, 19 Jun 2009 04:02:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r786364 - in /activemq/sandbox/activemq-flow: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-openwire/src/test/java/org/apache/activemq/legacy/openwire... Date: Fri, 19 Jun 2009 04:02:39 -0000 To: commits@activemq.apache.org From: cmacnaug@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090619040240.51723238889D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cmacnaug Date: Fri Jun 19 04:02:38 2009 New Revision: 786364 URL: http://svn.apache.org/viewvc?rev=786364&view=rev Log: Adding support for exclusive consumers on shared queues. Also: -Refactored SharedPriorityQueue to extend partitioned queue (eventually we'll want partitioned queues to assist in exclusivity ... for now each partion is exlusive but it isn't enforced across partitions) -Fixed legacy openwire exclusive consumer test so that it correctly tests exclusivity. Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original) +++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Jun 19 04:02:38 2009 @@ -53,8 +53,8 @@ protected final boolean USE_KAHA_DB = true; protected final boolean PURGE_STORE = true; - protected final boolean PERSISTENT = true; - protected final boolean DURABLE = true; + protected final boolean PERSISTENT = false; + protected final boolean DURABLE = false; // Set to put senders and consumers on separate brokers. protected boolean multibroker = false; Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Jun 19 04:02:38 2009 @@ -107,7 +107,7 @@ public MessageRecord createMessageRecord() { MessageRecord record = new MessageRecord(); record.setEncoding(ENCODING); - + ByteSequence bytes; try { bytes = storeWireFormat.marshal(message); @@ -130,7 +130,9 @@ this.storeWireFormat = wireFormat; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see org.apache.activemq.broker.MessageDelivery#getTTE() */ public long getExpiration() { @@ -140,4 +142,8 @@ public MessageEvaluationContext createMessageEvaluationContext() { return new OpenwireMessageEvaluationContext(message); } + + public String toString() { + return message.getMessageId().toString(); + } } Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 04:02:38 2009 @@ -571,6 +571,10 @@ return isQueueReceiver; } + public boolean isExclusive() { + return info.isExclusive(); + } + /* * (non-Javadoc) * @@ -704,6 +708,10 @@ return true; } + public String toString() { + return info.getConsumerId().toString(); + } + } private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException { Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 04:02:38 2009 @@ -786,9 +786,7 @@ connection2.request(consumerInfo2); // Second message should go to consumer 1 even though consumer 2 is - // ready - // for dispatch. - connection1.send(createMessage(producerInfo, destination, deliveryMode)); + // ready for dispatch. connection1.send(createMessage(producerInfo, destination, deliveryMode)); // Acknowledge the first 2 messages @@ -803,6 +801,7 @@ // The last two messages should now go the the second consumer. connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); for (int i = 0; i < 2; i++) { Message m1 = receiveMessage(connection2); Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Jun 19 04:02:38 2009 @@ -445,6 +445,10 @@ return name + " on " + sourceQueue.getResourceName(); } + public boolean isExclusive() { + return false; + } + /* * (non-Javadoc) * Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original) +++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri Jun 19 04:02:38 2009 @@ -21,23 +21,24 @@ import java.util.HashSet; import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.dispatch.IDispatcher.DispatchContext; -import org.apache.activemq.flow.AbstractLimitedFlowResource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.util.Mapper; abstract public class PartitionedQueue extends AbstractFlowQueue implements IPartitionedQueue { - private HashSet> subscriptions = new HashSet>(); + protected HashSet> subscriptions = new HashSet>(); private HashMap> partitions = new HashMap>(); - protected Mapper partitionMapper; - private QueueStore store; + protected QueueStore store; protected IDispatcher dispatcher; - private boolean started; - private boolean shutdown = false; + protected boolean started; + protected boolean shutdown = false; protected QueueDescriptor queueDescriptor; - private int basePriority = 0; + protected PersistencePolicy persistencePolicy; + protected Mapper expirationMapper; + protected Mapper keyMapper; + protected Mapper partitionMapper; + protected int basePriority = 0; public PartitionedQueue(String name) { super(name); @@ -50,7 +51,7 @@ return queueDescriptor; } - public IQueue getPartition(int partitionKey) { + protected IQueue getPartition(int partitionKey) { boolean save = false; IQueue rc = null; checkShutdown(); @@ -70,7 +71,10 @@ return rc; } - + + + abstract public IQueue createPartition(int partitionKey); + /* * (non-Javadoc) * @@ -81,20 +85,20 @@ if (basePriority != priority) { basePriority = priority; if (!shutdown) { - for (IQueue queue : partitions.values()) { + for (IQueue queue : getPartitions()) { queue.setDispatchPriority(basePriority); } } } } } - + public int getEnqueuedCount() { checkShutdown(); - synchronized (partitions) { + synchronized (this) { int count = 0; - for (IQueue queue : partitions.values()) { + for (IQueue queue : getPartitions()) { count += queue.getEnqueuedCount(); } return count; @@ -103,9 +107,9 @@ public synchronized long getEnqueuedSize() { checkShutdown(); - synchronized (partitions) { + synchronized (this) { long size = 0; - for (IQueue queue : partitions.values()) { + for (IQueue queue : getPartitions()) { if (queue != null) { size += queue.getEnqueuedSize(); } @@ -114,43 +118,44 @@ } } + public void setStore(QueueStore store) { this.store = store; } public void setPersistencePolicy(PersistencePolicy persistencePolicy) { - // No-Op for now. + this.persistencePolicy = persistencePolicy; } public void setExpirationMapper(Mapper expirationMapper) { - // No-Op for now. - } - - abstract public IQueue createPartition(int partitionKey); - - public void addPartition(int partitionKey, IQueue queue) { - checkShutdown(); - synchronized (partitions) { - partitions.put(partitionKey, queue); - for (Subscription sub : subscriptions) { - queue.addSubscription(sub); - queue.setDispatchPriority(basePriority); - } - } + this.expirationMapper = expirationMapper; } public void initialize(long sequenceMin, long sequenceMax, int count, long size) { + checkShutdown(); // No-op, only partitions should have stored values. if (count > 0 || size > 0) { throw new IllegalArgumentException("Partioned queues do not themselves hold values"); } + if (expirationMapper == null) { + expirationMapper = new Mapper() { + + public Long map(V element) { + return -1L; + } + }; + } + if (persistencePolicy == null) { + persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY(); + } } + public synchronized void start() { if (!started) { checkShutdown(); started = true; - for (IQueue partition : partitions.values()) { + for (IQueue partition : getPartitions()) { if (partition != null) partition.start(); } @@ -160,26 +165,27 @@ public synchronized void stop() { if (started) { started = false; - for (IQueue partition : partitions.values()) { + for (IQueue partition : getPartitions()) { if (partition != null) partition.stop(); } } } + public void shutdown(boolean sync) { - HashMap> partitions = null; + Collection > partitions = null; synchronized (this) { if (!shutdown) { shutdown = true; started = false; } - partitions = this.partitions; + partitions = getPartitions(); this.partitions = null; } if (partitions != null) { - for (IQueue partition : partitions.values()) { + for (IQueue partition : partitions) { if (partition != null) partition.shutdown(sync); } @@ -188,9 +194,9 @@ public void addSubscription(Subscription sub) { checkShutdown(); - synchronized (partitions) { + synchronized (this) { subscriptions.add(sub); - Collection> values = partitions.values(); + Collection> values = getPartitions(); for (IQueue queue : values) { queue.addSubscription(sub); } @@ -199,9 +205,9 @@ public boolean removeSubscription(Subscription sub) { checkShutdown(); - synchronized (partitions) { + synchronized (this) { if (subscriptions.remove(sub)) { - Collection> values = partitions.values(); + Collection> values = getPartitions(); for (IQueue queue : values) { queue.removeSubscription(sub); } @@ -219,30 +225,42 @@ return partitionMapper; } + public void add(V value, ISourceController source) { int partitionKey = partitionMapper.map(value); - IQueue partition = getPartition(partitionKey); - partition.add(value, source); + getPartition(partitionKey).add(value, source); } public boolean offer(V value, ISourceController source) { int partitionKey = partitionMapper.map(value); - IQueue partition = getPartition(partitionKey); - return partition.offer(value, source); + return getPartition(partitionKey).offer(value, source); + } + + public void setKeyMapper(Mapper keyMapper) { + this.keyMapper = keyMapper; + } + + public void setAutoRelease(boolean autoRelease) { + this.autoRelease = autoRelease; } public void setDispatcher(IDispatcher dispatcher) { checkShutdown(); this.dispatcher = dispatcher; - synchronized (partitions) { - Collection> values = partitions.values(); + synchronized (this) { + Collection> values = getPartitions(); for (IQueue queue : values) { queue.setDispatcher(dispatcher); } } } + + protected Collection> getPartitions() + { + return partitions.values(); + } - private void checkShutdown() { + protected void checkShutdown() { if (shutdown) { throw new IllegalStateException(this + " is shutdown"); } Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original) +++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri Jun 19 04:02:38 2009 @@ -17,32 +17,13 @@ package org.apache.activemq.queue; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; - -import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.flow.AbstractLimitedFlowResource; -import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.PrioritySizeLimiter; import org.apache.activemq.protobuf.AsciiBuffer; -import org.apache.activemq.util.Mapper; -public class SharedPriorityQueue extends AbstractFlowQueue implements IPartitionedQueue { +public class SharedPriorityQueue extends PartitionedQueue { - private final HashSet> subscriptions = new HashSet>(); - private final Mapper priorityMapper; private final ArrayList> partitions = new ArrayList>(); - private Mapper keyMapper; - private boolean autoRelease; - private IDispatcher dispatcher; private final PrioritySizeLimiter limiter; - private QueueStore store; - private PersistencePolicy persistencePolicy; - private boolean started; - private QueueDescriptor queueDescriptor; - private Mapper expirationMapper; - private int basePriority = 0; - private boolean shutdown = false; public SharedPriorityQueue(String name, PrioritySizeLimiter limiter) { super(name); @@ -50,154 +31,36 @@ queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName())); queueDescriptor.setQueueType(QueueDescriptor.SHARED_PRIORITY); this.limiter = limiter; - priorityMapper = limiter.getPriorityMapper(); + super.setPartitionMapper(limiter.getPriorityMapper()); for (int i = 0; i < limiter.getPriorities(); i++) { partitions.add(null); } } - public synchronized void start() { - if (!started) { - checkShutdown(); - started = true; - for (SharedQueue partition : partitions) { - if (partition != null) - partition.start(); - } - } - } - - public synchronized void stop() { - if (started) { - started = false; - for (SharedQueue partition : partitions) { - if (partition != null) - partition.stop(); - } - } - } - + @Override public void shutdown(boolean sync) { - ArrayList> partitions = null; - synchronized (this) { - if (!shutdown) { - shutdown = true; - started = false; - } - partitions = this.partitions; - } - - if (partitions != null) { - for (IQueue partition : partitions) { - if (partition != null) - partition.shutdown(sync); - } - } - } - - public void initialize(long sequenceMin, long sequenceMax, int count, long size) { - checkShutdown(); - // No-op, only partitions should have stored values. - if (count > 0 || size > 0) { - throw new IllegalArgumentException("Partioned queues do not themselves hold values"); - } - if (expirationMapper == null) { - expirationMapper = new Mapper() { - - public Long map(V element) { - return -1L; - } - - }; - } - if (persistencePolicy == null) { - persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY(); - } - } - - public synchronized int getEnqueuedCount() { - checkShutdown(); - int count = 0; - for (SharedQueue queue : partitions) { - if (queue != null) { - count += queue.getEnqueuedCount(); - } + try { + super.shutdown(sync); + } finally { + partitions.clear(); } - return count; } + /** + * Override with more efficient limiter lookup: + */ + @Override public synchronized long getEnqueuedSize() { return limiter.getSize(); } - public void setStore(QueueStore store) { - this.store = store; - } - - public void setPersistencePolicy(PersistencePolicy persistencePolicy) { - this.persistencePolicy = persistencePolicy; - } - - public void setExpirationMapper(Mapper expirationMapper) { - this.expirationMapper = expirationMapper; - } - - - @Override - public void addSubscription(Subscription sub) { - synchronized (this) { - checkShutdown(); - subscriptions.add(sub); - for (SharedQueue queue : partitions) { - if (queue != null) { - queue.addSubscription(sub); - } - } - } - } - - @Override - public boolean removeSubscription(Subscription sub) { - synchronized (this) { - if (subscriptions.remove(sub)) { - for (SharedQueue queue : partitions) { - if (queue != null) { - queue.removeSubscription(sub); - } - } - return true; - } - } - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int) - */ - public void setDispatchPriority(int priority) { + public IQueue createPartition(int prio) { synchronized (this) { - if (basePriority != priority) { - basePriority = priority; - if (shutdown) { - return; - } - for (int i = 0; i < limiter.getPriorities(); i++) { - SharedQueue queue = partitions.get(i); - if (queue != null) { - queue.setDispatchPriority(basePriority + i); - } - } - } + return getPartition(prio, started); } } - public IQueue createPartition(int prio) { - return getPartition(prio, false); - } - - private IQueue getPartition(int prio, boolean initialize) { + protected IQueue getPartition(int prio, boolean initialize) { synchronized (this) { checkShutdown(); SharedQueue queue = partitions.get(prio); @@ -231,71 +94,4 @@ return queue; } } - - public QueueDescriptor getDescriptor() { - return queueDescriptor; - } - - public void add(V value, ISourceController source) { - int prio = priorityMapper.map(value); - getPartition(prio, true).add(value, source); - } - - public boolean offer(V value, ISourceController source) { - int prio = priorityMapper.map(value); - return getPartition(prio, true).offer(value, source); - } - - public void setKeyMapper(Mapper keyMapper) { - this.keyMapper = keyMapper; - } - - public void setAutoRelease(boolean autoRelease) { - this.autoRelease = autoRelease; - } - - public void setDispatcher(IDispatcher dispatcher) { - this.dispatcher = dispatcher; - super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1)); - } - - private void checkShutdown() { - if (shutdown) { - throw new IllegalStateException(this + " is shutdown"); - } - } - - /* (non-Javadoc) - * @see org.apache.activemq.queue.IPollableFlowSource#isDispatchReady() - */ - public boolean isDispatchReady() { - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - } - - /* (non-Javadoc) - * @see org.apache.activemq.queue.IPollableFlowSource#poll() - */ - public V poll() { - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - } - - /* (non-Javadoc) - * @see org.apache.activemq.queue.IPollableFlowSource#pollingDispatch() - */ - public boolean pollingDispatch() { - // TODO Auto-generated method stub - throw new UnsupportedOperationException(); - } - - /* (non-Javadoc) - * @see org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted(org.apache.activemq.flow.ISourceController, java.lang.Object) - */ - public void flowElemAccepted(ISourceController source, V elem) { - // TODO Remove - throw new UnsupportedOperationException(); - - } - } Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original) +++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri Jun 19 04:02:38 2009 @@ -67,6 +67,9 @@ private QueueStore store; private PersistencePolicy persistencePolicy; + private SubscriptionContext exclusiveConsumer = null; + private int exclusiveConsumerCount = 0; + // Open consumers: private final HashMap, SubscriptionContext> consumers = new HashMap, SubscriptionContext>(); private int startedConsumers = 0; @@ -172,8 +175,15 @@ SubscriptionContext context = new SubscriptionContext(subscription); SubscriptionContext old = consumers.put(subscription, context); if (old != null) { + context.close(); consumers.put(subscription, old); } else { + if (exclusiveConsumer == null) { + if (context.isExclusive()) { + exclusiveConsumer = context; + } + } + context.start(); } } @@ -184,6 +194,29 @@ SubscriptionContext old = consumers.remove(subscription); if (old != null) { old.close(); + + //Was this the exclusive consumer? + if (old == exclusiveConsumer) { + if (exclusiveConsumerCount > 0) { + for (SubscriptionContext context : consumers.values()) { + if (context.isExclusive()) { + exclusiveConsumer = context; + //Update the dispatch list: + context.updateDispatchList(); + break; + } + } + } else { + //Otherwise add the remaining subs to appropriate dispatch + //lists: + exclusiveConsumer = null; + for (SubscriptionContext context : consumers.values()) { + if (!context.sub.isBrowser()) { + context.updateDispatchList(); + } + } + } + } return true; } return false; @@ -397,6 +430,9 @@ SubscriptionContext nextConsumer = consumer.getNext(); switch (consumer.offer(next)) { case ACCEPTED: + if (DEBUG) + System.out.println("Dispatched " + next.getElement() + " to " + consumer); + // Rotate list so this one is last next time: sharedConsumers.rotate(); interested = true; @@ -469,6 +505,9 @@ SubscriptionContext(Subscription target) { this.sub = target; this.cursor = openCursor(target.toString(), true, !sub.isBrowser()); + if (isExclusive()) { + exclusiveConsumerCount++; + } cursor.setCursorReadyListener(new CursorReadyListener() { public void onElementReady() { if (!isLinked()) { @@ -478,10 +517,15 @@ }); } + public boolean isExclusive() { + return sub.isExclusive() && !sub.isBrowser(); + } + public void start() { if (!isStarted) { isStarted = true; if (!sub.isBrowser()) { + if (sub.hasSelector()) { activeSelectorSubs++; } @@ -504,6 +548,7 @@ // If started remove this from any dispatch list if (isStarted) { if (!sub.isBrowser()) { + if (sub.hasSelector()) { activeSelectorSubs--; } @@ -517,6 +562,10 @@ } public void close() { + if (isExclusive()) { + exclusiveConsumerCount--; + } + stop(); } @@ -550,13 +599,22 @@ return false; } - // TODO Even if there are subscriptions with selectors present - // we can still join the shared cursor as long as there is at - // least one ready selector-less sub. boolean join = false; - if (activeSelectorSubs == 0) { + //If we are the exlusive consumer then we join the shared + //cursor: + if (exclusiveConsumer == this) { + join = true; + } + //Otherwise if we aren't we won't be joining anything! + else if (exclusiveConsumer != null) { + return false; + } else if (activeSelectorSubs == 0) { join = true; } else { + + // TODO Even if there are subscriptions with selectors present + // we can still join the shared cursor as long as there is at + // least one ready selector-less sub. cursor.getNext(); if (queue.isEmpty() || cursor.compareTo(sharedCursor) >= 0) { join = true; @@ -578,9 +636,14 @@ /** * Adds to subscription to the appropriate dispatch list: */ - private final void updateDispatchList() { + final void updateDispatchList() { if (!checkJoinShared()) { + //Otherwise if we're not the exclusive consumer + if (!sub.isBrowser() && exclusiveConsumer != null) { + return; + } + // Make sure our cursor is activated: cursor.activate(); // If our next element is paged out @@ -641,9 +704,7 @@ if (callback == null) { qe.acknowledge(); } - } - else - { + } else { qe.setAcquired(null); } Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original) +++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Fri Jun 19 04:02:38 2009 @@ -57,6 +57,15 @@ * @return True if this is a subscription browser. */ public boolean isBrowser(); + + /** + * Indicates that the subscription is exclusive. When there at least one + * exclusive subscription on a shared queue, the queue will dispatch to + * only one such consumer while there is at least one connected. + * + * @return True if the Subscription is exclusive. + */ + public boolean isExclusive(); /** * Returns true if the Subscription has a selector. If true Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original) +++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Fri Jun 19 04:02:38 2009 @@ -36,7 +36,7 @@ private final MockStoreAdapater store = new MockStoreAdapater(); private static final PersistencePolicy NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY(); private static final boolean USE_OLD_QUEUE = false; - + private IQueue createQueue() { if (partitionMapper != null) { @@ -117,6 +117,10 @@ return true; } + public boolean isExclusive() { + return false; + } + public IFlowSink getSink() { return dt.getSink(); } @@ -133,7 +137,7 @@ public boolean offer(Message elem, ISourceController controller, SubscriptionDeliveryCallback ackCallback) { return getSink().offer(elem, controller); } - + public void add(Message elem, ISourceController controller, SubscriptionDeliveryCallback ackCallback) { getSink().add(elem, controller); } Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786364&r1=786363&r2=786364&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original) +++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri Jun 19 04:02:38 2009 @@ -489,6 +489,13 @@ public boolean autoCreateDestination() { return true; } + + /* (non-Javadoc) + * @see org.apache.activemq.queue.Subscription#isExclusive() + */ + public boolean isExclusive() { + return false; + } } private void sendError(String message) {