activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r760028 [3/3] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...
Date Mon, 30 Mar 2009 16:20:31 GMT
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Mon Mar 30 16:20:28 2009
@@ -202,8 +202,8 @@
         }
     }
 
-    public IFlowSource<E> getFlowSource() {
-        return controllable.getFlowSource();
+    public IFlowResource getFlowResource() {
+        return controllable.getFlowResource();
     }
 
     /**
@@ -387,7 +387,6 @@
                     synchronized (mutex) {
                         resuming = true;
                     }
-                    String was = Thread.currentThread().getName();
                     try {
                         for (ISourceController<?> source : blockedSources) {
 //                            System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
@@ -424,8 +423,8 @@
         return name;
     }
 
-    public IFlowSink<E> getFlowSink() {
-        return controllable.getFlowSink();
+    public IFlowResource getFlowSink() {
+        return controllable.getFlowResource();
     }
 
     public void setExecutor(Executor executor) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java Mon Mar 30 16:20:28 2009
@@ -25,11 +25,19 @@
      * @param <E>
      */
     public interface FlowControllable<E> {
-        public void flowElemAccepted(ISourceController<E> controller, E elem);
-
-        public IFlowSink<E> getFlowSink();
-
-        public IFlowSource<E> getFlowSource();
+        
+        /**
+         * Called by a flow controller when it accepts a element. 
+         * @param source The source controller
+         * @param elem
+         */
+        public void flowElemAccepted(ISourceController<E> source, E elem);
+
+        /**
+         * Gets the resource being flow controlled;
+         * @return The resource being flow controlled.
+         */
+        public IFlowResource getFlowResource();
     }
 
     /**
@@ -93,10 +101,10 @@
     public boolean addUnblockListener(FlowUnblockListener<E> listener);
 
     /**
-     * Gets the {@link IFlowSink} that this controller is controlling. 
-     * @return The {@link IFlowSink} that this controller is controlling.
+     * Gets the {@link IFlowResource} that this controller is controlling. 
+     * @return The {@link IFlowResource} that this controller is controlling.
      */
-    public IFlowSink<E> getFlowSink();
+    public IFlowResource getFlowResource();
     
     /**
      * Sets the executor for this {@link ISinkController}. The executor is

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java Mon Mar 30 16:20:28 2009
@@ -25,11 +25,10 @@
 public interface ISourceController<E> {
 
     /**
-     * Returns the source that this FlowController is controlling.
-     * 
-     * @return The source that the flow controller is controlling.
+     * Gets the {@link IFlowResource} that this controller is controlling. 
+     * @return The {@link IFlowResource} that this controller is controlling.
      */
-    public IFlowSource<E> getFlowSource();
+    public IFlowResource getFlowResource();
 
     /**
      * Gets the flow that this controller is controlling.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java Mon Mar 30 16:20:28 2009
@@ -87,8 +87,8 @@
         return false;
     }
 
-    public IFlowSink<E> getFlowSink() {
-        return null;
+    public IFlowResource getFlowResource() {
+        return source;
     }
 
     public void setExecutor(Executor executor) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java Mon Mar 30 16:20:28 2009
@@ -85,8 +85,8 @@
         return flow;
     }
 
-    public IFlowSource<E> getFlowSource() {
-        return controllable.getFlowSource();
+    public IFlowResource getFlowResource() {
+        return controllable.getFlowResource();
     }
 
     public void onFlowBlock(ISinkController<?> sink) {
@@ -110,13 +110,4 @@
             controller.setExecutor(executor);
         }
     }
-
-    // /////////////////////////////////////////////////////////////////
-    // Getters and Setters
-    // /////////////////////////////////////////////////////////////////
-
-    public IFlowSink<E> getFlowSink() {
-        return controllable.getFlowSink();
-    }
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Mon Mar 30 16:20:28 2009
@@ -23,8 +23,7 @@
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
 import org.apache.activemq.flow.AbstractLimitedFlowSource;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
@@ -45,6 +44,14 @@
     protected boolean dispatching = false;
     protected int dispatchPriority = 0;
     protected QueueStoreHelper<E> storeHelper;
+    protected FlowQueueListener listener = new FlowQueueListener()
+    {
+        public void onQueueException(IFlowQueue<?> queue, Throwable thrown) {
+            System.out.println("Exception in queue: " + thrown.getMessage());
+            thrown.printStackTrace();
+        }
+    };
+    
     AsciiBuffer persistentQueueName;
 
     AbstractFlowQueue() {
@@ -55,26 +62,33 @@
         super(name);
     }
 
+    public void setFlowQueueListener(FlowQueueListener listener) {
+        this.listener = listener;
+    }
+
     public final void add(E elem, ISourceController<?> source) {
         checkSave(elem, source);
         getSinkController(elem, source).add(elem, source);
     }
 
     public final boolean offer(E elem, ISourceController<?> source) {
-        if(getSinkController(elem, source).offer(elem, source))
-        {
+        if (getSinkController(elem, source).offer(elem, source)) {
             checkSave(elem, source);
             return true;
         }
         return false;
     }
-    
-    private void checkSave(E elem, ISourceController<?> source) 
-    {
-        if(storeHelper != null && isElementPersistent(elem))
-        {
-            storeHelper.save(elem, true);
-        }
+
+    private final void checkSave(E elem, ISourceController<?> source) {
+        //TODO This is currently handled externally to the queue
+        //but it would be nice to move it in here
+        /*if (storeHelper != null && isElementPersistent(elem)) {
+            try {
+                storeHelper.save(elem, true);
+            } catch (IOException e) {
+                listener.onQueueException(this, e);
+            }
+        }*/
     }
 
     protected abstract ISinkController<E> getSinkController(E elem, ISourceController<?> source);
@@ -87,13 +101,7 @@
         return !pollingDispatch();
     }
 
-    public final IFlowSink<E> getFlowSink() {
-        // TODO Auto-generated method stub
-        return this;
-    }
-
-    public final IFlowSource<E> getFlowSource() {
-        // TODO Auto-generated method stub
+    public final IFlowResource getFlowResource() {
         return this;
     }
 
@@ -196,15 +204,8 @@
     }
 
     /**
-     * Called when there are no more elements to be loaded from the store.
-     */
-    public void onQueueLoadComplete() {
-
-    }
-
-    /**
-     * Subclasses should override this if they require persistence 
-     * requires saving to the store.
+     * Subclasses should override this if they require persistence requires
+     * saving to the store.
      * 
      * @param elem
      *            The element to check.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java Mon Mar 30 16:20:28 2009
@@ -20,5 +20,19 @@
 
 public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IAsynchronousFlowSource<E>, IFlowRelay<E> {
 
+    public interface FlowQueueListener {
+        
+        /**
+         * Called when there is a queue error
+         * 
+         * @param queue The queue triggering the exception
+         * @param thrown The exception. 
+         */
+        public void onQueueException(IFlowQueue<?> queue, Throwable thrown);
+    }
+
+    public void setFlowQueueListener(FlowQueueListener listener);
+    
     public void setDispatchPriority(int priority);
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java Mon Mar 30 16:20:28 2009
@@ -18,7 +18,7 @@
 
 import org.apache.activemq.flow.IFlowSink;
 
-public interface IQueue<K, V> extends IFlowSink<V> {
+public interface IQueue<K, V> extends IFlowSink<V>, PersistentQueue<V>{
 
     public void addSubscription(Subscription<V> sub);
 
@@ -27,5 +27,7 @@
     public boolean removeByValue(V value);
 
     public boolean removeByKey(K key);
+    
+    public void setStore(Store<K, V> store);
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java Mon Mar 30 16:20:28 2009
@@ -22,8 +22,8 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.kahadb.util.LinkedNode;
@@ -46,7 +46,7 @@
             return sinkController.getFlow();
         }
 
-        public IFlowSource<E> getFlowSource() {
+        public IFlowResource getFlowResource() {
             return LoadBalancedFlowQueue.this;
         }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java Mon Mar 30 16:20:28 2009
@@ -22,8 +22,7 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
@@ -138,11 +137,7 @@
             controller.add(elem, source);
         }
 
-        public IFlowSource<E> getFlowSource() {
-            return MultiFlowQueue.this;
-        }
-
-        public IFlowSink<E> getFlowSink() {
+        public IFlowResource getFlowResource() {
             return MultiFlowQueue.this;
         }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Mon Mar 30 16:20:28 2009
@@ -22,12 +22,15 @@
 
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
 
 abstract public class PartitionedQueue<P, K, V> extends AbstractLimitedFlowResource<V> implements IQueue<K, V> {
 
     private HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
     private HashMap<P, IQueue<K, V>> partitions = new HashMap<P, IQueue<K, V>>();
     private Mapper<P, V> partitionMapper;
+    private Store<K, V> store;
+    private AsciiBuffer queueName;
 
     public IQueue<K, V> getPartition(P partitionKey) {
         synchronized (partitions) {
@@ -43,6 +46,10 @@
         }
     }
 
+    public void setStore(Store<K, V> store) {
+        this.store = store;
+    }
+
     abstract protected IQueue<K, V> cratePartition(P partitionKey);
 
     public void addSubscription(Subscription<V> sub) {
@@ -105,4 +112,16 @@
         IQueue<K, V> partition = getPartition(partitionKey);
         return partition.offer(value, source);
     }
+
+    public void addFromStore(V elem, ISourceController<?> controller) {
+        throw new UnsupportedOperationException();
+
+    }
+
+    public AsciiBuffer getPeristentQueueName() {
+        if (queueName == null) {
+            queueName = new AsciiBuffer(getResourceName());
+        }
+        return queueName;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistentQueue.java Mon Mar 30 16:20:28 2009
@@ -21,28 +21,6 @@
 
 public interface PersistentQueue<E> {
 
-    /**
-     * When the memory size of the queue exceeds this limit, elements are
-     * spooled to disk.
-     * 
-     * @param extent
-     *            The save extent.
-     * 
-     *            public PersistentQueue<E> setSaveExent(long extent);
-     */
-
-    /**
-     * Gets the save extent associated with the queue.
-     * 
-     * @return the save extent
-     * 
-     *         public long getSaveExent(long extent);
-     */
-
-    /**
-     * Enables persistence for this queue.
-     */
-    public void enablePersistence(QueueStoreHelper<E> storeHelper);
 
     /**
      * Called when an element is added from the queue's store.
@@ -55,11 +33,6 @@
     public void addFromStore(E elem, ISourceController<?> controller);
 
     /**
-     * Called when there are no more elements to be loaded from the store.
-     */
-    public void onQueueLoadComplete();
-
-    /**
      * Implementors implement this to indicate whether or not the given element
      * requires saving to the store.
      * 
@@ -73,5 +46,5 @@
      * @return
      */
     public AsciiBuffer getPeristentQueueName();
-
+    
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java Mon Mar 30 16:20:28 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.queue;
 
+import java.io.IOException;
+
 public interface QueueStoreHelper<E> {
 
     /**
@@ -48,7 +50,8 @@
     /**
      * Saves an element to the store. 
      * @param elem The element to be saved. 
+     * @throws IOException 
      */
-    public void save(E elem, boolean flush);
+    public void save(E elem, boolean flush) throws IOException;
     
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Mon Mar 30 16:20:28 2009
@@ -23,6 +23,7 @@
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
 
 public class SharedPriorityQueue<K, V> extends AbstractLimitedFlowResource<V> implements IQueue<K, V> {
 
@@ -33,6 +34,7 @@
     private boolean autoRelease;
     private IDispatcher dispatcher;
     private final PrioritySizeLimiter<V> limiter;
+    private Store<K, V> store;
 
     public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
         super(name);
@@ -43,6 +45,10 @@
         }
     }
 
+    public void setStore(Store<K, V> store) {
+        this.store = store;
+    }
+
     public void setResourceName(String resourceName) {
         super.setResourceName(resourceName);
     }
@@ -98,12 +104,14 @@
                 queue.setDispatcher(dispatcher);
                 queue.setDispatchPriority(prio);
                 queue.setKeyMapper(keyMapper);
+                queue.setStore(store);
                 partitions.set(prio, queue);
                 onFlowOpened(queue.getFlowControler());
 
                 for (Subscription<V> sub : subscriptions) {
                     queue.addSubscription(sub);
                 }
+
             }
             return queue;
         }
@@ -132,4 +140,18 @@
     public void setDispatcher(IDispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
+
+    public void addFromStore(V elem, ISourceController<?> controller) {
+        // TODO Auto-generated method stub
+
+    }
+
+    public AsciiBuffer getPeristentQueueName() {
+        // TODO Auto-generated method stub
+        return new AsciiBuffer(this.getResourceName());
+    }
+
+    public boolean isElementPersistent(V elem) {
+        return false;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Mon Mar 30 16:20:28 2009
@@ -22,8 +22,8 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowLimiter;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.Store.StoreCursor;
@@ -45,7 +45,7 @@
     private final LinkedNodeList<SubscriptionNode> readyPollingSubs = new LinkedNodeList<SubscriptionNode>();
 
     private final HashMap<Subscription<V>, SubscriptionNode> subscriptions = new HashMap<Subscription<V>, SubscriptionNode>();
-    private final HashMap<IFlowSink<V>, SubscriptionNode> sinks = new HashMap<IFlowSink<V>, SubscriptionNode>();
+    private final HashMap<IFlowResource, SubscriptionNode> sinks = new HashMap<IFlowResource, SubscriptionNode>();
 
     private final FlowController<V> sinkController;
     private final Object mutex;
@@ -66,7 +66,7 @@
         }
 
         public void onFlowResume(ISinkController<?> sinkController) {
-            IFlowSink<V> sink = (IFlowSink<V>)sinkController.getFlowSink();
+            IFlowResource sink = sinkController.getFlowResource();
             synchronized (mutex) {
                 SubscriptionNode node = sinks.get(sink);
                 if (node != null) {
@@ -74,13 +74,13 @@
                     boolean notify = false;
                     if (node.cursor == null) {
                         readyDirectSubs.addLast(node);
-                        //System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
                     } else {
                         if (readyPollingSubs.isEmpty()) {
                             notify = !store.isEmpty();
                         }
                         readyPollingSubs.addLast(node);
-                        //System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
                     }
 
                     if (notify) {
@@ -99,7 +99,7 @@
             throw new UnsupportedOperationException();
         }
 
-        public IFlowSource<V> getFlowSource() {
+        public IFlowResource getFlowResource() {
             return SharedQueue.this;
         }
 
@@ -126,6 +126,10 @@
         super.onFlowOpened(sinkController);
     }
 
+    public void setStore(Store<K, V> store) {
+        this.store = store;
+    }
+
     protected final ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
         return sinkController;
     }
@@ -163,7 +167,7 @@
                         sub.resumeAt(node);
                         unreadyPollingSubs.addLast(sub);
                         matchCount++;
-                        //System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
                     }
                     sub = next;
                 }
@@ -175,7 +179,7 @@
                     subNode.unlink();
                     subNode.resumeAt(node);
                     unreadyPollingSubs.addLast(subNode);
-                    //System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
                 }
                 matchCount += matches.size();
 
@@ -287,7 +291,7 @@
                     }
                     return true;
                 } else {
-                    //System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
                     // Subscription is no longer ready..
                     subNode.cursorUnPeek(storeNode);
                     subNode.unlink();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java Mon Mar 30 16:20:28 2009
@@ -6,8 +6,7 @@
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowLimiter;
 import org.apache.activemq.flow.IFlowRelay;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
@@ -35,13 +34,7 @@
         drain.drain(elem, controller);
     }
 
-    public IFlowSink<E> getFlowSink() {
-        // TODO Auto-generated method stub
-        return this;
-    }
-
-    public IFlowSource<E> getFlowSource() {
-        // TODO Auto-generated method stub
+    public IFlowResource getFlowResource() {
         return this;
     }
 

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java?rev=760028&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java Mon Mar 30 16:20:28 2009
@@ -0,0 +1,142 @@
+package org.apache.activemq.util;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.kahadb.util.LinkedNodeList;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * <p>
+ * Title: Sonic MQ v6.1
+ * </p>
+ * <p>
+ * Description: Sonic MQ v6.1
+ * </p>
+ * <p>
+ * Copyright: Copyright (c) 2004
+ * </p>
+ * <p>
+ * Company: Sonic Software Corporation
+ * </p>
+ * 
+ * @author Colin MacNaughton
+ * @version 6.1
+ */
+
+public class HashList<E> {
+    private HashMap<E, HashListNode> m_index = null;
+    private LinkedNodeList<HashListNode> m_list = null;
+
+    public HashList() {
+        m_index = new HashMap<E, HashListNode>();
+        m_list = new LinkedNodeList<HashListNode>();
+    }
+
+    /**
+     * Adds an object to the list if it is not already present.
+     * 
+     * @param o
+     *            True if the object was added.
+     */
+    public final boolean add(E o) {
+        HashListNode n = (HashListNode) m_index.get(o);
+        if (n == null) {
+            n = new HashListNode(o);
+            m_index.put(o, n);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public final boolean remove(E o) {
+        HashListNode n = m_index.remove(o);
+        if (n != null) {
+            n.unlink();
+            return true;
+        }
+        return false;
+    }
+
+    public final Object get(E o) {
+        HashListNode n = m_index.get(o);
+        if (n == null) {
+            return null;
+        } else
+            return n.elem;
+    }
+
+    public final int size() {
+        return m_index.size();
+    }
+
+    public final boolean contains(Object o) {
+        return m_index.containsKey(o);
+    }
+
+    /**
+     * @return Returns a head to tail iterator of the underlying list.
+     */
+    public final Iterator<E> iterator() {
+        return new Iterator<E>() {
+            HashListNode next = m_list.getHead();
+
+            public void remove() {
+                HashListNode newNext = next.getNext();
+                m_index.remove(next.elem);
+                next.unlink();
+                next = newNext;
+            }
+
+            public boolean hasNext() {
+                return next != null;
+            }
+
+            public E next() {
+                try {
+                    return next.elem;
+                } finally {
+                    next = next.getNext();
+                }
+            }
+        };
+    }
+
+    private class HashListNode extends LinkedNode<HashListNode> {
+        private final E elem;
+
+        HashListNode(E elem) {
+            this.elem = elem;
+        }
+
+        public int hashCode() {
+            return elem.hashCode();
+        }
+
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            
+            if (o == null || o.hashCode() != hashCode()) {
+                return false;
+            } else {
+                HashListNode node = null;
+                if(getClass().isInstance(o))
+                {
+                    node = getClass().cast(o);
+                }
+                if (node == null) {
+                    return false;
+                }
+                return equals(node);
+            }
+        }
+
+        public boolean equals(HashList<E>.HashListNode node) {
+            return node.elem.equals(elem);
+        }
+    }
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Mon Mar 30 16:20:28 2009
@@ -77,7 +77,7 @@
 }  
 message QueueRemoveMessage {
   optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
-  optional int64 queueKey=2;
+  optional int64 messageKey=2;
 }  
 
 ///////////////////////////////////////////////////////////////

Added: activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db?rev=760028&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db (added)
+++ activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/kaha-db Mon Mar 30 16:20:28 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.store.kahadb.KahaDBStore
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory?rev=760028&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory (added)
+++ activemq/sandbox/activemq-flow/src/main/resources/META-INF/services/org/apache/activemq/broker/store/memory Mon Mar 30 16:20:28 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.store.memory.MemoryStore
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Mon Mar 30 16:20:28 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker;
 
 import java.beans.ExceptionListener;
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -31,6 +32,8 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Queue;
 import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.metric.MetricAggregator;
@@ -49,6 +52,10 @@
     protected static final int PRIORITY_LEVELS = 10;
     protected static final boolean USE_INPUT_QUEUES = true;
 
+    protected final boolean USE_KAHA_DB = true;
+    protected final boolean PERSISTENT = true;
+    protected final boolean DURABLE = true;
+
     // Set to put senders and consumers on separate brokers.
     protected boolean multibroker = false;
 
@@ -82,7 +89,7 @@
     protected IDispatcher dispatcher;
     protected final AtomicLong msgIdGenerator = new AtomicLong();
     protected final AtomicBoolean stopping = new AtomicBoolean();
-    
+
     final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
     final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
 
@@ -124,7 +131,7 @@
     protected IDispatcher createDispatcher() {
         return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", MessageBroker.MAX_PRIORITY, asyncThreadPoolSize);
     }
-    
+
     public void test_1_1_0() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -236,7 +243,6 @@
         }
     }
 
-
     /**
      * Tests 2 producers sending to 1 destination with 2 consumres, but with
      * consumers set to select only messages from each producer. 1 consumers is
@@ -374,7 +380,7 @@
         }
     }
 
-    private void createConnections() throws IOException, URISyntaxException {
+    private void createConnections() throws Exception, IOException, URISyntaxException {
 
         if (multibroker) {
             sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
@@ -406,12 +412,14 @@
         for (int i = 0; i < producerCount; i++) {
             Destination destination = dests[i % destCount];
             RemoteProducer producer = createProducer(i, destination);
+            producer.setPersistentDelivery(PERSISTENT);
             producers.add(producer);
         }
 
         for (int i = 0; i < consumerCount; i++) {
             Destination destination = dests[i % destCount];
             RemoteConsumer consumer = createConsumer(i, destination);
+            consumer.setDurable(DURABLE);
             consumers.add(consumer);
         }
 
@@ -425,9 +433,9 @@
 
     private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
         RemoteConsumer consumer = createConsumer();
-        consumer.setExceptionListener(new ExceptionListener(){
+        consumer.setExceptionListener(new ExceptionListener() {
             public void exceptionThrown(Exception error) {
-                if( !stopping.get() ) {
+                if (!stopping.get()) {
                     System.err.println("Consumer Async Error:");
                     error.printStackTrace();
                 }
@@ -445,9 +453,9 @@
 
     private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
         RemoteProducer producer = cerateProducer();
-        producer.setExceptionListener(new ExceptionListener(){
+        producer.setExceptionListener(new ExceptionListener() {
             public void exceptionThrown(Exception error) {
-                if( !stopping.get() ) {
+                if (!stopping.get()) {
                     System.err.println("Producer Async Error:");
                     error.printStackTrace();
                 }
@@ -476,15 +484,29 @@
         return queue;
     }
 
-    private MessageBroker createBroker(String name, String bindURI, String connectUri) {
+    private MessageBroker createBroker(String name, String bindURI, String connectUri) throws Exception {
         MessageBroker broker = new MessageBroker();
         broker.setName(name);
         broker.setBindUri(bindURI);
         broker.setConnectUri(connectUri);
         broker.setDispatcher(dispatcher);
+        broker.setStore(createStore(broker));
         return broker;
     }
 
+    protected Store createStore(MessageBroker broker) throws Exception {
+        Store store = null;
+        if (USE_KAHA_DB) {
+            store = StoreFactory.createStore("kaha-db");
+        } else {
+            store = StoreFactory.createStore("memory");
+        }
+
+        store.setStoreDirectory(new File("target/test-data/broker-test/" +  broker.getName()));
+        store.setDeleteAllMessages(true);
+        return store;
+    }
+
     private void stopServices() throws Exception {
         stopping.set(true);
         for (MessageBroker broker : brokers) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Mon Mar 30 16:20:28 2009
@@ -20,6 +20,7 @@
     protected long thinkTime;
     protected Destination destination;
     protected String selector;
+    protected boolean durable;
     protected URI uri;
 
     private boolean schedualWait;
@@ -82,6 +83,14 @@
     public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
         this.totalConsumerRate = totalConsumerRate;
     }
+    
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public void setDurable(boolean durable) {
+        this.durable = durable;
+    }
 
     public Destination getDestination() {
         return destination;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java Mon Mar 30 16:20:28 2009
@@ -22,6 +22,7 @@
 
     protected AtomicLong messageIdGenerator;
     protected int priority;
+    protected boolean persistentDelivery;
     protected int priorityMod;
     protected int counter;
     protected int producerId;
@@ -118,6 +119,14 @@
             return name+":"+(++counter);
         }
     }
+    
+    public boolean isPersistentDelivery() {
+        return persistentDelivery;
+    }
+
+    public void setPersistentDelivery(boolean persistentDelivery) {
+        this.persistentDelivery = persistentDelivery;
+    }
 	
 	public void setName(String name) {
         this.name = name;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Mon Mar 30 16:20:28 2009
@@ -24,12 +24,9 @@
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.tcp.TcpTransport;
 
 public class OpenwireRemoteConsumer extends RemoteConsumer {
 
@@ -60,10 +57,7 @@
             public String toString() {
                 return flow.getFlowName();
             }
-            public IFlowSink<MessageDelivery> getFlowSink() {
-                return null;
-            }
-            public IFlowSource<MessageDelivery> getFlowSource() {
+            public IFlowResource getFlowResource() {
                 return null;
             }
         }, flow, limiter, inboundMutex);
@@ -81,7 +75,7 @@
         transport.oneway(connectionInfo);
         sessionInfo = createSessionInfo(connectionInfo);
         transport.oneway(sessionInfo);
-        consumerInfo = createConsumerInfo(sessionInfo, activemqDestination);
+        consumerInfo = createConsumerInfo(sessionInfo, activemqDestination, isDurable() ? name : null);
         consumerInfo.setPrefetchSize(inputWindowSize);
         transport.oneway(consumerInfo);
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Mon Mar 30 16:20:28 2009
@@ -38,28 +38,28 @@
     private WindowLimiter<MessageDelivery> outboundLimiter;
 
     protected void setupProducer() throws Exception, IOException {
-        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+        if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) {
             activemqDestination = new ActiveMQQueue(destination.getName().toString());
         } else {
             activemqDestination = new ActiveMQTopic(destination.getName().toString());
         }
-        
+
         connectionInfo = createConnectionInfo(name);
         transport.oneway(connectionInfo);
         sessionInfo = createSessionInfo(connectionInfo);
         transport.oneway(sessionInfo);
         producerInfo = createProducerInfo(sessionInfo);
         producerInfo.setWindowSize(outputWindowSize);
-        transport.oneway(producerInfo);        
+        transport.oneway(producerInfo);
     }
-    
+
     protected void initialize() {
-        Flow flow = new Flow("client-"+name+"-outbound", false);
-        outputResumeThreshold = outputWindowSize/2;
+        Flow flow = new Flow("client-" + name + "-outbound", false);
+        outputResumeThreshold = outputWindowSize / 2;
         outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow, outputWindowSize, outputResumeThreshold);
         SingleFlowRelay<MessageDelivery> outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), outboundLimiter);
         this.outboundQueue = outboundQueue;
-        
+
         outboundController = outboundQueue.getFlowController(flow);
         outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
             public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
@@ -68,12 +68,12 @@
             }
         });
     }
-    
+
     public void onCommand(Object command) {
         try {
             if (command.getClass() == WireFormatInfo.class) {
             } else if (command.getClass() == BrokerInfo.class) {
-                System.out.println("Producer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+                System.out.println("Producer " + name + " connected to " + ((BrokerInfo) command).getBrokerName());
             } else if (command.getClass() == ProducerAck.class) {
                 ProducerAck fc = (ProducerAck) command;
                 synchronized (outboundQueue) {
@@ -86,7 +86,7 @@
             onException(e);
         }
     }
-    
+
     protected void createNextMessage() {
         int priority = this.priority;
         if (priorityMod > 0) {
@@ -94,6 +94,9 @@
         }
 
         ActiveMQTextMessage msg = createMessage(producerInfo, activemqDestination, priority, createPayload());
+        if (persistentDelivery) {
+            msg.setPersistent(true);
+        }
         if (property != null) {
             try {
                 msg.setStringProperty(property, property);
@@ -104,4 +107,3 @@
         next = new OpenWireMessageDelivery(msg);
     }
 }
-

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java Mon Mar 30 16:20:28 2009
@@ -19,12 +19,13 @@
     static private long idGenerator;
     static private long msgIdGenerator;
 
-    public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+    public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination, String subscriptionName) throws Exception {
         ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
         info.setBrowser(false);
         info.setDestination(destination);
         info.setPrefetchSize(1000);
         info.setDispatchAsync(false);
+        info.setSubscriptionName(subscriptionName);
         return info;
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java Mon Mar 30 16:20:28 2009
@@ -9,8 +9,7 @@
 import org.apache.activemq.broker.stomp.StompMessageDelivery;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
@@ -58,10 +57,7 @@
             public String toString() {
                 return flow.getFlowName();
             }
-            public IFlowSink<MessageDelivery> getFlowSink() {
-                return null;
-            }
-            public IFlowSource<MessageDelivery> getFlowSource() {
+            public IFlowResource getFlowResource() {
                 return null;
             }
         }, flow, limiter, inboundMutex);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Mon Mar 30 16:20:28 2009
@@ -107,6 +107,7 @@
                 for( long i=0; !stopped.get(); i++ ) {
                     
                     final MessageRecord messageRecord = new MessageRecord();
+                    messageRecord.setKey(store.allocateStoreTracking());
                     messageRecord.setMessageId(new AsciiBuffer(""+i));
                     messageRecord.setEncoding(new AsciiBuffer("encoding"));
                     messageRecord.setBuffer(buffer);
@@ -188,7 +189,7 @@
                             for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
                                 QueueRecord r = iterator.next();
                                 records.add(session.messageGetRecord(r.getMessageKey()));
-                                session.queueRemoveMessage(queueName, r.queueKey);
+                                session.queueRemoveMessage(queueName, r.messageKey);
                             }
                         }
                     }, onFlush);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Mon Mar 30 16:20:28 2009
@@ -31,9 +31,11 @@
 public abstract class StoreTestBase extends TestCase {
 
     private Store store;
-    
+
     abstract protected Store createStore();
+
     abstract protected boolean isStoreTransactional();
+
     abstract protected boolean isStorePersistent();
 
     @Override
@@ -69,7 +71,7 @@
             }
         }, null);
     }
-    
+
     public void testQueueAdd() throws Exception {
         final AsciiBuffer expected = new AsciiBuffer("test");
         store.execute(new VoidCallback<Exception>() {
@@ -78,7 +80,7 @@
                 session.queueAdd(expected);
             }
         }, null);
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -89,7 +91,7 @@
             }
         }, null);
     }
-    
+
     public void testStoreExecuteExceptionPassthrough() throws Exception {
         try {
             store.execute(new VoidCallback<Exception>() {
@@ -102,10 +104,11 @@
             fail("Expected IOException");
         } catch (IOException e) {
         }
-        
-        // If the store implementation is transactional, then the work done should 
+
+        // If the store implementation is transactional, then the work done
+        // should
         // have been rolled back.
-        if( isStoreTransactional() ) {
+        if (isStoreTransactional()) {
             store.execute(new VoidCallback<Exception>() {
                 @Override
                 public void run(Session session) throws Exception {
@@ -116,10 +119,7 @@
         }
 
     }
-    
-    
-    
-    
+
     static void assertEquals(MessageRecord expected, MessageRecord actual) {
         assertEquals(expected.getBuffer(), actual.getBuffer());
         assertEquals(expected.getEncoding(), actual.getEncoding());

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java Mon Mar 30 16:20:28 2009
@@ -26,7 +26,7 @@
     @Override
     protected Store createStore() {
         KahaDBStore rc = new KahaDBStore();
-        rc.setDirectory(new File("target/test-data/kahadb-store-performance"));
+        rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance"));
         rc.setDeleteAllMessages(true);
         return rc;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java Mon Mar 30 16:20:28 2009
@@ -26,7 +26,7 @@
     @Override
     protected Store createStore() {
         KahaDBStore rc = new KahaDBStore();
-        rc.setDirectory(new File("target/test-data/kahadb-store-test"));
+        rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
         rc.setDeleteAllMessages(true);
         return rc;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Mon Mar 30 16:20:28 2009
@@ -20,18 +20,22 @@
     private Destination destination;
     private IQueue<Long, Message> queue;
     private MockBroker broker;
-    
+
     private Mapper<Integer, Message> partitionMapper;
     private Mapper<Long, Message> keyExtractor;
 
     private IQueue<Long, Message> createQueue() {
 
-        if (partitionMapper!=null) {
+        if (partitionMapper != null) {
             PartitionedQueue<Integer, Long, Message> queue = new PartitionedQueue<Integer, Long, Message>() {
                 @Override
                 protected IQueue<Long, Message> cratePartition(Integer partitionKey) {
                     return createSharedFlowQueue();
                 }
+
+                public boolean isElementPersistent(Message message) {
+                    return false;
+                }
             };
             queue.setPartitionMapper(partitionMapper);
             queue.setResourceName(destination.getName().toString());
@@ -63,7 +67,7 @@
     public final void deliver(ISourceController<Message> source, Message msg) {
         queue.add(msg, source);
     }
-    
+
     public final Destination getDestination() {
         return destination;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=760028&r1=760027&r2=760028&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Mon Mar 30 16:20:28 2009
@@ -9,6 +9,7 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.dispatch.IDispatcher;
@@ -29,7 +30,6 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeTransportFactory extends TransportFactory {
 



Mime
View raw message