activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r777236 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/activemq/broker/store/kahadb/ main/java/org/apache/activemq/broker/store/memory/ main/j...
Date Thu, 21 May 2009 19:58:14 GMT
Author: chirino
Date: Thu May 21 19:58:13 2009
New Revision: 777236

URL: http://svn.apache.org/viewvc?rev=777236&view=rev
Log:
Applying Colin's patch at https://issues.apache.org/activemq/browse/AMQ-2263
Thanks!

Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowRelay.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDispatchTarget.java
Removed:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
Thu May 21 19:58:13 2009
@@ -60,7 +60,8 @@
      * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
      * .broker.protocol.ProtocolHandler.ConsumerContext)
      */
-    public synchronized void connect(Subscription<MessageDelivery> context) throws UserAlreadyConnectedException {
+    public synchronized void connect(Subscription<MessageDelivery> subsription) throws UserAlreadyConnectedException {
+        connectedSub = subsription;
         host.getRouter().bind(destination, this);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Thu May 21 19:58:13 2009
@@ -43,13 +43,13 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowDrain;
 import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.stomp.Stomp;
@@ -151,7 +151,7 @@
         Flow outboundFlow = new Flow("broker-" + connection.getName() + "-outbound", false);
         SizeLimiter<MessageDelivery> outLimiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
         outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), outLimiter);
-        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+        outboundQueue.setDrain(new QueueDispatchTarget<MessageDelivery>() {
             public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
                 StompFrame msg = message.asType(StompFrame.class);
                 connection.write(msg, new Runnable() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
Thu May 21 19:58:13 2009
@@ -50,7 +50,6 @@
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
 import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.Journal;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Thu May 21 19:58:13 2009
@@ -28,11 +28,9 @@
 import java.io.File;
 
 import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
Thu May 21 19:58:13 2009
@@ -22,7 +22,7 @@
 
 public abstract class AbstractLimitedFlowResource<E> implements IFlowResource {
     private Executor flowExecutor = null;
-    private final HashSet<FlowLifeCycleListener> lifeCycleWatchers = new HashSet<FlowLifeCycleListener>();
+    private HashSet<FlowLifeCycleListener> lifeCycleListeners = null;
     private final HashMap<Flow, IFlowController<E>> openControllers = new HashMap<Flow, IFlowController<E>>();
 
     private final long resourceId = RESOURCE_COUNTER.incrementAndGet();
@@ -50,13 +50,23 @@
     }
 
     public synchronized final void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        lifeCycleWatchers.add(listener);
+        if (lifeCycleListeners == null) {
+            lifeCycleListeners = new HashSet<FlowLifeCycleListener>();
+        }
+        lifeCycleListeners.add(listener);
         // Notify the watchers of all flows that are already open:
         for (IFlowController<E> controller : openControllers.values()) {
             listener.onFlowOpened(this, controller.getFlow());
         }
     }
 
+    public synchronized void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        lifeCycleListeners.remove(listener);
+        if (lifeCycleListeners.isEmpty()) {
+            lifeCycleListeners = null;
+        }
+    }
+
     /**
      * Subclasses must call this whenever a new {@link ISinkController} is
      * opened.
@@ -74,8 +84,10 @@
         if (flowExecutor != null) {
             controller.setExecutor(flowExecutor);
         }
-        for (FlowLifeCycleListener listener : lifeCycleWatchers) {
-            listener.onFlowOpened(this, controller.getFlow());
+        if (lifeCycleListeners != null) {
+            for (FlowLifeCycleListener listener : lifeCycleListeners) {
+                listener.onFlowOpened(this, controller.getFlow());
+            }
         }
     }
 
@@ -83,7 +95,7 @@
         IFlowController<E> existing = openControllers.remove(flow);
 
         if (existing != null) {
-            for (FlowLifeCycleListener listener : lifeCycleWatchers) {
+            for (FlowLifeCycleListener listener : lifeCycleListeners) {
                 listener.onFlowClosed(this, existing.getFlow());
             }
         }
@@ -98,10 +110,6 @@
         }
     }
 
-    public synchronized void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
-        lifeCycleWatchers.remove(listener);
-    }
-
     /**
      * Gets the flow controller corresponding to the specified flow.
      * 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
Thu May 21 19:58:13 2009
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.flow;
-
-/**
- * Defines an interface for draining a flow source.
- * 
- * @param <E>
- */
-public interface IFlowDrain<E> {
-
-    /**
-	 * Used by a FlowSource that is being dispatched to drain it's elements.
-	 * The implementor is responsible for calling {@link ISourceController#elementDispatched(Object)
-	 * when the element has been dispatched to all downstream sinks unless the 
-	 * IFlowSource#getAutoRelease() is set to true.
-	 * 
-	 * @param elem The element being drained
-	 * @param controller The source's controller
-	 */
-    public void drain(E elem, ISourceController<E> controller);
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
Thu May 21 19:58:13 2009
@@ -29,35 +29,4 @@
      * @return The flow controller for the specified flow.
      */
     public IFlowController<E> getFlowController(Flow flow);
-
-    /**
-     * If set to true the source will automatically release limiter space
-     * associated with {@link IFlowElem}s as they are dispacthed. If set to
-     * false then the {@link IFlowDrain} must release space via a call to
-     * {@link ISourceController#elementDispatched(IFlowElem)}.
-     * 
-     * @param autoRelease
-     *            If the source should release limiter space for elements.
-     */
-    public void setAutoRelease(boolean autoRelease);
-
-/**
-	 * Returns whether or not this {@link IFlowSource} is set to automatically
-	 * release elements via {@link FlowController#elementDispatched(Object) during
-	 * dispatch. When auto release is set the caller <i>must</i> not call 
-	 * {@link FlowController#elementDispatched(Object). 
-	 * 
-	 * @return true if auto release is set, false otherwise. 
-	 */
-    public boolean getAutoRelease();
-
-    /**
-     * Sets the default drain for elements from this flow source. It will be
-     * invoked to dispatch elements from the source.
-     * 
-     * @param drain
-     *            The drain.
-     */
-    public void setDrain(IFlowDrain<E> drain);
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
Thu May 21 19:58:13 2009
@@ -22,8 +22,6 @@
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.AbstractLimitedFlowSource;
-import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
@@ -34,21 +32,20 @@
  * 
  * @param <E>
  */
-public abstract class AbstractFlowQueue<E> extends AbstractLimitedFlowSource<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
+public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements FlowControllable<E>, IFlowQueue<E>, Dispatchable {
 
     protected IDispatcher dispatcher;
     protected DispatchContext dispatchContext;
-    protected final Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners = new ArrayList<IPollableFlowSource.FlowReadyListener<E>>();
+    protected Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners;
     private boolean notifyReady = false;
     protected int dispatchPriority = 0;
-    protected FlowQueueListener listener = new FlowQueueListener()
-    {
+    protected FlowQueueListener listener = new FlowQueueListener() {
         public void onQueueException(IFlowQueue<?> queue, Throwable thrown) {
             System.out.println("Exception in queue: " + thrown.getMessage());
             thrown.printStackTrace();
         }
     };
-    
+
     AbstractFlowQueue() {
         super();
     }
@@ -79,10 +76,6 @@
         return !pollingDispatch();
     }
 
-    public final IFlowResource getFlowResource() {
-        return this;
-    }
-
     protected final FlowControllable<E> getFlowControllableHook() {
         return this;
     }
@@ -112,6 +105,9 @@
 
     public synchronized void addFlowReadyListener(IPollableFlowSource.FlowReadyListener<E> watcher) {
 
+        if (readyListeners == null) {
+            readyListeners = new ArrayList<IPollableFlowSource.FlowReadyListener<E>>();
+        }
         readyListeners.add(watcher);
         if (isDispatchReady()) {
             notifyReady();
@@ -144,6 +140,10 @@
                 notify();
             }
 
+            if (readyListeners == null) {
+                return;
+            }
+            
             if (!readyListeners.isEmpty()) {
                 for (FlowReadyListener<E> listener : readyListeners) {
                     listener.onFlowReady(this);

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowRelay.java?rev=777236&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowRelay.java
(added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowRelay.java
Thu May 21 19:58:13 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowRelay;
+import org.apache.activemq.flow.IFlowResource;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+
+/**
+ * @author cmacnaug
+ * 
+ */
+public abstract class AbstractFlowRelay<E> extends AbstractLimitedFlowResource<E> implements IFlowRelay<E> {
+
+    protected boolean autoRelease = false;
+    protected QueueDispatchTarget<E> drain;
+
+    public AbstractFlowRelay() {
+        super();
+    }
+
+    public AbstractFlowRelay(String name) {
+        super(name);
+    }
+
+    /**
+     * If set to true the source will automatically release limiter space
+     * associated with {@link IFlowElem}s as they are dispacthed. If set to
+     * false then the {@link IFlowDrain} must release space via a call to
+     * {@link ISourceController#elementDispatched(IFlowElem)}.
+     * 
+     * @param autoRelease
+     *            If the source should release limiter space for elements.
+     */
+    public void setAutoRelease(boolean autoRelease) {
+        this.autoRelease = autoRelease;
+    }
+
+    /**
+     * Returns whether or not this {@link IFlowSource} is set to automatically
+     * release elements via {@link FlowController#elementDispatched(Object) during
+     * dispatch. When auto release is set the caller <i>must</i> not call 
+     * {@link FlowController#elementDispatched(Object). 
+     * 
+     * @return true if auto release is set, false otherwise. 
+     */
+    public boolean getAutoRelease() {
+        return autoRelease;
+    }
+
+    /**
+     * Sets the default drain for elements from this flow source. It will be
+     * invoked to dispatch elements from the source.
+     * 
+     * @param drain
+     *            The drain.
+     */
+    public void setDrain(QueueDispatchTarget<E> dispatchTarget) {
+        this.drain = dispatchTarget;
+    }
+
+    public IFlowResource getFlowResource() {
+        return this;
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
Thu May 21 19:58:13 2009
@@ -27,7 +27,6 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowSizeLimiter;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore.RestoreListener;
@@ -38,8 +37,6 @@
 import org.apache.activemq.util.SortedLinkedList;
 import org.apache.activemq.util.SortedLinkedListNode;
 
-import com.sun.jmx.mbeanserver.OpenConverter;
-
 /**
  * @author cmacnaug
  * 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
Thu May 21 19:58:13 2009
@@ -71,16 +71,14 @@
         sourceController = new ISourceController<E>() {
 
             public void elementDispatched(E elem) {
-                // TODO Auto-generated method stub
+                // No Op
             }
 
             public Flow getFlow() {
-                // TODO Auto-generated method stub
                 return controller.getFlow();
             }
 
             public IFlowResource getFlowResource() {
-                // TODO Auto-generated method stub
                 return ExclusivePersistentQueue.this;
             }
 
@@ -265,7 +263,6 @@
         return false;
     }
 
-    private QueueElement last = null;
     public synchronized final boolean pollingDispatch() {
         queue.dispatch();
         if (started && subscription != null && !subBlocked) {
@@ -274,20 +271,10 @@
                 // If the sub doesn't remove on dispatch set an ack listener:
                 SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
 
-                if(qe.acquired || limiter.getSize() == 0 || (last != null && last.sequence >= qe.sequence))
-                {
-                    System.out.println("Offering" + qe + limiter.getSize());
-                }
-                
                 // See if the sink has room:
                 if (subscription.offer(qe.elem, sourceController, callback)) {
-                    if(limiter.getElementSize(qe.getElement()) > 1048)
-                    {
-                        System.out.println("Offering" + qe);
-                    }
                     qe.setAcquired(true);
                     controller.elementDispatched(qe.getElement());
-                    last = qe;
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {
                         qe.acknowledge();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
Thu May 21 19:58:13 2009
@@ -19,7 +19,7 @@
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.IFlowSource;
 
-public interface IAsynchronousFlowSource<E> extends IFlowSource<E> {
+public interface IAsynchronousFlowSource<E> {
 
     /**
      * Sets an asynchronous dispatcher for this source. As elements become

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
Thu May 21 19:58:13 2009
@@ -34,5 +34,4 @@
     public void setFlowQueueListener(FlowQueueListener listener);
     
     public void setDispatchPriority(int priority);
-
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDispatchTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDispatchTarget.java?rev=777236&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDispatchTarget.java
(added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueDispatchTarget.java
Thu May 21 19:58:13 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue;
+
+import org.apache.activemq.flow.ISourceController;
+
+/**
+ * @author cmacnaug
+ *
+ */
+public interface QueueDispatchTarget<E> {
+    
+    /**
+     * Used by a FlowSource that is being dispatched to drain it's elements.
+     * The implementor is responsible for calling {@link ISourceController#elementDispatched(Object)
+     * when the element has been dispatched to all downstream sinks unless the 
+     * IFlowSource#getAutoRelease() is set to true.
+     * 
+     * @param elem The element being drained
+     * @param controller The source's controller
+     */
+    public void drain(E elem, ISourceController<E> controller);
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
Thu May 21 19:58:13 2009
@@ -1,16 +1,13 @@
 package org.apache.activemq.queue;
 
-import org.apache.activemq.flow.AbstractLimitedFlowSource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.IFlowRelay;
-import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
-public class SingleFlowRelay<E> extends AbstractLimitedFlowSource<E> implements IFlowRelay<E>, FlowControllable<E> {
+public class SingleFlowRelay<E> extends AbstractFlowRelay<E> implements FlowControllable<E> {
 
     private final IFlowController<E> controller;
 
@@ -32,14 +29,14 @@
 
     public void flowElemAccepted(ISourceController<E> controller, E elem) {
         drain.drain(elem, controller);
-    }
-
-    public IFlowResource getFlowResource() {
-        return this;
+        if (autoRelease) {
+            controller.elementDispatched(elem);
+        }
     }
 
     @Override
     public String toString() {
         return getResourceName();
     }
+
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Thu May 21 19:58:13 2009
@@ -51,8 +51,8 @@
 
     protected final boolean USE_KAHA_DB = true;
     protected final boolean PURGE_STORE = true;
-    protected final boolean PERSISTENT = false;
-    protected final boolean DURABLE = false;
+    protected final boolean PERSISTENT = true;
+    protected final boolean DURABLE = true;
 
     // Set to put senders and consumers on separate brokers.
     protected boolean multibroker = false;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
Thu May 21 19:58:13 2009
@@ -19,19 +19,15 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
 
-import org.apache.activemq.broker.BrokerSubscription.UserAlreadyConnectedException;
-import org.apache.activemq.broker.Queue.QueueSubscription;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.broker.store.BrokerDatabase;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
@@ -45,8 +41,6 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowDrain;
-import org.apache.activemq.flow.IFlowRelay;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
@@ -56,13 +50,11 @@
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.AbstractFlowRelay;
 import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 
 import junit.framework.TestCase;
 
@@ -287,7 +279,7 @@
         private final DispatchContext dispatchContext;
 
         protected IFlowController<OpenWireMessageDelivery> outboundController;
-        protected final IFlowRelay<OpenWireMessageDelivery> outboundQueue;
+        protected final AbstractFlowRelay<OpenWireMessageDelivery> outboundQueue;
         protected OpenWireMessageDelivery next;
         private int priority;
         private final String payload;
@@ -321,7 +313,7 @@
             Flow flow = new Flow(name, true);
             outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
             outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
-            outboundQueue.setDrain(new IFlowDrain<OpenWireMessageDelivery>() {
+            outboundQueue.setDrain(new QueueDispatchTarget<OpenWireMessageDelivery>() {
 
                 public void drain(OpenWireMessageDelivery elem, ISourceController<OpenWireMessageDelivery> controller) {
 
@@ -357,7 +349,6 @@
                 try {
                     createNextMessage();
                 } catch (JMSException e) {
-                    // TODO Auto-generated catch restoreBlock
                     e.printStackTrace();
                     stopped.set(true);
                     return true;
@@ -406,8 +397,6 @@
         private final SizeLimiter<MessageDelivery> limiter;
         private final FlowController<MessageDelivery> controller;
         private final IQueue<Long, MessageDelivery> sourceQueue;
-        private int limit = 20000;
-        private int count = 0;
 
         public Consumer(String name, IQueue<Long, MessageDelivery> sourceQueue) {
             this.sourceQueue = sourceQueue;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Thu May 21 19:58:13 2009
@@ -26,8 +26,8 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowDrain;
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 
 public class OpenwireRemoteProducer extends RemoteProducer {
@@ -61,7 +61,7 @@
         this.outboundQueue = outboundQueue;
 
         outboundController = outboundQueue.getFlowController(flow);
-        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+        outboundQueue.setDrain(new QueueDispatchTarget<MessageDelivery>() {
             public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
                 Message msg = message.asType(Message.class);
                 write(msg);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java Thu May 21 19:58:13 2009
@@ -8,9 +8,9 @@
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.stomp.StompMessageDelivery;
 import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowDrain;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
@@ -42,7 +42,7 @@
         this.outboundQueue = outboundQueue;
         
         outboundController = outboundQueue.getFlowController(flow);
-        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+        outboundQueue.setDrain(new QueueDispatchTarget<MessageDelivery>() {
             public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
                 StompFrame msg = message.asType(StompFrame.class);
                 write(msg, new Runnable(){

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=777236&r1=777235&r2=777236&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java Thu May 21 19:58:13 2009
@@ -15,8 +15,11 @@
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
 import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
+import org.apache.activemq.queue.AbstractFlowRelay;
 import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IFlowQueue;
 import org.apache.activemq.queue.IPollableFlowSource;
+import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
 import org.apache.activemq.transport.AsyncTransport;
@@ -39,10 +42,10 @@
     protected AsyncCommandQueue asyncCommandQueue;
 
     protected boolean useInputQueue = false;
-    protected IFlowRelay<Message> inputQueue;
+    protected AbstractFlowRelay<Message> inputQueue;
 
     protected Flow outboundFlow;
-    protected IFlowRelay<Message> outputQueue;
+    protected AbstractFlowRelay<Message> outputQueue;
     protected IPollableFlowSource<Message> outputSource;
     protected ProtocolLimiter<Message> outboundLimiter;
 
@@ -145,7 +148,7 @@
             inputQueue = queue;
         }
         inputQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
-        inputQueue.setDrain(new IFlowDrain<Message>() {
+        inputQueue.setDrain(new QueueDispatchTarget<Message>() {
 
             public void drain(Message message, ISourceController<Message> controller) {
                 messageReceived(controller, message);
@@ -170,7 +173,7 @@
 
         if (!USE_OUTPUT_QUEUE || asyncTransport == null || blockingTransport) {
             outputQueue = new SingleFlowRelay<Message>(outboundFlow, name + "-outbound", outboundLimiter);
-            outputQueue.setDrain(new IFlowDrain<Message>() {
+            outputQueue.setDrain(new QueueDispatchTarget<Message>() {
                 public void drain(final Message message, ISourceController<Message> controller) {
                     write(message);
                 };



Mime
View raw message