activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r776934 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Thu, 21 May 2009 02:34:24 GMT
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
Thu May 21 02:34:23 2009
@@ -24,6 +24,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
+
+import org.apache.activemq.broker.BrokerSubscription.UserAlreadyConnectedException;
+import org.apache.activemq.broker.Queue.QueueSubscription;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.broker.store.BrokerDatabase;
 import org.apache.activemq.broker.store.Store;
@@ -38,7 +41,9 @@
 import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowDrain;
 import org.apache.activemq.flow.IFlowRelay;
@@ -57,6 +62,7 @@
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 
 import junit.framework.TestCase;
 
@@ -68,7 +74,7 @@
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
-    private static final boolean PERSISTENT = false;
+    private static final boolean PERSISTENT = true;
     private static final boolean PURGE_STORE = true;
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate
Producer Rate").unit("items");
@@ -393,15 +399,13 @@
         }
     }
 
-    class Consumer implements DeliveryTarget {
-        private final HashMap<IQueue<Long, MessageDelivery>, Subscription<MessageDelivery>>
subscriptions = new HashMap<IQueue<Long, MessageDelivery>, Subscription<MessageDelivery>>();
+    class Consumer extends AbstractLimitedFlowResource<MessageDelivery> implements
Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
         private AtomicBoolean stopped = new AtomicBoolean(true);
         protected final MetricCounter rate = new MetricCounter();
         private final String name;
         private final SizeLimiter<MessageDelivery> limiter;
-        private final ExclusiveQueue<MessageDelivery> queue;
+        private final FlowController<MessageDelivery> controller;
         private final IQueue<Long, MessageDelivery> sourceQueue;
-        private final QueueStore.QueueDescriptor queueDescriptor;
         private int limit = 20000;
         private int count = 0;
 
@@ -415,26 +419,9 @@
                 }
             };
 
-            queue = new ExclusiveQueue<MessageDelivery>(flow, flow.getFlowName(), limiter);
-            queue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities()
- 1));
-            queue.setDispatcher(dispatcher);
-            queue.setAutoRelease(true);
-
-            queueDescriptor = new QueueStore.QueueDescriptor();
-            queueDescriptor.setQueueName(new AsciiBuffer(queue.getResourceName()));
-            queueDescriptor.setParent(null);
-
-            queue.setDrain(new IFlowDrain<MessageDelivery>() {
-
-                public void drain(MessageDelivery elem, ISourceController<MessageDelivery>
controller) {
-                    elem.acknowledge(queueDescriptor);
-                    rate.increment();
-                    /*
-                    if (count++ == limit) {
-                        queue.stop();
-                    }*/
-                }
-            });
+            controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
+            controller.useOverFlowQueue(false);
+            controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities()
- 1));
 
             rate.name("Consumer " + name + " Rate");
             totalConsumerRate.add(rate);
@@ -446,43 +433,122 @@
         }
 
         private void subscribe(IQueue<Long, MessageDelivery> source) {
-            Subscription<MessageDelivery> subscription = subscriptions.get(sourceQueue);
-
-            subscriptions.get(sourceQueue);
-            if (subscription == null) {
-                subscription = new Queue.QueueSubscription(this);
-                subscriptions.put(sourceQueue, subscription);
-            }
-            source.addSubscription(subscription);
+            source.addSubscription(this);
         }
 
         public void stop() throws InterruptedException {
-            sourceQueue.removeSubscription(subscriptions.get(sourceQueue));
+            sourceQueue.removeSubscription(this);
             stopped.set(true);
         }
 
-        public void deliver(MessageDelivery delivery, ISourceController<?> source)
{
-            queue.add(delivery, source);
+        public String toString() {
+            return name + " on " + sourceQueue.getResourceName();
+        }
+
+        /*
+         * Adds the element to the flow controller, then counts and
+         * acknowledges the delivery through addInternal.
+         * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController,
+         * org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
+         */
+        public void add(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback
callback) {
+            controller.add(element, source);
+            addInternal(element, source, callback);
+        }
+
+        /**
+         * Offers the element to the flow controller and, when it is accepted,
+         * counts and acknowledges the delivery through addInternal.
+         * @return true when the controller accepted the element, false otherwise
+         * @see org.apache.activemq.queue.Subscription
+         */
+        public boolean offer(MessageDelivery element, ISourceController<?> source,
SubscriptionDeliveryCallback callback) {
+            final boolean accepted = controller.offer(element, source);
+            if (accepted) {
+                addInternal(element, source, callback);
+            }
+            return accepted;
+        }
+
+        /** Counts the delivery, marks the element dispatched on the controller and
+         *  acknowledges it through the callback, if one was supplied. */
+        private void addInternal(MessageDelivery element, ISourceController<?> source,
SubscriptionDeliveryCallback callback) {
+            rate.increment();
+            synchronized (this) {
+                controller.elementDispatched(element);
+            }
+            // The IFlowSink add/offer overloads pass a null callback: nothing to acknowledge then.
+            if (callback != null) {
+                callback.acknowledge();
+            }
         }
 
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#getSink()
+         */
         public IFlowSink<MessageDelivery> getSink() {
-            return queue;
+            return this;
         }
 
-        public boolean isDurable() {
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#hasSelector()
+         */
+        public boolean hasSelector() {
             return false;
         }
 
-        public boolean hasSelector() {
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#isBrowser()
+         */
+        public boolean isBrowser() {
             return false;
         }
 
-        public boolean match(MessageDelivery message) {
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+         * .Object)
+         */
+        public boolean isRemoveOnDispatch(MessageDelivery elem) {
+            return false;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
+         */
+        public boolean matches(MessageDelivery elem) {
             return true;
         }
 
-        public String toString() {
-            return name + " on " + sourceQueue.getResourceName();
+        /*
+         * Sink-style add: delegates to the three-argument add with a null
+         * delivery callback.
+         * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController)
+         */
+        public void add(MessageDelivery elem, ISourceController<?> source) {
+            add(elem, source, null);
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController)
+         */
+        public boolean offer(MessageDelivery elem, ISourceController<?> source) {
+            return offer(elem, source, null);
         }
     }
 }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Thu May 21 02:34:23 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.apache.activemq.broker.store.BrokerDatabase;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.queue.IQueue;
+
+/**
+ * @author cmacnaug
+ * 
+ */
+public class SharedQueueTest {
+
+    IDispatcher dispatcher;
+    BrokerDatabase database;
+    BrokerQueueStore queueStore;
+    private static final boolean USE_KAHA_DB = true;
+    private static final boolean PERSISTENT = true;
+    private static final boolean PURGE_STORE = false;
+
+    protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
+
+    /** Creates the "TestDispatcher" pool from MAX_PRIORITY and the available processor count. */
+    protected IDispatcher createDispatcher() {
+        return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", MessageBroker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+    }
+
+    protected int consumerStartDelay = 0;
+
+    /** Starts the dispatcher and the database, then loads queues through a fresh queue store. */
+    protected void startServices() throws Exception {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+        database = new BrokerDatabase(createStore(), dispatcher);
+        database.start();
+        queueStore = new BrokerQueueStore();
+        queueStore.setDatabase(database);
+        queueStore.setDispatcher(dispatcher);
+        queueStore.loadQueues();
+    }
+
+    /** Stops the database, shuts the dispatcher down and forgets the created queues. */
+    protected void stopServices() throws Exception {
+        // The database was constructed on the dispatcher, so stop it first and shut the dispatcher down exactly once.
+        database.stop();
+        dispatcher.shutdown();
+        queues.clear();
+    }
+
+    /** Creates the kaha-db (or in-memory) store rooted at test-data/shared-queue-test/. */
+    protected Store createStore() throws Exception {
+        Store store = StoreFactory.createStore(USE_KAHA_DB ? "kaha-db" : "memory");
+        store.setStoreDirectory(new File("test-data/shared-queue-test/"));
+        store.setDeleteAllMessages(PURGE_STORE);
+        return store;
+    }
+
+    /** Creates count shared queues named "queue-1" onwards and records them in queues. */
+    private final void createQueues(int count) {
+        for (int i = 0; i < count; i++) {
+            IQueue<Long, MessageDelivery> queue = queueStore.createSharedQueue("queue-" + (i + 1));
+            queues.add(queue);
+        }
+    }
+
+    /** Clears the queue list and stops all services. */
+    protected void cleanup() throws Exception {
+        queues.clear();
+        stopServices();
+    }
+
+    /** NOTE(review): placeholder; queueStore is only assigned in startServices(), which is not called here. */
+    public void testExpiration() {
+        createQueues(1);
+        IQueue<Long, MessageDelivery> queue = queues.get(0);
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu
May 21 02:34:23 2009
@@ -123,9 +123,13 @@
                 return dt.hasSelector();
             }
 
-            public boolean offer(Message elem, ISourceController<Message> controller,
SubscriptionDeliveryCallback ackCallback) {
+            public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback
ackCallback) {
                 return getSink().offer(elem, controller);
             }
+            
+            public void add(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback
ackCallback) {
+                getSink().add(elem, controller);
+            }
         };
         subs.put(dt, sub);
         queue.addSubscription(sub);



Mime
View raw message