activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r744094 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ test/java/org/apache/activemq/flow/
Date Fri, 13 Feb 2009 12:19:16 GMT
Author: chirino
Date: Fri Feb 13 12:19:15 2009
New Revision: 744094

URL: http://svn.apache.org/viewvc?rev=744094&view=rev
Log:
- The ISinkController looks and smells like an IFlowSink, so let's let it be one.
- We can replace the exclusive queue used for connection outbound messages with just a FlowController.


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Fri Feb 13 12:19:15 2009
@@ -438,4 +438,34 @@
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }
+
+    public long getResourceId() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceId();
+        }
+        return 0;
+    }
+
+    public String getResourceName() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceName();
+        }
+        return null;
+    }
+
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.addFlowLifeCycleListener(listener);
+        }
+    }
+    
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.removeFlowLifeCycleListener(listener);
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
Fri Feb 13 12:19:15 2009
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.flow;
 
-public interface ISinkController<E> {
+public interface ISinkController<E> extends IFlowSink<E> {
     /**
      * Defines required attributes for an entity that can be flow controlled.
      * 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
Fri Feb 13 12:19:15 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.flow;
 
+
 public class NoOpFlowController<E> implements ISinkController<E> {
     private final IFlowSource<E> source;
     private final Flow flow;
@@ -89,4 +90,33 @@
         return null;
     }
 
+    public long getResourceId() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceId();
+        }
+        return 0;
+    }
+
+    public String getResourceName() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceName();
+        }
+        return null;
+    }
+
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.addFlowLifeCycleListener(listener);
+        }
+    }
+    
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.removeFlowLifeCycleListener(listener);
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
Fri Feb 13 12:19:15 2009
@@ -16,48 +16,40 @@
  */
 package org.apache.activemq.flow;
 
-import org.apache.activemq.queue.Mapper;
+import java.util.ArrayList;
 
 public class PriorityFlowController<E> implements ISourceController<E>, ISinkController<E>
{
 
     private final Object mutex;
-    private final FlowController<E> controllers[];
+    private final ArrayList<FlowController<E>> controllers;
     private final PrioritySizeLimiter<E> limiter;
 
-    private Mapper<Integer, E> priorityMapper;
-
     private final Flow flow;
     private final FlowControllable<E> controllable;
 
-    public PriorityFlowController(int priorities, FlowControllable<E> controllable,
Flow flow, Object mutex, int capacity, int resume) {
+    public PriorityFlowController(FlowControllable<E> controllable, Flow flow, PrioritySizeLimiter<E>
limiter, Object mutex) {
         this.controllable = controllable;
         this.flow = flow;
         this.mutex = mutex;
-        this.limiter = new PrioritySizeLimiter<E>(capacity, resume, priorities);
-        this.limiter.setPriorityMapper(priorityMapper);
-        this.controllers = createControlerArray(priorities);
-        for (int i = 0; i < priorities; i++) {
-            this.controllers[i] = new FlowController<E>(controllable, flow, limiter.getPriorityLimter(i),
mutex);
+        this.limiter =  limiter;
+        this.controllers = new ArrayList<FlowController<E>>(limiter.getPriorities());
+        for (int i = 0; i < limiter.getPriorities(); i++) {
+            controllers.add(new FlowController<E>(controllable, flow, limiter.getPriorityLimter(i),
mutex));
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private FlowController<E>[] createControlerArray(int priorities) {
-        return new FlowController[priorities];
-    }
-
     // /////////////////////////////////////////////////////////////////
     // ISinkController interface impl.
     // /////////////////////////////////////////////////////////////////
 
     public boolean offer(E elem, ISourceController<E> controller) {
-        int prio = priorityMapper.map(elem);
-        return controllers[prio].offer(elem, controller);
+        int prio = limiter.getPriorityMapper().map(elem);
+        return controllers.get(prio).offer(elem, controller);
     }
 
     public void add(E elem, ISourceController<E> controller) {
-        int prio = priorityMapper.map(elem);
-        controllers[prio].add(elem, controller);
+        int prio = limiter.getPriorityMapper().map(elem);
+        controllers.get(prio).add(elem, controller);
     }
 
     public boolean isSinkBlocked() {
@@ -68,8 +60,8 @@
 
     public boolean addUnblockListener(org.apache.activemq.flow.ISinkController.FlowUnblockListener<E>
listener) {
         boolean rc = false;
-        for (int i = 0; i < controllers.length; i++) {
-            rc |= this.controllers[i].addUnblockListener(listener);
+        for (int i = 0; i < controllers.size(); i++) {
+            rc |= this.controllers.get(i).addUnblockListener(listener);
         }
         return rc;
     }
@@ -77,13 +69,45 @@
     public void waitForFlowUnblock() throws InterruptedException {
         throw new UnsupportedOperationException();
     }
+    
+    public long getResourceId() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceId();
+        }
+        return 0;
+    }
+
+    public String getResourceName() {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            return flowSink.getResourceName();
+        }
+        return null;
+    }
+
+    public void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.addFlowLifeCycleListener(listener);
+        }
+    }
+    
+    public void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+        IFlowSink<E> flowSink = getFlowSink();
+        if( flowSink!=null ) {
+            flowSink.removeFlowLifeCycleListener(listener);
+        }
+    }
+
 
     // /////////////////////////////////////////////////////////////////
     // ISourceController interface impl.
     // /////////////////////////////////////////////////////////////////
 
     public void elementDispatched(E elem) {
-        FlowController<E> controler = controllers[priorityMapper.map(elem)];
+        Integer prio = limiter.getPriorityMapper().map(elem);
+        FlowController<E> controler = controllers.get(prio);
         controler.elementDispatched(elem);
     }
 
@@ -96,14 +120,14 @@
     }
 
     public void onFlowBlock(ISinkController<E> sink) {
-        for (int i = 0; i < controllers.length; i++) {
-            controllers[i].onFlowBlock(sink);
+        for (int i = 0; i < controllers.size(); i++) {
+            controllers.get(i).onFlowBlock(sink);
         }
     }
 
     public void onFlowResume(ISinkController<E> sink) {
-        for (int i = 0; i < controllers.length; i++) {
-            controllers[i].onFlowBlock(sink);
+        for (int i = 0; i < controllers.size(); i++) {
+            controllers.get(i).onFlowBlock(sink);
         }
     }
 
@@ -115,15 +139,6 @@
     // Getters and Setters
     // /////////////////////////////////////////////////////////////////
 
-    public Mapper<Integer, E> getPriorityMapper() {
-        return priorityMapper;
-    }
-
-    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-        limiter.setPriorityMapper(priorityMapper);
-    }
-
     public IFlowSink<E> getFlowSink() {
         return controllable.getFlowSink();
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
Fri Feb 13 12:19:15 2009
@@ -21,6 +21,7 @@
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PriorityFlowController;
+import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.kahadb.util.LinkedNode;
 
 /**
@@ -28,7 +29,6 @@
 public class ExclusivePriorityQueue<E> extends AbstractFlowQueue<E> implements
IFlowQueue<E> {
 
     private final PriorityLinkedList<PriorityNode> queue;
-    private Mapper<Integer, E> priorityMapper;
 
     private class PriorityNode extends LinkedNode<PriorityNode> {
         E elem;
@@ -36,6 +36,7 @@
     }
 
     private final PriorityFlowController<E> controller;
+    private final PrioritySizeLimiter<E> limiter;
 
     /**
      * Creates a flow queue that can handle multiple flows.
@@ -48,10 +49,11 @@
      * @param controller
      *            The FlowController if this queue is flow controlled:
      */
-    public ExclusivePriorityQueue(int priority, Flow flow, String name, int capacity, int
resume) {
+    public ExclusivePriorityQueue(Flow flow, String name, PrioritySizeLimiter<E> limiter)
{
         super(name);
+        this.limiter = limiter;
         this.queue = new PriorityLinkedList<PriorityNode>(10);
-        this.controller = new PriorityFlowController<E>(priority, getFlowControllableHook(),
flow, this, capacity, resume);
+        this.controller = new PriorityFlowController<E>(getFlowControllableHook(),
flow, limiter, this);
 
     }
 
@@ -72,7 +74,7 @@
     public synchronized void flowElemAccepted(ISourceController<E> controller, E elem)
{
         PriorityNode node = new PriorityNode();
         node.elem = elem;
-        node.prio = priorityMapper.map(elem);
+        node.prio = limiter.getPriorityMapper().map(elem);
 
         queue.add(node, node.prio);
         notifyReady();
@@ -107,15 +109,6 @@
         }
     }
 
-    public Mapper<Integer, E> getPriorityMapper() {
-        return priorityMapper;
-    }
-
-    public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
-        this.priorityMapper = priorityMapper;
-        controller.setPriorityMapper(priorityMapper);
-    }
-
     @Override
     public String toString() {
         return getResourceName();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
Fri Feb 13 12:19:15 2009
@@ -65,8 +65,9 @@
         if (MockBrokerTest.PRIORITY_LEVELS <= 1) {
             this.output = TestFlowManager.createFlowQueue(flow, name + "-OUTPUT", outputQueueSize,
resumeThreshold);
         } else {
-            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(MockBrokerTest.PRIORITY_LEVELS,
flow, name + "-OUTPUT", outputQueueSize, resumeThreshold);
-            t.setPriorityMapper(Message.PRIORITY_MAPPER);
+            PrioritySizeLimiter<Message> pl = new PrioritySizeLimiter<Message>(outputQueueSize,
resumeThreshold, MockBrokerTest.PRIORITY_LEVELS);
+            pl.setPriorityMapper(Message.PRIORITY_MAPPER);
+            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(flow,
name + "-OUTPUT", pl);
             this.output = t;
         }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=744094&r1=744093&r2=744094&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Fri Feb 13 12:19:15 2009
@@ -9,21 +9,20 @@
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
-import org.apache.activemq.queue.ExclusivePriorityQueue;
-import org.apache.activemq.queue.ExclusiveQueue;
-import org.apache.activemq.queue.IFlowQueue;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 
 public class RemoteConnection implements TransportListener, DeliveryTarget {
 
-    protected final Object mutex = new Object();
 
     protected Transport transport;
     protected MockBroker broker;
-    protected IFlowQueue<Message> output;
 
+    protected final Object inboundMutex = new Object();
     protected FlowController<Message> inboundController;
+
+    protected final Object outboundMutex = new Object();
+    protected IFlowSink<Message> outboundController;
     protected String name;
 
     private int priorityLevels;
@@ -106,26 +105,12 @@
             public IFlowSource<Message> getFlowSource() {
                 return null;
             }
-        }, flow, limiter, mutex);
+        }, flow, limiter, inboundMutex);
 
         // Setup output processing
-        if (priorityLevels <= 1) {
-            limiter = new SizeLimiter<Message>(outputWindowSize, outputResumeThreshold);
-            flow = new Flow(name + "-outbound", false);
-            ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow,
flow.getFlowName(), limiter);
-            this.output = queue;
-        } else {
-            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(priorityLevels,
flow, name + "-outbound", outputWindowSize, outputResumeThreshold);
-            t.setPriorityMapper(Message.PRIORITY_MAPPER);
-            this.output = t;
-        }
-
-        // Use an async thread to drain the output queue.
-        // Personally I think it would be better if we polled messages out of the output
queue.
         writer = Executors.newSingleThreadExecutor();
-        output.setDispatcher(dispatcher);
-        output.setDrain(new IFlowDrain<Message>() {
-            public void drain(final Message elem, final ISourceController<Message>
controller) {
+        FlowControllable<Message> controllable = new FlowControllable<Message>(){
+            public void flowElemAccepted(final ISourceController<Message> controller,
final Message elem) {
                 writer.execute(new Runnable() {
                     public void run() {
                         if (!stopping.get()) {
@@ -139,7 +124,24 @@
                     }
                 });
             }
-        });
+            public IFlowSink<Message> getFlowSink() {
+                return null;
+            }
+            public IFlowSource<Message> getFlowSource() {
+                return null;
+            }
+        };
+
+        flow = new Flow(name + "-outbound", false);
+        if (priorityLevels <= 1) {
+            limiter = new SizeLimiter<Message>(outputWindowSize, outputResumeThreshold);
+            outboundController = new FlowController<Message>(controllable, flow, limiter,
 outboundMutex);
+        } else {
+            PrioritySizeLimiter<Message> pl = new PrioritySizeLimiter<Message>(outputWindowSize,
outputResumeThreshold, priorityLevels);
+            pl.setPriorityMapper(Message.PRIORITY_MAPPER);
+            outboundController = new PriorityFlowController<Message>(controllable,
flow, pl,  outboundMutex);
+        }
+
     }
 
     public void onException(IOException error) {
@@ -200,7 +202,7 @@
     }
 
     public IFlowSink<Message> getSink() {
-        return output;
+        return outboundController;
     }
 
     public boolean match(Message message) {



Mime
View raw message