activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r743802 - /activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/
Date Thu, 12 Feb 2009 16:48:45 GMT
Author: chirino
Date: Thu Feb 12 16:48:44 2009
New Revision: 743802

URL: http://svn.apache.org/viewvc?rev=743802&view=rev
Log:
Getting closer to being able to test real IO

Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Removed:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockConsumerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockProducerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockTransportConnection.java
Modified:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java Thu Feb 12 16:48:44 2009
@@ -16,12 +16,18 @@
  */
 package org.apache.activemq.flow;
 
-public class Destination {
+import java.io.Serializable;
 
+public class Destination implements Serializable{
+
+    private static final long serialVersionUID = 4020372477810069873L;
+    
     String name;
+    boolean ptp;
 
-    Destination(String name) {
+    Destination(String name, boolean ptp) {
         this.name = name;
+        this.ptp=ptp;
     }
 
     public final String getName() {

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java?rev=743802&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalConsumer.java Thu Feb 12 16:48:44 2009
@@ -0,0 +1,109 @@
+/**
+ * 
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.metric.MetricCounter;
+
+class LocalConsumer extends AbstractTestConnection implements MockBrokerTest.DeliveryTarget {
+
+    /**
+     * 
+     */
+    private final MockBrokerTest mockBrokerTest;
+    MetricCounter consumerRate = new MetricCounter();
+    private final Destination destination;
+    String selector;
+    private boolean autoRelease = false;
+
+    private long thinkTime = 0;
+
+    public LocalConsumer(MockBrokerTest mockBrokerTest, String name, MockBroker broker, Destination destination) {
+        super(broker, name, broker.getFlowManager().createFlow(destination.getName()), null);
+        this.mockBrokerTest = mockBrokerTest;
+        this.destination = destination;
+        output.setAutoRelease(autoRelease);
+        consumerRate.name("Consumer " + name + " Rate");
+        this.mockBrokerTest.totalConsumerRate.add(consumerRate);
+
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setThinkTime(long time) {
+        thinkTime = time;
+    }
+
+    @Override
+    protected synchronized Message getNextMessage() throws InterruptedException {
+        wait();
+        return null;
+    }
+
+    @Override
+    protected void addReadReadyListener(final ReadReadyListener listener) {
+        return;
+    }
+
+    public Message pollNextMessage() {
+        return null;
+    }
+
+    @Override
+    protected void messageReceived(Message m, ISourceController<Message> controller) {
+    }
+
+    @Override
+    protected void write(final Message m, final ISourceController<Message> controller) throws InterruptedException {
+        if (!m.isSystem()) {
+            // /IF we are async don't tie up the calling thread
+            // schedule dispatch complete for later.
+            if (this.mockBrokerTest.dispatchMode == ASYNC && thinkTime > 0) {
+                Runnable acker = new Runnable() {
+                    public void run() {
+                        if (thinkTime > 0) {
+                            try {
+                                Thread.sleep(thinkTime);
+                            } catch (InterruptedException e) {
+                            }
+                        }
+                        simulateEncodingWork();
+                        if (!autoRelease) {
+                            controller.elementDispatched(m);
+                        }
+                        consumerRate.increment();
+                    }
+                };
+
+                broker.dispatcher.schedule(acker, thinkTime, TimeUnit.MILLISECONDS);
+            } else {
+                simulateEncodingWork();
+                if (!autoRelease) {
+                    controller.elementDispatched(m);
+                }
+                consumerRate.increment();
+            }
+        }
+    }
+
+    public IFlowSink<Message> getSink() {
+        return output;
+    }
+
+    public boolean match(Message message) {
+        if (selector == null)
+            return true;
+        return message.match(selector);
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java?rev=743802&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/LocalProducer.java Thu Feb 12 16:48:44 2009
@@ -0,0 +1,113 @@
+/**
+ * 
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+import org.apache.activemq.metric.MetricCounter;
+
+class LocalProducer extends AbstractTestConnection {
+
+    /**
+     * 
+     */
+    private final MockBrokerTest mockBrokerTest;
+
+    MetricCounter producerRate = new MetricCounter();
+
+    private final Destination destination;
+    private int msgCounter;
+    private String name;
+    String property;
+    private Message next;
+    int msgPriority = 0;
+    int priorityMod = 0;
+    int producerId;
+
+    public LocalProducer(MockBrokerTest mockBrokerTest, String name, MockBroker broker, Destination destination) {
+
+        super(broker, name, broker.getFlowManager().createFlow(name), null);
+        this.mockBrokerTest = mockBrokerTest;
+        this.destination = destination;
+        this.producerId = this.mockBrokerTest.prodcuerIdGenerator.getAndIncrement();
+        
+        producerRate.name("Producer " + name + " Rate");
+        this.mockBrokerTest.totalProducerRate.add(producerRate);
+
+    }
+
+    /*
+     * Gets the next message blocking until space is available for it.
+     * (non-Javadoc)
+     * 
+     * @see com.progress.flow.AbstractTestConnection#getNextMessage()
+     */
+    public Message getNextMessage() throws InterruptedException {
+
+        Message m = new Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name + ++msgCounter, flow, destination, msgPriority);
+        if (property != null) {
+            m.setProperty(property);
+        }
+        simulateEncodingWork();
+        input.getFlowController(m.getFlow()).waitForFlowUnblock();
+        return m;
+    }
+
+    @Override
+    protected void addReadReadyListener(final ReadReadyListener listener) {
+        if (next == null) {
+            next = new Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name + ++msgCounter, flow, destination, msgPriority);
+            if (property != null) {
+                next.setProperty(property);
+            }
+            simulateEncodingWork();
+        }
+
+        if (!input.getFlowController(next.getFlow()).addUnblockListener(new ISinkController.FlowUnblockListener<Message>() {
+            public void onFlowUnblocked(ISinkController<Message> controller) {
+                listener.onReadReady();
+            }
+        })) {
+            // Return value of false means that the controller didn't
+            // register the listener because it was not blocked.
+            listener.onReadReady();
+        }
+    }
+
+    @Override
+    public final Message pollNextMessage() {
+        if (next == null) {
+            int priority = msgPriority;
+            if (priorityMod > 0) {
+                priority = msgCounter % priorityMod == 0 ? 0 : msgPriority;
+            }
+
+            next = new Message(this.mockBrokerTest.msgIdGenerator.getAndIncrement(), producerId, name + ++msgCounter, flow, destination, priority);
+            if (property != null) {
+                next.setProperty(property);
+            }
+            simulateEncodingWork();
+        }
+
+        if (input.getFlowController(next.getFlow()).isSinkBlocked()) {
+            return null;
+        }
+
+        Message m = next;
+        next = null;
+        return m;
+    }
+
+    @Override
+    public void messageReceived(Message m, ISourceController<Message> controller) {
+
+        broker.router.route(controller, m);
+        producerRate.increment();
+    }
+
+    @Override
+    public void write(Message m, ISourceController<Message> controller) {
+        // Noop
+    }
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java Thu Feb 12 16:48:44 2009
@@ -16,12 +16,13 @@
  */
 package org.apache.activemq.flow;
 
+import java.io.Serializable;
 import java.util.HashSet;
 
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.queue.Mapper;
 
-public class Message {
+public class Message implements Serializable {
 
     public static final Mapper<Integer, Message> PRIORITY_MAPPER = new Mapper<Integer, Message>() {
         public Integer map(Message element) {
@@ -39,7 +40,7 @@
     public static final short TYPE_FLOW_CLOSE = 3;
 
     final String msg;
-    final Flow flow;
+    transient final Flow flow;
     final Destination dest;
     int hopCount;
     HashSet<String> matchProps;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Thu Feb 12 16:48:44 2009
@@ -11,6 +11,7 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
 import org.apache.activemq.flow.MockBrokerTest.Router;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
@@ -22,9 +23,9 @@
     private final MockBrokerTest mockBrokerTest;
     private final TestFlowManager flowMgr;
     
-    final ArrayList<MockTransportConnection> connections = new ArrayList<MockTransportConnection>();
-    final ArrayList<MockProducerConnection> producers = new ArrayList<MockProducerConnection>();
-    final ArrayList<MockConsumerConnection> consumers = new ArrayList<MockConsumerConnection>();
+    final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
+    final ArrayList<LocalProducer> producers = new ArrayList<LocalProducer>();
+    final ArrayList<LocalConsumer> consumers = new ArrayList<LocalConsumer>();
     private final ArrayList<BrokerConnection> brokerConns = new ArrayList<BrokerConnection>();
 
     private final HashMap<Destination, MockQueue> queues = new HashMap<Destination, MockQueue>();
@@ -38,7 +39,7 @@
     public final int priorityLevels = MockBrokerTest.PRIORITY_LEVELS;
     public final int ioWorkAmount = MockBrokerTest.IO_WORK_AMOUNT;
     public final boolean useInputQueues = MockBrokerTest.USE_INPUT_QUEUES;
-    private TransportServer transportServer;
+    public TransportServer transportServer;
 
     MockBroker(MockBrokerTest mockBrokerTest, String name) throws IOException, URISyntaxException {
         this.mockBrokerTest = mockBrokerTest;
@@ -58,23 +59,26 @@
     }
 
     public void createProducerConnection(Destination destination) {
-        MockProducerConnection c = new MockProducerConnection(this.mockBrokerTest, "producer" + ++pCount, this, destination);
+        LocalProducer c = new LocalProducer(this.mockBrokerTest, "producer" + ++pCount, this, destination);
         producers.add(c);
     }
 
-    public void createConsumerConnection(Destination destination, boolean ptp) {
-        MockConsumerConnection c = new MockConsumerConnection(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
+    public void createConsumerConnection(Destination destination) {
+        LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
         consumers.add(c);
-        if (ptp) {
-            queues.get(destination).addConsumer(c);
+        subscribe(destination, c);
+    }
+
+    public void subscribe(Destination destination, DeliveryTarget deliveryTarget) {
+        if (destination.ptp) {
+            queues.get(destination).addConsumer(deliveryTarget);
         } else {
-            router.bind(c, destination);
+            router.bind(deliveryTarget, destination);
         }
-
     }
 
     public void createClusterConnection(Destination destination) {
-        MockConsumerConnection c = new MockConsumerConnection(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
+        LocalConsumer c = new LocalConsumer(this.mockBrokerTest, "consumer" + ++cCount, this, destination);
         consumers.add(c);
         router.bind(c, destination);
     }
@@ -100,13 +104,13 @@
     final void stopServices() throws Exception {
         transportServer.stop();
         
-        for (MockTransportConnection connection : connections) {
+        for (RemoteConnection connection : connections) {
             connection.stop();
         }
-        for (MockProducerConnection connection : producers) {
+        for (LocalProducer connection : producers) {
             connection.stop();
         }
-        for (MockConsumerConnection connection : consumers) {
+        for (LocalConsumer connection : consumers) {
             connection.stop();
         }
         for (BrokerConnection connection : brokerConns) {
@@ -127,7 +131,7 @@
         
         dispatcher.start();
 
-        for (MockConsumerConnection connection : consumers) {
+        for (LocalConsumer connection : consumers) {
             connection.start();
         }
 
@@ -135,7 +139,7 @@
             queue.start();
         }
         
-        for (MockProducerConnection connection : producers) {
+        for (LocalProducer connection : producers) {
             connection.start();
         }
 
@@ -145,7 +149,8 @@
     }
 
     public void onAccept(final Transport transport) {
-        MockTransportConnection connection = new MockTransportConnection();
+        RemoteConnection connection = new RemoteConnection();
+        connection.setBroker(this);
         connection.setTransport(transport);
         try {
             connection.start();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Feb 12 16:48:44 2009
@@ -50,7 +50,7 @@
     private boolean multibroker = false;
 
     // Set to mockup up ptp:
-    private boolean ptp = true;
+    boolean ptp = true;
 
     // Can be set to BLOCKING, POLLING or ASYNC
     final int dispatchMode = AbstractTestConnection.ASYNC;
@@ -195,7 +195,7 @@
         consumerCount = 1;
 
         createConnections();
-        MockProducerConnection producer = sendBroker.producers.get(0);
+        LocalProducer producer = sendBroker.producers.get(0);
         producer.msgPriority = 1;
         producer.producerRate.setName("High Priority Producer Rate");
 
@@ -233,7 +233,7 @@
         consumerCount = 1;
 
         createConnections();
-        MockProducerConnection producer = sendBroker.producers.get(0);
+        LocalProducer producer = sendBroker.producers.get(0);
         producer.msgPriority = 1;
         producer.priorityMod = 3;
         producer.producerRate.setName("High Priority Producer Rate");
@@ -421,7 +421,7 @@
         Destination[] dests = new Destination[destCount];
 
         for (int i = 0; i < destCount; i++) {
-            dests[i] = new Destination("dest" + (i + 1));
+            dests[i] = new Destination("dest" + (i + 1), ptp);
             if (ptp) {
                 sendBroker.createQueue(dests[i]);
                 if (multibroker) {
@@ -434,7 +434,7 @@
             sendBroker.createProducerConnection(dests[i % destCount]);
         }
         for (int i = 0; i < consumerCount; i++) {
-            rcvBroker.createConsumerConnection(dests[i % destCount], ptp);
+            rcvBroker.createConsumerConnection(dests[i % destCount]);
         }
 
         // Create MultiBroker connections:

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743802&r1=743801&r2=743802&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu Feb 12 16:48:44 2009
@@ -15,7 +15,7 @@
 class MockQueue implements MockBrokerTest.DeliveryTarget {
 
     private final MockBrokerTest mockBrokerTest;
-    HashMap<MockConsumerConnection, Subscription<Message>> subs = new HashMap<MockConsumerConnection, Subscription<Message>>();
+    HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
     private final Destination destination;
     private final IQueue<Long, Message> queue;
     private final MockBroker broker;
@@ -77,7 +77,7 @@
         return destination;
     }
 
-    public final void addConsumer(final MockConsumerConnection dt) {
+    public final void addConsumer(final DeliveryTarget dt) {
         Subscription<Message> sub = new Subscription<Message>() {
             public boolean isPreAcquired() {
                 return true;

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743802&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Thu Feb 12 16:48:44 2009
@@ -0,0 +1,204 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.queue.ExclusivePriorityQueue;
+import org.apache.activemq.queue.ExclusiveQueue;
+import org.apache.activemq.queue.IFlowQueue;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
+
+public class RemoteConnection implements TransportListener, DeliveryTarget {
+
+    protected final Object mutex = new Object();
+
+    protected Transport transport;
+    protected MockBroker broker;
+    protected IFlowQueue<Message> output;
+
+    protected FlowController<Message> inboundController;
+    protected String name;
+
+    private int priorityLevels;
+
+    private final int outputWindowSize = 1000;
+    private final int outputResumeThreshold = 500;
+
+    private final int inputWindowSize = 1000;
+    private final int inputResumeThreshold = 900;
+
+    private IDispatcher dispatcher;
+    private ExecutorService writer;
+
+    private final AtomicBoolean stopping = new AtomicBoolean();
+
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+
+    }
+
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    public void start() throws Exception {
+
+        // Setup the input processing..
+        SizeLimiter<Message> limiter = new SizeLimiter<Message>(inputWindowSize, inputResumeThreshold);
+        Flow flow = new Flow(name + "-inbound", false);
+        inboundController = new FlowController<Message>(new FlowControllable<Message>() {
+            public void flowElemAccepted(ISourceController<Message> controller, Message elem) {
+                broker.router.route(controller, elem);
+            }
+
+            public IFlowSink<Message> getFlowSink() {
+                return null;
+            }
+
+            public IFlowSource<Message> getFlowSource() {
+                return null;
+            }
+        }, flow, limiter, mutex);
+
+        // Setup output processing
+        if (priorityLevels <= 1) {
+            limiter = new SizeLimiter<Message>(outputWindowSize, outputResumeThreshold);
+            flow = new Flow(name + "-outbound", false);
+            ExclusiveQueue<Message> queue = new ExclusiveQueue<Message>(flow, flow.getFlowName(), limiter);
+            this.output = queue;
+        } else {
+            ExclusivePriorityQueue<Message> t = new ExclusivePriorityQueue<Message>(broker.priorityLevels, flow, name + "-outbound", outputWindowSize, outputResumeThreshold);
+            t.setPriorityMapper(Message.PRIORITY_MAPPER);
+            this.output = t;
+        }
+
+        // Use an async thread to drain the output queue.
+        // Personally I think it would be better if we polled messages out of the output queue.
+        writer = Executors.newSingleThreadExecutor();
+        output.setDispatcher(dispatcher);
+        output.setDrain(new IFlowDrain<Message>() {
+            public void drain(final Message elem, final ISourceController<Message> controller) {
+                writer.execute(new Runnable() {
+                    public void run() {
+                        if (!stopping.get()) {
+                            try {
+                                transport.oneway(elem);
+                                controller.elementDispatched(elem);
+                            } catch (IOException e) {
+                                onException(e);
+                            }
+                        }
+                    }
+                });
+            }
+        });
+
+        transport.setTransportListener(this);
+        transport.start();
+    }
+
+    public void stop() throws Exception {
+        stopping.set(true);
+        writer.shutdown();
+        if (transport != null) {
+            transport.stop();
+        }
+    }
+
+    public void onCommand(Object command) {
+        try {
+            if (command.getClass() == Message.class) {
+                Message msg = (Message) command;
+                // Use the flow controller to send the message on so that we do
+                // not overflow
+                // the broker.
+                while (!inboundController.offer(msg, null)) {
+                    inboundController.waitForFlowUnblock();
+                }
+            } else if (command.getClass() == Destination.class) {
+                // This is a subscription request
+                Destination destination = (Destination) command;
+                broker.subscribe(destination, this);
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    public void onException(IOException error) {
+        if (!stopping.get()) {
+            error.printStackTrace();
+        }
+    }
+
+    public void onException(Exception error) {
+        if (!stopping.get()) {
+            error.printStackTrace();
+        }
+    }
+
+    public void transportInterupted() {
+    }
+
+    public void transportResumed() {
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getPriorityLevels() {
+        return priorityLevels;
+    }
+
+    public void setPriorityLevels(int priorityLevels) {
+        this.priorityLevels = priorityLevels;
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public MockBroker getBroker() {
+        return broker;
+    }
+
+    public int getOutputWindowSize() {
+        return outputWindowSize;
+    }
+
+    public int getOutputResumeThreshold() {
+        return outputResumeThreshold;
+    }
+
+    public int getInputWindowSize() {
+        return inputWindowSize;
+    }
+
+    public int getInputResumeThreshold() {
+        return inputResumeThreshold;
+    }
+
+    public IFlowSink<Message> getSink() {
+        return output;
+    }
+
+    public boolean match(Message message) {
+        return true;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=743802&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Thu Feb 12 16:48:44 2009
@@ -0,0 +1,111 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+
+public class RemoteConsumer implements TransportListener {
+
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    private final MetricCounter consumerRate = new MetricCounter();
+
+    private Transport transport;
+    private MockBroker broker;
+    private String name;
+    private MetricAggregator totalConsumerRate;
+    private long thinkTime;
+    private Destination destination;
+    
+    public void start() throws Exception {
+        consumerRate.name("Consumer " + name + " Rate");
+        totalConsumerRate.add(consumerRate);
+
+        URI uri = broker.transportServer.getConnectURI();
+        transport = TransportFactory.connect(uri);
+        transport.setTransportListener(this);
+        transport.start();
+        
+        // Sending the destination acts as the subscribe.
+        transport.oneway(destination);
+    }
+    
+    public void stop() throws Exception {
+        stopping.set(true);
+        if( transport!=null ) {
+            transport.stop();
+            transport=null;
+        }
+    }
+
+    public void onCommand(Object command) {
+        if( command.getClass() == Message.class ) {
+            
+            if (thinkTime > 0) {
+                try {
+                    Thread.sleep(thinkTime);
+                } catch (InterruptedException e) {
+                }
+            }
+            consumerRate.increment();
+            
+        } else {
+            System.out.println("Unhandled command: "+command);
+        }
+    }
+
+    public void onException(IOException error) {
+        if( !stopping.get() ) {
+            error.printStackTrace();
+        }
+    }
+
+    public void transportInterupted() {
+    }
+    public void transportResumed() {
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+    }
+
+    public Transport getTransport() {
+        return transport;
+    }
+
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+
+    public MockBroker getBroker() {
+        return broker;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public MetricAggregator getTotalConsumerRate() {
+        return totalConsumerRate;
+    }
+
+    public void setTotalConsumerRate(MetricAggregator totalProducerRate) {
+        this.totalConsumerRate = totalProducerRate;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=743802&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Thu Feb 12 16:48:44 2009
@@ -0,0 +1,175 @@
+package org.apache.activemq.flow;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+
+/**
+ * Test producer that connects to a {@link MockBroker}'s transport server and
+ * sends {@link Message}s over the resulting {@link Transport} from its own
+ * thread until {@link #stop()} is called.
+ *
+ * <p>The broker, name, message id generator and total-rate aggregator are
+ * dereferenced by {@link #start()} / {@link #run()} and therefore have to be
+ * set before the producer is started.
+ */
+public class RemoteProducer implements TransportListener, Runnable {
+
+    /** Set by {@link #stop()}; ends the send loop and silences error reporting. */
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    /** Counts the messages this producer has handed to the transport. */
+    private final MetricCounter producerRate = new MetricCounter();
+
+    private Transport transport;
+    private MockBroker broker;
+    private String name;
+    private Thread thread;
+    private AtomicLong messageIdGenerator;
+    private int priority;
+    private int priorityMod;
+    private int counter;
+    private int producerId;
+    private Destination destination;
+    private String property;
+    private MetricAggregator totalProducerRate;
+
+    /**
+     * Registers this producer's rate counter with the total-rate aggregator,
+     * connects to the broker's transport server and starts the sending thread.
+     *
+     * @throws Exception if connecting to or starting the transport fails
+     */
+    public void start() throws Exception {
+        producerRate.name("Producer " + name + " Rate");
+        totalProducerRate.add(producerRate);
+
+        URI uri = broker.transportServer.getConnectURI();
+        transport = TransportFactory.connect(uri);
+        transport.setTransportListener(this);
+        transport.start();
+
+        thread = new Thread(this, name);
+        thread.start();
+    }
+
+    /**
+     * Flags the producer as stopping, stops and clears the transport, and waits
+     * for the sending thread to finish.
+     *
+     * @throws Exception if stopping the transport fails or the wait is interrupted
+     */
+    public void stop() throws Exception {
+        stopping.set(true);
+        if (transport != null) {
+            transport.stop();
+            transport = null;
+        }
+        // stop() may be called without a prior start(); there is no thread to join then.
+        if (thread != null) {
+            thread.join();
+        }
+    }
+
+    /**
+     * Send loop: builds and sends one message per iteration until the
+     * {@code stopping} flag is set or the transport reports an I/O error.
+     */
+    public void run() {
+        // Use a private reference: stop() clears the field while this loop may
+        // still be running, which would otherwise cause a NullPointerException.
+        final Transport connection = transport;
+        if (connection == null) {
+            return;
+        }
+        try {
+            while (!stopping.get()) {
+
+                // When priorityMod is positive, every message whose (pre-increment)
+                // counter is a multiple of it is sent with priority 0 instead.
+                int msgPriority = this.priority;
+                if (priorityMod > 0 && counter % priorityMod == 0) {
+                    msgPriority = 0;
+                }
+
+                Message next = new Message(messageIdGenerator.getAndIncrement(), producerId,
+                        name + ++counter, null, destination, msgPriority);
+                if (property != null) {
+                    next.setProperty(property);
+                }
+
+                connection.oneway(next);
+                // Record the send so the registered rate metric reflects traffic.
+                producerRate.increment();
+            }
+        } catch (IOException e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Transport command callback. This producer handles no inbound commands and
+     * only prints what it received.
+     *
+     * @param command the object delivered by the transport
+     */
+    public void onCommand(Object command) {
+        System.out.println("Unhandled command: " + command);
+    }
+
+    /**
+     * Transport error callback. Prints the stack trace unless the producer is
+     * already stopping.
+     *
+     * @param error the I/O failure reported by the transport or the send loop
+     */
+    public void onException(IOException error) {
+        if (!stopping.get()) {
+            error.printStackTrace();
+        }
+    }
+
+    /** Transport interruption callback; this implementation does nothing. */
+    public void transportInterupted() {
+    }
+
+    /** Transport resumption callback; this implementation does nothing. */
+    public void transportResumed() {
+    }
+
+    /** @param name used for the rate metric's name, the thread name and as a prefix passed to each message */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @param broker the broker whose transport server {@link #start()} connects to */
+    public void setBroker(MockBroker broker) {
+        this.broker = broker;
+    }
+
+    /** @return the transport, or {@code null} before {@link #start()} and after {@link #stop()} */
+    public Transport getTransport() {
+        return transport;
+    }
+
+    /** @param transport the transport to hold; {@link #start()} overwrites it with a fresh connection */
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    /** @return the generator used to obtain message ids */
+    public AtomicLong getMessageIdGenerator() {
+        return messageIdGenerator;
+    }
+
+    /** @param msgIdGenerator the generator from which each message id is taken */
+    public void setMessageIdGenerator(AtomicLong msgIdGenerator) {
+        this.messageIdGenerator = msgIdGenerator;
+    }
+
+    /** @return the priority given to messages not affected by {@code priorityMod} */
+    public int getPriority() {
+        return priority;
+    }
+
+    /** @param msgPriority the priority given to messages not affected by {@code priorityMod} */
+    public void setPriority(int msgPriority) {
+        this.priority = msgPriority;
+    }
+
+    /** @return the modulus that selects which messages are sent with priority 0 */
+    public int getPriorityMod() {
+        return priorityMod;
+    }
+
+    /** @param priorityMod modulus selecting priority-0 messages; values &lt;= 0 disable the override */
+    public void setPriorityMod(int priorityMod) {
+        this.priorityMod = priorityMod;
+    }
+
+    /** @return the message counter, incremented once per message built */
+    public int getCounter() {
+        return counter;
+    }
+
+    /** @param msgCounter the new value of the message counter */
+    public void setCounter(int msgCounter) {
+        this.counter = msgCounter;
+    }
+
+    /** @return the producer id passed to every message */
+    public int getProducerId() {
+        return producerId;
+    }
+
+    /** @param producerId the producer id passed to every message */
+    public void setProducerId(int producerId) {
+        this.producerId = producerId;
+    }
+
+    /** @return the destination passed to every message */
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /** @param destination the destination passed to every message */
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    /** @return the property set on every message, or {@code null} if none is set */
+    public String getProperty() {
+        return property;
+    }
+
+    /** @param property the property to set on every message; {@code null} leaves messages untouched */
+    public void setProperty(String property) {
+        this.property = property;
+    }
+
+    /** @return the broker this producer connects to */
+    public MockBroker getBroker() {
+        return broker;
+    }
+
+    /** @return the name of this producer */
+    public String getName() {
+        return name;
+    }
+
+    /** @return the aggregator this producer's rate counter is added to on start */
+    public MetricAggregator getTotalProducerRate() {
+        return totalProducerRate;
+    }
+
+    /** @param totalProducerRate the aggregator this producer's rate counter is added to on start */
+    public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+        this.totalProducerRate = totalProducerRate;
+    }
+}



Mime
View raw message