activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r755235 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq: flow/ queue/
Date Tue, 17 Mar 2009 13:27:45 GMT
Author: chirino
Date: Tue Mar 17 13:27:44 2009
New Revision: 755235

URL: http://svn.apache.org/viewvc?rev=755235&view=rev
Log:
Committing Colin's patch for https://issues.apache.org/activemq/browse/AMQ-2165
Thanks!


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Tue Mar 17 13:27:44 2009
@@ -30,10 +30,10 @@
 
     private static final Executor DEFAULT_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     // Sinks that are blocking us.
-    private final HashSet<ISinkController<E>> blockingSinks = new HashSet<ISinkController<E>>();
+    private final HashSet<ISinkController<?>> blockingSinks = new HashSet<ISinkController<?>>();
 
     // Holds the sources that this limiter is currently blocking
-    private final HashSet<ISourceController<E>> blockedSources = new HashSet<ISourceController<E>>();
+    private final HashSet<ISourceController<?>> blockedSources = new HashSet<ISourceController<?>>();
 
     // Holds the sources that this limiter is currently blocking
     private final HashSet<FlowUnblockListener<E>> unblockListeners = new HashSet<FlowUnblockListener<E>>();
@@ -122,7 +122,7 @@
     /**
      * Should be called by a resource anytime it's limits are exceeded.
      */
-    public final void onFlowBlock(ISinkController<E> sinkController) {
+    public final void onFlowBlock(ISinkController<?> sinkController) {
         synchronized (mutex) {
             if (!blockingSinks.add(sinkController)) {
                 throw new IllegalStateException(sinkController + " has already blocked: " + this);
@@ -135,7 +135,7 @@
         }
     }
 
-    public final void onFlowResume(ISinkController<E> sinkController) {
+    public final void onFlowResume(ISinkController<?> sinkController) {
         synchronized (mutex) {
             if (!blockingSinks.remove(sinkController)) {
                 throw new IllegalStateException(sinkController + " can't resume unblocked " + this);
@@ -217,7 +217,7 @@
      * @param controller
      *            the source flow controller.
      */
-    public void add(E elem, ISourceController<E> sourceController) {
+    public void add(E elem, ISourceController<?> sourceController) {
         boolean ok = false;
         synchronized (mutex) {
             // If we don't have an fc sink, then just increment the limiter.
@@ -255,7 +255,7 @@
      * @param controller
      *            the source flow controller.
      */
-    public boolean offer(E elem, ISourceController<E> sourceController) {
+    public boolean offer(E elem, ISourceController<?> sourceController) {
         boolean ok = false;
         synchronized (mutex) {
             // If we don't have an fc sink, then just increment the limiter.
@@ -310,7 +310,7 @@
      * @param source
      *            The {@link ISinkController} of the source to be blocked.
      */
-    protected void blockSource(final ISourceController<E> source) {
+    protected void blockSource(final ISourceController<?> source) {
         if (source == null) {
             return;
         }
@@ -389,7 +389,7 @@
                     }
                     String was = Thread.currentThread().getName();
                     try {
-                        for (ISourceController<E> source : blockedSources) {
+                        for (ISourceController<?> source : blockedSources) {
 //                            System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
                             source.onFlowResume(FlowController.this);
                         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java Tue
Mar 17 13:27:44 2009
@@ -28,7 +28,7 @@
      * @param source
      *            The source's flow controller.
      */
-    public void add(E elem, ISourceController<E> source);
+    public void add(E elem, ISourceController<?> source);
 
     /**
      * Offers an element to the sink. If there is no room available the source's
@@ -40,7 +40,7 @@
      *            The source's controller.
      * @return false if the element wasn't accepted.
      */
-    public boolean offer(E elem, ISourceController<E> source);
+    public boolean offer(E elem, ISourceController<?> source);
 
     /**
      * Sets the executor to be used by the sink's {@link IFlowController}s.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
Tue Mar 17 13:27:44 2009
@@ -51,7 +51,7 @@
      * @param controller
      *            the source flow controller.
      */
-    public boolean offer(E elem, ISourceController<E> sourceController);
+    public boolean offer(E elem, ISourceController<?> sourceController);
 
     /**
      * Adds an element to the sink associated with this resource if space is
@@ -64,7 +64,7 @@
      * @param controller
      *            the source flow controller.
      */
-    public void add(E elem, ISourceController<E> controller);
+    public void add(E elem, ISourceController<?> controller);
 
     /**
      * Called to check if this FlowController is currently being blocked

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
Tue Mar 17 13:27:44 2009
@@ -41,19 +41,19 @@
     /**
      * This is called when a particular flow is blocked for a resource
      * 
-     * @param sink
-     *            The sink blocking this source
+     * @param sinkController
+     *            The sink controller blocking this source
      */
-    public void onFlowBlock(ISinkController<E> sink);
+    public void onFlowBlock(ISinkController<?> sinkController);
 
     /**
      * Callback used with FlowControllers to get a notification that an
      * IFlowController has been resumed.
      * 
-     * @param controller
-     *            The IFlowController that was unblocked.
+     * @param sinkController
+     *            The sink controller that was unblocked.
      */
-    public void onFlowResume(ISinkController<E> sink);
+    public void onFlowResume(ISinkController<?> sinkController);
 
     public boolean isSourceBlocked();
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
Tue Mar 17 13:27:44 2009
@@ -63,11 +63,11 @@
         return "DISABLED Flow Controller for: " + source;
     }
 
-    public boolean offer(E elem, ISourceController<E> sourceController) {
+    public boolean offer(E elem, ISourceController<?> sourceController) {
         throw new UnsupportedOperationException();
     }
 
-    public void add(E elem, ISourceController<E> controller) {
+    public void add(E elem, ISourceController<?> controller) {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
Tue Mar 17 13:27:44 2009
@@ -43,12 +43,12 @@
     // ISinkController interface impl.
     // /////////////////////////////////////////////////////////////////
 
-    public boolean offer(E elem, ISourceController<E> controller) {
+    public boolean offer(E elem, ISourceController<?> controller) {
         int prio = limiter.getPriorityMapper().map(elem);
         return controllers.get(prio).offer(elem, controller);
     }
 
-    public void add(E elem, ISourceController<E> controller) {
+    public void add(E elem, ISourceController<?> controller) {
         int prio = limiter.getPriorityMapper().map(elem);
         controllers.get(prio).add(elem, controller);
     }
@@ -89,13 +89,13 @@
         return controllable.getFlowSource();
     }
 
-    public void onFlowBlock(ISinkController<E> sink) {
+    public void onFlowBlock(ISinkController<?> sink) {
         for (int i = 0; i < controllers.size(); i++) {
             controllers.get(i).onFlowBlock(sink);
         }
     }
 
-    public void onFlowResume(ISinkController<E> sink) {
+    public void onFlowResume(ISinkController<?> sink) {
         for (int i = 0; i < controllers.size(); i++) {
             controllers.get(i).onFlowBlock(sink);
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
Tue Mar 17 13:27:44 2009
@@ -18,14 +18,14 @@
 
 public class SizeLimiter<E> extends AbstractLimiter<E> {
 
-    protected int capacity;
-    protected int resumeThreshold;
+    protected long capacity;
+    protected long resumeThreshold;
 
-    private int size;
+    private long size;
     private boolean throttled;
-    private int reserved;
+    private long reserved;
 
-    public SizeLimiter(int capacity, int resumeThreshold) {
+    public SizeLimiter(long capacity, long resumeThreshold) {
         this.capacity = capacity;
         throttled = false;
         this.resumeThreshold = resumeThreshold;
@@ -50,13 +50,13 @@
 
     public void releaseReserved() {
         if (reserved > 0) {
-            int res = reserved;
+            long res = reserved;
             reserved = 0;
             remove(res);
         }
     }
 
-    protected void remove(int s) {
+    public void remove(long s) {
         this.size -= s;
         if (size < 0) {
             Exception ie = new IllegalStateException("Size Negative!" + size);
@@ -91,18 +91,41 @@
         return !throttled;
     }
 
-    public int getCapacity() {
+    public long getCapacity() {
         return capacity;
     }
 
-    public int getResumeThreshold() {
+    public long getResumeThreshold() {
         return resumeThreshold;
     }
 
-    public int getSize() {
+    public long getSize() {
         return size;
     }
 
+    public void setCapacity(long capacity) {
+        if (capacity < resumeThreshold) {
+            throw new IllegalArgumentException("capacity less than resume threshold");
+        }
+
+        this.capacity = capacity;
+
+        if (this.size >= capacity) {
+            throttled = true;
+        }
+    }
+
+    public void setResumeThreshold(long size) {
+
+        if (capacity < resumeThreshold) {
+            throw new IllegalArgumentException("capacity less than resume threshold");
+        }
+        if (throttled && this.size <= resumeThreshold) {
+            throttled = false;
+            notifyUnThrottleListeners();
+        }
+    }
+
     public String toString() {
         return "SizeLimiter " + capacity + "/" + resumeThreshold + ", s=" + size + " res=" + reserved + ", thr= " + throttled;
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
Tue Mar 17 13:27:44 2009
@@ -57,14 +57,14 @@
 
     }
 
-    public boolean offer(E elem, ISourceController<E> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         return controller.offer(elem, source);
     }
 
     /**
      * Performs a limited add to the queue.
      */
-    public final void add(E elem, ISourceController<E> source) {
+    public final void add(E elem, ISourceController<?> source) {
         controller.add(elem, source);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
Tue Mar 17 13:27:44 2009
@@ -41,14 +41,14 @@
         super.onFlowOpened(controller);
     }
 
-    public boolean offer(E elem, ISourceController<E> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         return controller.offer(elem, source);
     }
 
     /**
      * Performs a limited add to the queue.
      */
-    public final void add(E elem, ISourceController<E> source) {
+    public final void add(E elem, ISourceController<?> source) {
         controller.add(elem, source);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
Tue Mar 17 13:27:44 2009
@@ -50,7 +50,7 @@
             return LoadBalancedFlowQueue.this;
         }
 
-        public void onFlowBlock(ISinkController<E> sink) {
+        public void onFlowBlock(ISinkController<?> sink) {
             synchronized (LoadBalancedFlowQueue.this) {
                 SinkNode node = consumers.get(sink);
                 if (node != null) {
@@ -61,7 +61,7 @@
 
         }
 
-        public void onFlowResume(ISinkController<E> sink) {
+        public void onFlowResume(ISinkController<?> sink) {
             synchronized (LoadBalancedFlowQueue.this) {
                 SinkNode node = consumers.get(sink);
                 if (node != null) {
@@ -108,14 +108,14 @@
         super.onFlowOpened(sinkController);
     }
 
-    public boolean offer(E elem, ISourceController<E> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         return sinkController.offer(elem, source);
     }
 
     /**
      * Performs a limited add to the queue.
      */
-    public final void add(E elem, ISourceController<E> source) {
+    public final void add(E elem, ISourceController<?> source) {
         sinkController.add(elem, source);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
Tue Mar 17 13:27:44 2009
@@ -48,11 +48,11 @@
         throw new UnsupportedOperationException();
     }
 
-    public boolean offer(E elem, ISourceController<E> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         throw new UnsupportedOperationException("Not yet implemented");
     }
 
-    public synchronized void add(E elem, ISourceController<E> source) {
+    public synchronized void add(E elem, ISourceController<?> source) {
         SingleFlowQueue queue = flowQueues.get(source.getFlow());
         if (queue == null) {
             queue = new SingleFlowQueue(source.getFlow(), new SizeLimiter<E>(perFlowWindow, resumeThreshold));
@@ -139,7 +139,7 @@
             this.controller = new FlowController<E>(this, flow, limiter, MultiFlowQueue.this);
         }
 
-        final void enqueue(E elem, ISourceController<E> source) {
+        final void enqueue(E elem, ISourceController<?> source) {
             controller.add(elem, source);
         }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
Tue Mar 17 13:27:44 2009
@@ -94,13 +94,13 @@
         return partitionMapper;
     }
 
-    public void add(V value, ISourceController<V> source) {
+    public void add(V value, ISourceController<?> source) {
         P partitionKey = partitionMapper.map(value);
         IQueue<K, V> partition = getPartition(partitionKey);
         partition.add(value, source);
     }
 
-    public boolean offer(V value, ISourceController<V> source) {
+    public boolean offer(V value, ISourceController<?> source) {
         P partitionKey = partitionMapper.map(value);
         IQueue<K, V> partition = getPartition(partitionKey);
         return partition.offer(value, source);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
Tue Mar 17 13:27:44 2009
@@ -109,13 +109,13 @@
         }
     }
 
-    public void add(V value, ISourceController<V> source) {
+    public void add(V value, ISourceController<?> source) {
         int prio = priorityMapper.map(value);
         IQueue<K, V> partition = getPartition(prio);
         partition.add(value, source);
     }
 
-    public boolean offer(V value, ISourceController<V> source) {
+    public boolean offer(V value, ISourceController<?> source) {
         int prio = priorityMapper.map(value);
         IQueue<K, V> partition = getPartition(prio);
         return partition.offer(value, source);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
Tue Mar 17 13:27:44 2009
@@ -49,7 +49,6 @@
 
     private final FlowController<V> sinkController;
     private final Object mutex;
-    private final AbstractFlowQueue whoToWakeup = this;
 
     protected Mapper<K, V> keyMapper;
     private long directs;
@@ -63,11 +62,11 @@
         public void elementDispatched(V elem) {
         }
 
-        public void onFlowBlock(ISinkController<V> sink) {
+        public void onFlowBlock(ISinkController<?> sink) {
         }
 
-        public void onFlowResume(ISinkController<V> sinkController) {
-            IFlowSink<V> sink = sinkController.getFlowSink();
+        public void onFlowResume(ISinkController<?> sinkController) {
+            IFlowSink<V> sink = (IFlowSink<V>)sinkController.getFlowSink();
             synchronized (mutex) {
                 SubscriptionNode node = sinks.get(sink);
                 if (node != null) {
@@ -75,13 +74,13 @@
                     boolean notify = false;
                     if (node.cursor == null) {
                         readyDirectSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+                        //System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
                     } else {
                         if (readyPollingSubs.isEmpty()) {
                             notify = !store.isEmpty();
                         }
                         readyPollingSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+                        //System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
                     }
 
                     if (notify) {
@@ -108,6 +107,7 @@
 
     public SharedQueue(String name, IFlowLimiter<V> limiter) {
         this(name, limiter, new Object());
+        autoRelease = true;
     }
 
     /**
@@ -126,14 +126,14 @@
         super.onFlowOpened(sinkController);
     }
 
-    public boolean offer(V elem, ISourceController<V> source) {
+    public boolean offer(V elem, ISourceController<?> source) {
         return sinkController.offer(elem, source);
     }
 
     /**
      * Performs a limited add to the queue.
      */
-    public final void add(V value, ISourceController<V> source) {
+    public final void add(V value, ISourceController<?> source) {
         sinkController.add(value, source);
     }
 
@@ -170,7 +170,7 @@
                         sub.resumeAt(node);
                         unreadyPollingSubs.addLast(sub);
                         matchCount++;
-                        // System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
+                        //System.out.println("Subscription state change: un-ready direct -> un-ready polling: "+sub);
                     }
                     sub = next;
                 }
@@ -182,7 +182,7 @@
                     subNode.unlink();
                     subNode.resumeAt(node);
                     unreadyPollingSubs.addLast(subNode);
-                    // System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
+                    //System.out.println("Subscription state change: ready direct -> un-ready polling: "+subNode);
                 }
                 matchCount += matches.size();
 
@@ -294,7 +294,7 @@
                     }
                     return true;
                 } else {
-                    // System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
+                    //System.out.println("Subscription state change: ready polling -> un-ready polling: "+subNode);
                     // Subscription is no longer ready..
                     subNode.cursorUnPeek(storeNode);
                     subNode.unlink();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
Tue Mar 17 13:27:44 2009
@@ -53,14 +53,14 @@
         super.onFlowOpened(controller);
     }
 
-    public boolean offer(E elem, ISourceController<E> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         return controller.offer(elem, source);
     }
 
     /**
      * Performs a limited add to the queue.
      */
-    public final void add(E elem, ISourceController<E> source) {
+    public final void add(E elem, ISourceController<?> source) {
         controller.add(elem, source);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=755235&r1=755234&r2=755235&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
Tue Mar 17 13:27:44 2009
@@ -23,11 +23,11 @@
         super.onFlowOpened(controller);
     }
 
-    public void add(E elem, ISourceController<E> source) {
+    public void add(E elem, ISourceController<?> source) {
         controller.add(elem, source);
     }
 
-    public boolean offer(E elem, ISourceController<E> source) {
+    public boolean offer(E elem, ISourceController<?> source) {
         return controller.offer(elem, source);
     }
 



Mime
View raw message