From commits-return-10822-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Thu May 21 02:34:44 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 70460 invoked from network); 21 May 2009 02:34:44 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 21 May 2009 02:34:44 -0000 Received: (qmail 30384 invoked by uid 500); 21 May 2009 02:34:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 30336 invoked by uid 500); 21 May 2009 02:34:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30327 invoked by uid 99); 21 May 2009 02:34:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 May 2009 02:34:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 May 2009 02:34:47 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A3027238888D; Thu, 21 May 2009 02:34:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r776934 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav... Date: Thu, 21 May 2009 02:34:24 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090521023425.A3027238888D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=776934&r1=776933&r2=776934&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Thu May 21 02:34:23 2009 @@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; + +import org.apache.activemq.broker.BrokerSubscription.UserAlreadyConnectedException; +import org.apache.activemq.broker.Queue.QueueSubscription; import org.apache.activemq.broker.openwire.OpenWireMessageDelivery; import org.apache.activemq.broker.store.BrokerDatabase; import org.apache.activemq.broker.store.Store; @@ -38,7 +41,9 @@ import org.apache.activemq.dispatch.PriorityDispatcher; import org.apache.activemq.dispatch.IDispatcher.DispatchContext; import org.apache.activemq.dispatch.IDispatcher.Dispatchable; +import org.apache.activemq.flow.AbstractLimitedFlowResource; import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; import org.apache.activemq.flow.IFlowController; import org.apache.activemq.flow.IFlowDrain; import org.apache.activemq.flow.IFlowRelay; @@ -57,6 +62,7 @@ import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.SingleFlowRelay; import org.apache.activemq.queue.Subscription; +import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback; import junit.framework.TestCase; @@ -68,7 +74,7 @@ BrokerDatabase database; BrokerQueueStore queueStore; private static final boolean USE_KAHA_DB = true; - private static final boolean PERSISTENT = false; + private static final boolean PERSISTENT = true; private static final boolean PURGE_STORE = true; protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); @@ -393,15 +399,13 @@ } } - class Consumer implements DeliveryTarget { - private final HashMap, Subscription> subscriptions = new HashMap, Subscription>(); + class Consumer extends AbstractLimitedFlowResource implements Subscription, IFlowSink { private AtomicBoolean stopped = new AtomicBoolean(true); protected final MetricCounter rate = new MetricCounter(); private final String name; private final SizeLimiter limiter; - private final ExclusiveQueue queue; + private final FlowController controller; private final IQueue sourceQueue; - private final QueueStore.QueueDescriptor queueDescriptor; private int limit = 20000; private int count = 0; @@ -415,26 +419,9 @@ } }; - queue = new ExclusiveQueue(flow, flow.getFlowName(), limiter); - queue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1)); - queue.setDispatcher(dispatcher); - queue.setAutoRelease(true); - - queueDescriptor = new QueueStore.QueueDescriptor(); - queueDescriptor.setQueueName(new AsciiBuffer(queue.getResourceName())); - queueDescriptor.setParent(null); - - queue.setDrain(new IFlowDrain() { - - public void drain(MessageDelivery elem, ISourceController controller) { - elem.acknowledge(queueDescriptor); - rate.increment(); - /* - if (count++ == limit) { - queue.stop(); - }*/ - } - }); + controller = new FlowController(null, flow, limiter, this); + controller.useOverFlowQueue(false); + controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1)); rate.name("Consumer " + name + " Rate"); totalConsumerRate.add(rate); @@ -446,43 +433,122 @@ } private void subscribe(IQueue source) { - Subscription subscription = subscriptions.get(sourceQueue); - - subscriptions.get(sourceQueue); - if (subscription == null) { - subscription = new Queue.QueueSubscription(this); - subscriptions.put(sourceQueue, subscription); - } - source.addSubscription(subscription); + source.addSubscription(this); } public void stop() throws InterruptedException { - sourceQueue.removeSubscription(subscriptions.get(sourceQueue)); + sourceQueue.removeSubscription(this); stopped.set(true); } - public void deliver(MessageDelivery delivery, ISourceController source) { - queue.add(delivery, source); + public String toString() { + return name + " on " + sourceQueue.getResourceName(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.queue.Subscription#add(java.lang.Object, + * org.apache.activemq.flow.ISourceController, + * org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback) + */ + public void add(MessageDelivery element, ISourceController source, SubscriptionDeliveryCallback callback) { + controller.add(element, source); + addInternal(element, source, callback); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, + * org.apache.activemq.flow.ISourceController, + * org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback) + */ + public boolean offer(MessageDelivery element, ISourceController source, SubscriptionDeliveryCallback callback) { + if (controller.offer(element, source)) { + addInternal(element, source, callback); + } + return false; + } + + /** + * @param element + * @param source + * @param callback + */ + private void addInternal(MessageDelivery element, ISourceController source, SubscriptionDeliveryCallback callback) { + rate.increment(); + synchronized (this) { + controller.elementDispatched(element); + } + callback.acknowledge(); } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.queue.Subscription#getSink() + */ public IFlowSink getSink() { - return queue; + return this; } - public boolean isDurable() { + /* + * (non-Javadoc) + * + * @see org.apache.activemq.queue.Subscription#hasSelector() + */ + public boolean hasSelector() { return false; } - public boolean hasSelector() { + /* + * (non-Javadoc) + * + * @see org.apache.activemq.queue.Subscription#isBrowser() + */ + public boolean isBrowser() { return false; } - public boolean match(MessageDelivery message) { + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang + * .Object) + */ + public boolean isRemoveOnDispatch(MessageDelivery elem) { + return false; + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object) + */ + public boolean matches(MessageDelivery elem) { return true; } - public String toString() { - return name + " on " + sourceQueue.getResourceName(); + /* + * (non-Javadoc) + * + * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object, + * org.apache.activemq.flow.ISourceController) + */ + public void add(MessageDelivery elem, ISourceController source) { + add(elem, source, null); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object, + * org.apache.activemq.flow.ISourceController) + */ + public boolean offer(MessageDelivery elem, ISourceController source) { + return offer(elem, source, null); } } } Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=776934&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Thu May 21 02:34:23 2009 @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.File; +import java.util.ArrayList; + +import org.apache.activemq.broker.store.BrokerDatabase; +import org.apache.activemq.broker.store.Store; +import org.apache.activemq.broker.store.StoreFactory; +import org.apache.activemq.dispatch.IDispatcher; +import org.apache.activemq.dispatch.PriorityDispatcher; +import org.apache.activemq.queue.IQueue; + +/** + * @author cmacnaug + * + */ +public class SharedQueueTest { + + + IDispatcher dispatcher; + BrokerDatabase database; + BrokerQueueStore queueStore; + private static final boolean USE_KAHA_DB = true; + private static final boolean PERSISTENT = true; + private static final boolean PURGE_STORE = false; + + protected ArrayList> queues = new ArrayList>(); + + protected IDispatcher createDispatcher() { + return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors()); + } + + protected int consumerStartDelay = 0; + + protected void startServices() throws Exception { + dispatcher = createDispatcher(); + dispatcher.start(); + database = new BrokerDatabase(createStore(), dispatcher); + database.start(); + queueStore = new BrokerQueueStore(); + queueStore.setDatabase(database); + queueStore.setDispatcher(dispatcher); + queueStore.loadQueues(); + } + + protected void stopServices() throws Exception { + dispatcher.shutdown(); + database.stop(); + dispatcher.shutdown(); + queues.clear(); + } + + protected Store createStore() throws Exception { + Store store = null; + if (USE_KAHA_DB) { + store = StoreFactory.createStore("kaha-db"); + } else { + store = StoreFactory.createStore("memory"); + } + + store.setStoreDirectory(new File("test-data/shared-queue-test/")); + store.setDeleteAllMessages(PURGE_STORE); + return store; + } + + private final void createQueues(int count) { + for (int i = 0; i < count; i++) { + IQueue queue = queueStore.createSharedQueue("queue-" + (i + 1)); + queues.add(queue); + } + } + + protected void cleanup() throws Exception { + queues.clear(); + stopServices(); + } + + public void testExpiration() { + createQueues(1); + IQueue queue = queues.get(0); + } + +} Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=776934&r1=776933&r2=776934&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu May 21 02:34:23 2009 @@ -123,9 +123,13 @@ return dt.hasSelector(); } - public boolean offer(Message elem, ISourceController controller, SubscriptionDeliveryCallback ackCallback) { + public boolean offer(Message elem, ISourceController controller, SubscriptionDeliveryCallback ackCallback) { return getSink().offer(elem, controller); } + + public void add(Message elem, ISourceController controller, SubscriptionDeliveryCallback ackCallback) { + getSink().add(elem, controller); + } }; subs.put(dt, sub); queue.addSubscription(sub);