From commits-return-10539-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Mar 30 16:20:59 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 55939 invoked from network); 30 Mar 2009 16:20:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 30 Mar 2009 16:20:59 -0000 Received: (qmail 17537 invoked by uid 500); 30 Mar 2009 16:20:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 17492 invoked by uid 500); 30 Mar 2009 16:20:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 17483 invoked by uid 99); 30 Mar 2009 16:20:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Mar 2009 16:20:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Mar 2009 16:20:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2B5B02388A13; Mon, 30 Mar 2009 16:20:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r760028 [3/3] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav... Date: Mon, 30 Mar 2009 16:20:31 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090330162037.2B5B02388A13@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Mon Mar 30 16:20:28 2009 @@ -202,8 +202,8 @@ } } - public IFlowSource getFlowSource() { - return controllable.getFlowSource(); + public IFlowResource getFlowResource() { + return controllable.getFlowResource(); } /** @@ -387,7 +387,6 @@ synchronized (mutex) { resuming = true; } - String was = Thread.currentThread().getName(); try { for (ISourceController source : blockedSources) { // System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]"); @@ -424,8 +423,8 @@ return name; } - public IFlowSink getFlowSink() { - return controllable.getFlowSink(); + public IFlowResource getFlowSink() { + return controllable.getFlowResource(); } public void setExecutor(Executor executor) { Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Mon Mar 30 16:20:28 2009 @@ -25,11 +25,19 @@ * @param */ public interface FlowControllable { - public void flowElemAccepted(ISourceController controller, E elem); - - public IFlowSink getFlowSink(); - - public IFlowSource getFlowSource(); + + /** + * Called by a flow controller when it accepts a element. + * @param source The source controller + * @param elem + */ + public void flowElemAccepted(ISourceController source, E elem); + + /** + * Gets the resource being flow controlled; + * @return The resource being flow controlled. + */ + public IFlowResource getFlowResource(); } /** @@ -93,10 +101,10 @@ public boolean addUnblockListener(FlowUnblockListener listener); /** - * Gets the {@link IFlowSink} that this controller is controlling. - * @return The {@link IFlowSink} that this controller is controlling. + * Gets the {@link IFlowResource} that this controller is controlling. + * @return The {@link IFlowResource} that this controller is controlling. */ - public IFlowSink getFlowSink(); + public IFlowResource getFlowResource(); /** * Sets the executor for this {@link ISinkController}. The executor is Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java Mon Mar 30 16:20:28 2009 @@ -25,11 +25,10 @@ public interface ISourceController { /** - * Returns the source that this FlowController is controlling. - * - * @return The source that the flow controller is controlling. + * Gets the {@link IFlowResource} that this controller is controlling. + * @return The {@link IFlowResource} that this controller is controlling. */ - public IFlowSource getFlowSource(); + public IFlowResource getFlowResource(); /** * Gets the flow that this controller is controlling. Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Mon Mar 30 16:20:28 2009 @@ -87,8 +87,8 @@ return false; } - public IFlowSink getFlowSink() { - return null; + public IFlowResource getFlowResource() { + return source; } public void setExecutor(Executor executor) { Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Mon Mar 30 16:20:28 2009 @@ -85,8 +85,8 @@ return flow; } - public IFlowSource getFlowSource() { - return controllable.getFlowSource(); + public IFlowResource getFlowResource() { + return controllable.getFlowResource(); } public void onFlowBlock(ISinkController sink) { @@ -110,13 +110,4 @@ controller.setExecutor(executor); } } - - // ///////////////////////////////////////////////////////////////// - // Getters and Setters - // ///////////////////////////////////////////////////////////////// - - public IFlowSink getFlowSink() { - return controllable.getFlowSink(); - } - } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Mon Mar 30 16:20:28 2009 @@ -23,8 +23,7 @@ import org.apache.activemq.dispatch.IDispatcher.DispatchContext; import org.apache.activemq.dispatch.IDispatcher.Dispatchable; import org.apache.activemq.flow.AbstractLimitedFlowSource; -import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.ISinkController; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.ISinkController.FlowControllable; @@ -45,6 +44,14 @@ protected boolean dispatching = false; protected int dispatchPriority = 0; protected QueueStoreHelper storeHelper; + protected FlowQueueListener listener = new FlowQueueListener() + { + public void onQueueException(IFlowQueue queue, Throwable thrown) { + System.out.println("Exception in queue: " + thrown.getMessage()); + thrown.printStackTrace(); + } + }; + AsciiBuffer persistentQueueName; AbstractFlowQueue() { @@ -55,26 +62,33 @@ super(name); } + public void setFlowQueueListener(FlowQueueListener listener) { + this.listener = listener; + } + public final void add(E elem, ISourceController source) { checkSave(elem, source); getSinkController(elem, source).add(elem, source); } public final boolean offer(E elem, ISourceController source) { - if(getSinkController(elem, source).offer(elem, source)) - { + if (getSinkController(elem, source).offer(elem, source)) { checkSave(elem, source); return true; } return false; } - - private void checkSave(E elem, ISourceController source) - { - if(storeHelper != null && isElementPersistent(elem)) - { - storeHelper.save(elem, true); - } + + private final void checkSave(E elem, ISourceController source) { + //TODO This is currently handled externally to the queue + //but it would be nice to move it in here + /*if (storeHelper != null && isElementPersistent(elem)) { + try { + storeHelper.save(elem, true); + } catch (IOException e) { + listener.onQueueException(this, e); + } + }*/ } protected abstract ISinkController getSinkController(E elem, ISourceController source); @@ -87,13 +101,7 @@ return !pollingDispatch(); } - public final IFlowSink getFlowSink() { - // TODO Auto-generated method stub - return this; - } - - public final IFlowSource getFlowSource() { - // TODO Auto-generated method stub + public final IFlowResource getFlowResource() { return this; } @@ -196,15 +204,8 @@ } /** - * Called when there are no more elements to be loaded from the store. - */ - public void onQueueLoadComplete() { - - } - - /** - * Subclasses should override this if they require persistence - * requires saving to the store. + * Subclasses should override this if they require persistence requires + * saving to the store. * * @param elem * The element to check. Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java Mon Mar 30 16:20:28 2009 @@ -20,5 +20,19 @@ public interface IFlowQueue extends IBlockingFlowSource, IPollableFlowSource, IAsynchronousFlowSource, IFlowRelay { + public interface FlowQueueListener { + + /** + * Called when there is a queue error + * + * @param queue The queue triggering the exception + * @param thrown The exception. + */ + public void onQueueException(IFlowQueue queue, Throwable thrown); + } + + public void setFlowQueueListener(FlowQueueListener listener); + public void setDispatchPriority(int priority); + } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Mon Mar 30 16:20:28 2009 @@ -18,7 +18,7 @@ import org.apache.activemq.flow.IFlowSink; -public interface IQueue extends IFlowSink { +public interface IQueue extends IFlowSink, PersistentQueue{ public void addSubscription(Subscription sub); @@ -27,5 +27,7 @@ public boolean removeByValue(V value); public boolean removeByKey(K key); + + public void setStore(Store store); } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Mon Mar 30 16:20:28 2009 @@ -22,8 +22,8 @@ import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; import org.apache.activemq.flow.ISinkController; import org.apache.activemq.flow.ISourceController; import org.apache.kahadb.util.LinkedNode; @@ -46,7 +46,7 @@ return sinkController.getFlow(); } - public IFlowSource getFlowSource() { + public IFlowResource getFlowResource() { return LoadBalancedFlowQueue.this; } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Mon Mar 30 16:20:28 2009 @@ -22,8 +22,7 @@ import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; import org.apache.activemq.flow.IFlowLimiter; -import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.ISinkController; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.SizeLimiter; @@ -138,11 +137,7 @@ controller.add(elem, source); } - public IFlowSource getFlowSource() { - return MultiFlowQueue.this; - } - - public IFlowSink getFlowSink() { + public IFlowResource getFlowResource() { return MultiFlowQueue.this; } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Mon Mar 30 16:20:28 2009 @@ -22,12 +22,15 @@ import org.apache.activemq.flow.AbstractLimitedFlowResource; import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.protobuf.AsciiBuffer; abstract public class PartitionedQueue extends AbstractLimitedFlowResource implements IQueue { private HashSet> subscriptions = new HashSet>(); private HashMap> partitions = new HashMap>(); private Mapper partitionMapper; + private Store store; + private AsciiBuffer queueName; public IQueue getPartition(P partitionKey) { synchronized (partitions) { @@ -43,6 +46,10 @@ } } + public void setStore(Store store) { + this.store = store; + } + abstract protected IQueue cratePartition(P partitionKey); public void addSubscription(Subscription sub) { @@ -105,4 +112,16 @@ IQueue partition = getPartition(partitionKey); return partition.offer(value, source); } + + public void addFromStore(V elem, ISourceController controller) { + throw new UnsupportedOperationException(); + + } + + public AsciiBuffer getPeristentQueueName() { + if (queueName == null) { + queueName = new AsciiBuffer(getResourceName()); + } + return queueName; + } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java Mon Mar 30 16:20:28 2009 @@ -21,28 +21,6 @@ public interface PersistentQueue { - /** - * When the memory size of the queue exceeds this limit, elements are - * spooled to disk. - * - * @param extent - * The save extent. - * - * public PersistentQueue setSaveExent(long extent); - */ - - /** - * Gets the save extent associated with the queue. - * - * @return the save extent - * - * public long getSaveExent(long extent); - */ - - /** - * Enables persistence for this queue. - */ - public void enablePersistence(QueueStoreHelper storeHelper); /** * Called when an element is added from the queue's store. @@ -55,11 +33,6 @@ public void addFromStore(E elem, ISourceController controller); /** - * Called when there are no more elements to be loaded from the store. - */ - public void onQueueLoadComplete(); - - /** * Implementors implement this to indicate whether or not the given element * requires saving to the store. * @@ -73,5 +46,5 @@ * @return */ public AsciiBuffer getPeristentQueueName(); - + } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java Mon Mar 30 16:20:28 2009 @@ -16,6 +16,8 @@ */ package org.apache.activemq.queue; +import java.io.IOException; + public interface QueueStoreHelper { /** @@ -48,7 +50,8 @@ /** * Saves an element to the store. * @param elem The element to be saved. + * @throws IOException */ - public void save(E elem, boolean flush); + public void save(E elem, boolean flush) throws IOException; } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Mon Mar 30 16:20:28 2009 @@ -23,6 +23,7 @@ import org.apache.activemq.flow.AbstractLimitedFlowResource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.PrioritySizeLimiter; +import org.apache.activemq.protobuf.AsciiBuffer; public class SharedPriorityQueue extends AbstractLimitedFlowResource implements IQueue { @@ -33,6 +34,7 @@ private boolean autoRelease; private IDispatcher dispatcher; private final PrioritySizeLimiter limiter; + private Store store; public SharedPriorityQueue(String name, PrioritySizeLimiter limiter) { super(name); @@ -43,6 +45,10 @@ } } + public void setStore(Store store) { + this.store = store; + } + public void setResourceName(String resourceName) { super.setResourceName(resourceName); } @@ -98,12 +104,14 @@ queue.setDispatcher(dispatcher); queue.setDispatchPriority(prio); queue.setKeyMapper(keyMapper); + queue.setStore(store); partitions.set(prio, queue); onFlowOpened(queue.getFlowControler()); for (Subscription sub : subscriptions) { queue.addSubscription(sub); } + } return queue; } @@ -132,4 +140,18 @@ public void setDispatcher(IDispatcher dispatcher) { this.dispatcher = dispatcher; } + + public void addFromStore(V elem, ISourceController controller) { + // TODO Auto-generated method stub + + } + + public AsciiBuffer getPeristentQueueName() { + // TODO Auto-generated method stub + return new AsciiBuffer(this.getResourceName()); + } + + public boolean isElementPersistent(V elem) { + return false; + } } Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Mon Mar 30 16:20:28 2009 @@ -22,8 +22,8 @@ import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; import org.apache.activemq.flow.IFlowLimiter; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; import org.apache.activemq.flow.ISinkController; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.queue.Store.StoreCursor; @@ -45,7 +45,7 @@ private final LinkedNodeList readyPollingSubs = new LinkedNodeList(); private final HashMap, SubscriptionNode> subscriptions = new HashMap, SubscriptionNode>(); - private final HashMap, SubscriptionNode> sinks = new HashMap, SubscriptionNode>(); + private final HashMap sinks = new HashMap(); private final FlowController sinkController; private final Object mutex; @@ -66,7 +66,7 @@ } public void onFlowResume(ISinkController sinkController) { - IFlowSink sink = (IFlowSink)sinkController.getFlowSink(); + IFlowResource sink = sinkController.getFlowResource(); synchronized (mutex) { SubscriptionNode node = sinks.get(sink); if (node != null) { @@ -74,13 +74,13 @@ boolean notify = false; if (node.cursor == null) { readyDirectSubs.addLast(node); - //System.out.println("Subscription state change: un-ready direct -> ready direct: "+node); + // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node); } else { if (readyPollingSubs.isEmpty()) { notify = !store.isEmpty(); } readyPollingSubs.addLast(node); - //System.out.println("Subscription state change: un-ready polling -> ready polling: "+node); + // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node); } if (notify) { @@ -99,7 +99,7 @@ throw new UnsupportedOperationException(); } - public IFlowSource getFlowSource() { + public IFlowResource getFlowResource() { return SharedQueue.this; } @@ -126,6 +126,10 @@ super.onFlowOpened(sinkController); } + public void setStore(Store store) { + this.store = store; + } + protected final ISinkController getSinkController(V elem, ISourceController source) { return sinkController; } @@ -163,7 +167,7 @@ sub.resumeAt(node); unreadyPollingSubs.addLast(sub); matchCount++; - //System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub); + // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub); } sub = next; } @@ -175,7 +179,7 @@ subNode.unlink(); subNode.resumeAt(node); unreadyPollingSubs.addLast(subNode); - //System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode); + // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode); } matchCount += matches.size(); @@ -287,7 +291,7 @@ } return true; } else { - //System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode); + // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode); // Subscription is no longer ready.. subNode.cursorUnPeek(storeNode); subNode.unlink(); Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java Mon Mar 30 16:20:28 2009 @@ -6,8 +6,7 @@ import org.apache.activemq.flow.IFlowController; import org.apache.activemq.flow.IFlowLimiter; import org.apache.activemq.flow.IFlowRelay; -import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.ISinkController.FlowControllable; @@ -35,13 +34,7 @@ drain.drain(elem, controller); } - public IFlowSink getFlowSink() { - // TODO Auto-generated method stub - return this; - } - - public IFlowSource getFlowSource() { - // TODO Auto-generated method stub + public IFlowResource getFlowResource() { return this; } Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java?rev=760028&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java (added) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java Mon Mar 30 16:20:28 2009 @@ -0,0 +1,142 @@ +package org.apache.activemq.util; + +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.kahadb.util.LinkedNodeList; +import org.apache.kahadb.util.LinkedNode; + +/** + *

+ * Title: Sonic MQ v6.1 + *

+ *

+ * Description: Sonic MQ v6.1 + *

+ *

+ * Copyright: Copyright (c) 2004 + *

+ *

+ * Company: Sonic Software Corporation + *

+ * + * @author Colin MacNaughton + * @version 6.1 + */ + +public class HashList { + private HashMap m_index = null; + private LinkedNodeList m_list = null; + + public HashList() { + m_index = new HashMap(); + m_list = new LinkedNodeList(); + } + + /** + * Adds an object to the list if it is not already present. + * + * @param o + * True if the object was added. + */ + public final boolean add(E o) { + HashListNode n = (HashListNode) m_index.get(o); + if (n == null) { + n = new HashListNode(o); + m_index.put(o, n); + return true; + } else { + return false; + } + } + + public final boolean remove(E o) { + HashListNode n = m_index.remove(o); + if (n != null) { + n.unlink(); + return true; + } + return false; + } + + public final Object get(E o) { + HashListNode n = m_index.get(o); + if (n == null) { + return null; + } else + return n.elem; + } + + public final int size() { + return m_index.size(); + } + + public final boolean contains(Object o) { + return m_index.containsKey(o); + } + + /** + * @return Returns a head to tail iterator of the underlying list. + */ + public final Iterator iterator() { + return new Iterator() { + HashListNode next = m_list.getHead(); + + public void remove() { + HashListNode newNext = next.getNext(); + m_index.remove(next.elem); + next.unlink(); + next = newNext; + } + + public boolean hasNext() { + return next != null; + } + + public E next() { + try { + return next.elem; + } finally { + next = next.getNext(); + } + } + }; + } + + private class HashListNode extends LinkedNode { + private final E elem; + + HashListNode(E elem) { + this.elem = elem; + } + + public int hashCode() { + return elem.hashCode(); + } + + public boolean equals(Object o) { + if (o == this) { + return true; + } + + if (o == null || o.hashCode() != hashCode()) { + return false; + } else { + HashListNode node = null; + if(getClass().isInstance(o)) + { + node = getClass().cast(o); + } + if (node == null) { + return false; + } + return equals(node); + } + } + + public boolean equals(HashList.HashListNode node) { + return node.elem.equals(elem); + } + } + +} \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (original) +++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Mon Mar 30 16:20:28 2009 @@ -77,7 +77,7 @@ } message QueueRemoveMessage { optional bytes queueName = 1 [java_override_type = "AsciiBuffer"]; - optional int64 queueKey=2; + optional int64 messageKey=2; } /////////////////////////////////////////////////////////////// Added: activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db?rev=760028&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db (added) +++ activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db Mon Mar 30 16:20:28 2009 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.broker.store.kahadb.KahaDBStore \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory?rev=760028&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory (added) +++ activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory Mon Mar 30 16:20:28 2009 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.broker.store.memory.MemoryStore \ No newline at end of file Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Mon Mar 30 16:20:28 2009 @@ -17,6 +17,7 @@ package org.apache.activemq.broker; import java.beans.ExceptionListener; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -31,6 +32,8 @@ import org.apache.activemq.broker.MessageDelivery; import org.apache.activemq.broker.Queue; import org.apache.activemq.broker.Router; +import org.apache.activemq.broker.store.Store; +import org.apache.activemq.broker.store.StoreFactory; import org.apache.activemq.dispatch.IDispatcher; import org.apache.activemq.dispatch.PriorityDispatcher; import org.apache.activemq.metric.MetricAggregator; @@ -49,6 +52,10 @@ protected static final int PRIORITY_LEVELS = 10; protected static final boolean USE_INPUT_QUEUES = true; + protected final boolean USE_KAHA_DB = true; + protected final boolean PERSISTENT = true; + protected final boolean DURABLE = true; + // Set to put senders and consumers on separate brokers. protected boolean multibroker = false; @@ -82,7 +89,7 @@ protected IDispatcher dispatcher; protected final AtomicLong msgIdGenerator = new AtomicLong(); protected final AtomicBoolean stopping = new AtomicBoolean(); - + final ArrayList producers = new ArrayList(); final ArrayList consumers = new ArrayList(); @@ -124,7 +131,7 @@ protected IDispatcher createDispatcher() { return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", MessageBroker.MAX_PRIORITY, asyncThreadPoolSize); } - + public void test_1_1_0() throws Exception { producerCount = 1; destCount = 1; @@ -236,7 +243,6 @@ } } - /** * Tests 2 producers sending to 1 destination with 2 consumres, but with * consumers set to select only messages from each producer. 1 consumers is @@ -374,7 +380,7 @@ } } - private void createConnections() throws IOException, URISyntaxException { + private void createConnections() throws Exception, IOException, URISyntaxException { if (multibroker) { sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI); @@ -406,12 +412,14 @@ for (int i = 0; i < producerCount; i++) { Destination destination = dests[i % destCount]; RemoteProducer producer = createProducer(i, destination); + producer.setPersistentDelivery(PERSISTENT); producers.add(producer); } for (int i = 0; i < consumerCount; i++) { Destination destination = dests[i % destCount]; RemoteConsumer consumer = createConsumer(i, destination); + consumer.setDurable(DURABLE); consumers.add(consumer); } @@ -425,9 +433,9 @@ private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException { RemoteConsumer consumer = createConsumer(); - consumer.setExceptionListener(new ExceptionListener(){ + consumer.setExceptionListener(new ExceptionListener() { public void exceptionThrown(Exception error) { - if( !stopping.get() ) { + if (!stopping.get()) { System.err.println("Consumer Async Error:"); error.printStackTrace(); } @@ -445,9 +453,9 @@ private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException { RemoteProducer producer = cerateProducer(); - producer.setExceptionListener(new ExceptionListener(){ + producer.setExceptionListener(new ExceptionListener() { public void exceptionThrown(Exception error) { - if( !stopping.get() ) { + if (!stopping.get()) { System.err.println("Producer Async Error:"); error.printStackTrace(); } @@ -476,15 +484,29 @@ return queue; } - private MessageBroker createBroker(String name, String bindURI, String connectUri) { + private MessageBroker createBroker(String name, String bindURI, String connectUri) throws Exception { MessageBroker broker = new MessageBroker(); broker.setName(name); broker.setBindUri(bindURI); broker.setConnectUri(connectUri); broker.setDispatcher(dispatcher); + broker.setStore(createStore(broker)); return broker; } + protected Store createStore(MessageBroker broker) throws Exception { + Store store = null; + if (USE_KAHA_DB) { + store = StoreFactory.createStore("kaha-db"); + } else { + store = StoreFactory.createStore("memory"); + } + + store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.getName())); + store.setDeleteAllMessages(true); + return store; + } + private void stopServices() throws Exception { stopping.set(true); for (MessageBroker broker : brokers) { Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Mon Mar 30 16:20:28 2009 @@ -20,6 +20,7 @@ protected long thinkTime; protected Destination destination; protected String selector; + protected boolean durable; protected URI uri; private boolean schedualWait; @@ -82,6 +83,14 @@ public void setTotalConsumerRate(MetricAggregator totalConsumerRate) { this.totalConsumerRate = totalConsumerRate; } + + public boolean isDurable() { + return durable; + } + + public void setDurable(boolean durable) { + this.durable = durable; + } public Destination getDestination() { return destination; Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java Mon Mar 30 16:20:28 2009 @@ -22,6 +22,7 @@ protected AtomicLong messageIdGenerator; protected int priority; + protected boolean persistentDelivery; protected int priorityMod; protected int counter; protected int producerId; @@ -118,6 +119,14 @@ return name+":"+(++counter); } } + + public boolean isPersistentDelivery() { + return persistentDelivery; + } + + public void setPersistentDelivery(boolean persistentDelivery) { + this.persistentDelivery = persistentDelivery; + } public void setName(String name) { this.name = name; Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Mon Mar 30 16:20:28 2009 @@ -24,12 +24,9 @@ import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; -import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.ISinkController.FlowControllable; -import org.apache.activemq.transport.InactivityMonitor; -import org.apache.activemq.transport.tcp.TcpTransport; public class OpenwireRemoteConsumer extends RemoteConsumer { @@ -60,10 +57,7 @@ public String toString() { return flow.getFlowName(); } - public IFlowSink getFlowSink() { - return null; - } - public IFlowSource getFlowSource() { + public IFlowResource getFlowResource() { return null; } }, flow, limiter, inboundMutex); @@ -81,7 +75,7 @@ transport.oneway(connectionInfo); sessionInfo = createSessionInfo(connectionInfo); transport.oneway(sessionInfo); - consumerInfo = createConsumerInfo(sessionInfo, activemqDestination); + consumerInfo = createConsumerInfo(sessionInfo, activemqDestination, isDurable() ? name : null); consumerInfo.setPrefetchSize(inputWindowSize); transport.oneway(consumerInfo); } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Mon Mar 30 16:20:28 2009 @@ -38,28 +38,28 @@ private WindowLimiter outboundLimiter; protected void setupProducer() throws Exception, IOException { - if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) { + if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) { activemqDestination = new ActiveMQQueue(destination.getName().toString()); } else { activemqDestination = new ActiveMQTopic(destination.getName().toString()); } - + connectionInfo = createConnectionInfo(name); transport.oneway(connectionInfo); sessionInfo = createSessionInfo(connectionInfo); transport.oneway(sessionInfo); producerInfo = createProducerInfo(sessionInfo); producerInfo.setWindowSize(outputWindowSize); - transport.oneway(producerInfo); + transport.oneway(producerInfo); } - + protected void initialize() { - Flow flow = new Flow("client-"+name+"-outbound", false); - outputResumeThreshold = outputWindowSize/2; + Flow flow = new Flow("client-" + name + "-outbound", false); + outputResumeThreshold = outputWindowSize / 2; outboundLimiter = new WindowLimiter(true, flow, outputWindowSize, outputResumeThreshold); SingleFlowRelay outboundQueue = new SingleFlowRelay(flow, flow.getFlowName(), outboundLimiter); this.outboundQueue = outboundQueue; - + outboundController = outboundQueue.getFlowController(flow); outboundQueue.setDrain(new IFlowDrain() { public void drain(MessageDelivery message, ISourceController controller) { @@ -68,12 +68,12 @@ } }); } - + public void onCommand(Object command) { try { if (command.getClass() == WireFormatInfo.class) { } else if (command.getClass() == BrokerInfo.class) { - System.out.println("Producer "+name+" connected to "+((BrokerInfo)command).getBrokerName()); + System.out.println("Producer " + name + " connected to " + ((BrokerInfo) command).getBrokerName()); } else if (command.getClass() == ProducerAck.class) { ProducerAck fc = (ProducerAck) command; synchronized (outboundQueue) { @@ -86,7 +86,7 @@ onException(e); } } - + protected void createNextMessage() { int priority = this.priority; if (priorityMod > 0) { @@ -94,6 +94,9 @@ } ActiveMQTextMessage msg = createMessage(producerInfo, activemqDestination, priority, createPayload()); + if (persistentDelivery) { + msg.setPersistent(true); + } if (property != null) { try { msg.setStringProperty(property, property); @@ -104,4 +107,3 @@ next = new OpenWireMessageDelivery(msg); } } - Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java Mon Mar 30 16:20:28 2009 @@ -19,12 +19,13 @@ static private long idGenerator; static private long msgIdGenerator; - public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { + public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination, String subscriptionName) throws Exception { ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); info.setBrowser(false); info.setDestination(destination); info.setPrefetchSize(1000); info.setDispatchAsync(false); + info.setSubscriptionName(subscriptionName); return info; } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java Mon Mar 30 16:20:28 2009 @@ -9,8 +9,7 @@ import org.apache.activemq.broker.stomp.StompMessageDelivery; import org.apache.activemq.flow.Flow; import org.apache.activemq.flow.FlowController; -import org.apache.activemq.flow.IFlowSink; -import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.IFlowResource; import org.apache.activemq.flow.ISourceController; import org.apache.activemq.flow.SizeLimiter; import org.apache.activemq.flow.ISinkController.FlowControllable; @@ -58,10 +57,7 @@ public String toString() { return flow.getFlowName(); } - public IFlowSink getFlowSink() { - return null; - } - public IFlowSource getFlowSource() { + public IFlowResource getFlowResource() { return null; } }, flow, limiter, inboundMutex); Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Mon Mar 30 16:20:28 2009 @@ -107,6 +107,7 @@ for( long i=0; !stopped.get(); i++ ) { final MessageRecord messageRecord = new MessageRecord(); + messageRecord.setKey(store.allocateStoreTracking()); messageRecord.setMessageId(new AsciiBuffer(""+i)); messageRecord.setEncoding(new AsciiBuffer("encoding")); messageRecord.setBuffer(buffer); @@ -188,7 +189,7 @@ for (Iterator iterator = queueRecords; iterator.hasNext();) { QueueRecord r = iterator.next(); records.add(session.messageGetRecord(r.getMessageKey())); - session.queueRemoveMessage(queueName, r.queueKey); + session.queueRemoveMessage(queueName, r.messageKey); } } }, onFlush); Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Mon Mar 30 16:20:28 2009 @@ -31,9 +31,11 @@ public abstract class StoreTestBase extends TestCase { private Store store; - + abstract protected Store createStore(); + abstract protected boolean isStoreTransactional(); + abstract protected boolean isStorePersistent(); @Override @@ -69,7 +71,7 @@ } }, null); } - + public void testQueueAdd() throws Exception { final AsciiBuffer expected = new AsciiBuffer("test"); store.execute(new VoidCallback() { @@ -78,7 +80,7 @@ session.queueAdd(expected); } }, null); - + store.execute(new VoidCallback() { @Override public void run(Session session) throws Exception { @@ -89,7 +91,7 @@ } }, null); } - + public void testStoreExecuteExceptionPassthrough() throws Exception { try { store.execute(new VoidCallback() { @@ -102,10 +104,11 @@ fail("Expected IOException"); } catch (IOException e) { } - - // If the store implementation is transactional, then the work done should + + // If the store implementation is transactional, then the work done + // should // have been rolled back. - if( isStoreTransactional() ) { + if (isStoreTransactional()) { store.execute(new VoidCallback() { @Override public void run(Session session) throws Exception { @@ -116,10 +119,7 @@ } } - - - - + static void assertEquals(MessageRecord expected, MessageRecord actual) { assertEquals(expected.getBuffer(), actual.getBuffer()); assertEquals(expected.getEncoding(), actual.getEncoding()); Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java Mon Mar 30 16:20:28 2009 @@ -26,7 +26,7 @@ @Override protected Store createStore() { KahaDBStore rc = new KahaDBStore(); - rc.setDirectory(new File("target/test-data/kahadb-store-performance")); + rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance")); rc.setDeleteAllMessages(true); return rc; } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java Mon Mar 30 16:20:28 2009 @@ -26,7 +26,7 @@ @Override protected Store createStore() { KahaDBStore rc = new KahaDBStore(); - rc.setDirectory(new File("target/test-data/kahadb-store-test")); + rc.setStoreDirectory(new File("target/test-data/kahadb-store-test")); rc.setDeleteAllMessages(true); return rc; } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Mon Mar 30 16:20:28 2009 @@ -20,18 +20,22 @@ private Destination destination; private IQueue queue; private MockBroker broker; - + private Mapper partitionMapper; private Mapper keyExtractor; private IQueue createQueue() { - if (partitionMapper!=null) { + if (partitionMapper != null) { PartitionedQueue queue = new PartitionedQueue() { @Override protected IQueue cratePartition(Integer partitionKey) { return createSharedFlowQueue(); } + + public boolean isElementPersistent(Message message) { + return false; + } }; queue.setPartitionMapper(partitionMapper); queue.setResourceName(destination.getName().toString()); @@ -63,7 +67,7 @@ public final void deliver(ISourceController source, Message msg) { queue.add(msg, source); } - + public final Destination getDestination() { return destination; } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=760028&r1=760027&r2=760028&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Mon Mar 30 16:20:28 2009 @@ -9,6 +9,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.dispatch.IDispatcher; @@ -29,7 +30,6 @@ import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; public class PipeTransportFactory extends TransportFactory {