Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 40185 invoked from network); 12 Mar 2009 18:05:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Mar 2009 18:05:19 -0000 Received: (qmail 56899 invoked by uid 500); 12 Mar 2009 18:05:19 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56879 invoked by uid 500); 12 Mar 2009 18:05:19 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56870 invoked by uid 99); 12 Mar 2009 18:05:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2009 11:05:18 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2009 18:05:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5E3E32388A50; Thu, 12 Mar 2009 18:04:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r752957 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/ main/java/org/apache/activemq/queue/ main/java/org/apache/activemq/transport/ test/java/org/apache/activemq/... Date: Thu, 12 Mar 2009 18:04:50 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090312180452.5E3E32388A50@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=752957&r1=752956&r2=752957&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Thu Mar 12 18:04:49 2009 @@ -16,10 +16,7 @@ */ package org.apache.activemq.flow; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicLong; import junit.framework.TestCase; @@ -28,21 +25,19 @@ import org.apache.activemq.flow.Commands.Destination; import org.apache.activemq.flow.Commands.Destination.DestinationBean; import org.apache.activemq.flow.Commands.Destination.DestinationBuffer; -import org.apache.activemq.metric.MetricAggregator; -import org.apache.activemq.metric.Period; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.queue.Mapper; public class MockBrokerTest extends TestCase { - protected static final int PERFORMANCE_SAMPLES = 3; + protected static final int PERFORMANCE_SAMPLES = 5; + protected static final int SAMPLING_FREQUENCY = 5; - protected static final int IO_WORK_AMOUNT = 0; protected static final int FANIN_COUNT = 10; protected static final int FANOUT_COUNT = 10; protected static final int PRIORITY_LEVELS = 10; - protected static final boolean USE_INPUT_QUEUES = true; + protected static final boolean USE_INPUT_QUEUES = false; // Set to put senders and consumers on separate brokers. protected boolean multibroker = false; @@ -59,21 +54,16 @@ protected String receiveBrokerURI; // Set's the number of threads to use: - protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors(); + protected static final boolean SEPARATE_CLIENT_DISPATCHER = false; + protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors(); protected boolean usePartitionedQueue = false; - protected int producerCount; - protected int consumerCount; - protected int destCount; - - protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); - protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items"); - + protected ArrayList brokers = new ArrayList(); protected MockBroker sendBroker; protected MockBroker rcvBroker; - protected ArrayList brokers = new ArrayList(); + protected MockClient client; + protected IDispatcher dispatcher; - protected final AtomicLong msgIdGenerator = new AtomicLong(); static public final Mapper KEY_MAPPER = new Mapper() { public Long map(Message element) { @@ -90,8 +80,6 @@ @Override protected void setUp() throws Exception { - dispatcher = createDispatcher(); - dispatcher.start(); if (tcp) { sendBrokerURI = "tcp://localhost:10000?wireFormat=proto"; receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto"; @@ -106,119 +94,79 @@ } } - protected IDispatcher createDispatcher() { - return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize); + protected IDispatcher createDispatcher(String name) { + return PriorityDispatcher.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher); } - + public void test_1_1_0() throws Exception { - producerCount = 1; - destCount = 1; - createConnections(); + client = new MockClient(); + client.setNumProducers(1); + client.setDestCount(1); + client.setNumConsumers(0); - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(1); + runTestCase(); } public void test_1_1_1() throws Exception { - producerCount = 1; - destCount = 1; - consumerCount = 1; - - createConnections(); + client = new MockClient(); + client.setNumProducers(1); + client.setDestCount(1); + client.setNumConsumers(1); - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(1); + runTestCase(); } public void test_10_10_10() throws Exception { - producerCount = FANIN_COUNT; - destCount = FANIN_COUNT; - consumerCount = FANOUT_COUNT; - - createConnections(); + client = new MockClient(); + client.setNumProducers(FANIN_COUNT); + client.setDestCount(FANIN_COUNT); + client.setNumConsumers(FANOUT_COUNT); - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(FANIN_COUNT); + runTestCase(); } public void test_10_1_10() throws Exception { - producerCount = FANIN_COUNT; - consumerCount = FANOUT_COUNT; - destCount = 1; + client = new MockClient(); + client.setNumProducers(FANIN_COUNT); + client.setDestCount(1); + client.setNumConsumers(FANOUT_COUNT); - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(1); + runTestCase(); } public void test_10_1_1() throws Exception { - producerCount = FANIN_COUNT; - destCount = 1; - consumerCount = 1; + client = new MockClient(); + client.setNumProducers(FANIN_COUNT); + client.setDestCount(1); + client.setNumConsumers(1); - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(1); + runTestCase(); } public void test_1_1_10() throws Exception { - producerCount = 1; - destCount = 1; - consumerCount = FANOUT_COUNT; - - createConnections(); + client = new MockClient(); + client.setNumProducers(1); + client.setDestCount(1); + client.setNumConsumers(FANOUT_COUNT); - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(1); + runTestCase(); } public void test_2_2_2() throws Exception { - producerCount = 2; - destCount = 2; - consumerCount = 2; - - createConnections(); + client = new MockClient(); + client.setNumProducers(2); + client.setDestCount(2); + client.setNumConsumers(2); - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + createConnections(2); + runTestCase(); } /** @@ -229,43 +177,33 @@ * @throws Exception */ public void test_2_2_2_SlowConsumer() throws Exception { - producerCount = 2; - destCount = 2; - consumerCount = 2; + client = new MockClient(); + client.setNumProducers(2); + client.setDestCount(2); + client.setNumConsumers(2); + + createConnections(2); + client.consumer(0).setThinkTime(50); + runTestCase(); - createConnections(); - rcvBroker.consumers.get(0).setThinkTime(50); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } } public void test_2_2_2_Selector() throws Exception { - producerCount = 2; - destCount = 2; - consumerCount = 2; + client = new MockClient(); + client.setNumProducers(2); + client.setDestCount(2); + client.setNumConsumers(2); - createConnections(); + createConnections(2); // Add properties to match producers to their consumers - for (int i = 0; i < consumerCount; i++) { + for (int i = 0; i < 2; i++) { String property = "match" + i; - rcvBroker.consumers.get(i).setSelector(property); - sendBroker.producers.get(i).setProperty(property); + client.consumer(i).setSelector(property); + client.producer(i).setProperty(property); } - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } + runTestCase(); } /** @@ -276,35 +214,20 @@ */ public void test_2_1_1_HighPriorityProducer() throws Exception { - producerCount = 2; - destCount = 1; - consumerCount = 1; - - createConnections(); - RemoteProducer producer = sendBroker.producers.get(0); + client = new MockClient(); + client.setNumProducers(2); + client.setNumConsumers(1); + client.setDestCount(1); + + createConnections(1); + RemoteProducer producer = client.producer(0); + client.includeInRateReport(producer); producer.setPriority(1); producer.getRate().setName("High Priority Producer Rate"); - rcvBroker.consumers.get(0).setThinkTime(1); + client.consumer(0).setThinkTime(1); - // Start 'em up. - startServices(); - try { - - System.out.println("Checking rates for test: " + getName()); - for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { - Period p = new Period(); - Thread.sleep(1000 * 5); - System.out.println(producer.getRate().getRateSummary(p)); - System.out.println(totalProducerRate.getRateSummary(p)); - System.out.println(totalConsumerRate.getRateSummary(p)); - totalProducerRate.reset(); - totalConsumerRate.reset(); - } - - } finally { - stopServices(); - } + runTestCase(); } /** @@ -314,53 +237,26 @@ * @throws Exception */ public void test_2_1_1_MixedHighPriorityProducer() throws Exception { - producerCount = 2; - destCount = 1; - consumerCount = 1; + client = new MockClient(); + client.setNumProducers(2); + client.setNumConsumers(1); + client.setDestCount(1); - createConnections(); - RemoteProducer producer = sendBroker.producers.get(0); + createConnections(1); + RemoteProducer producer = client.producer(0); producer.setPriority(1); producer.setPriorityMod(3); producer.getRate().setName("High Priority Producer Rate"); - rcvBroker.consumers.get(0).setThinkTime(1); - - // Start 'em up. - startServices(); - try { - - System.out.println("Checking rates for test: " + getName()); - for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { - Period p = new Period(); - Thread.sleep(1000 * 5); - System.out.println(producer.getRate().getRateSummary(p)); - System.out.println(totalProducerRate.getRateSummary(p)); - System.out.println(totalConsumerRate.getRateSummary(p)); - totalProducerRate.reset(); - totalConsumerRate.reset(); - } - - } finally { - stopServices(); - } + client.consumer(0).setThinkTime(1); + runTestCase(); } - private void reportRates() throws InterruptedException { - System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic")); - for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { - Period p = new Period(); - Thread.sleep(1000 * 5); - System.out.println(totalProducerRate.getRateSummary(p)); - System.out.println(totalConsumerRate.getRateSummary(p)); - totalProducerRate.reset(); - totalConsumerRate.reset(); - } - } + private void createConnections(int destCount) throws Exception { - private void createConnections() throws IOException, URISyntaxException { + dispatcher = createDispatcher("BrokerDispatcher"); + dispatcher.start(); - FlowController.setFlowExecutor(dispatcher.createPriorityExecutor(Message.MAX_PRIORITY)); if (multibroker) { sendBroker = createBroker("SendBroker", sendBrokerURI); rcvBroker = createBroker("RcvBroker", receiveBrokerURI); @@ -388,46 +284,27 @@ } } - for (int i = 0; i < producerCount; i++) { - Destination destination = dests[i % destCount]; - RemoteProducer producer = createProducer(i, destination); - sendBroker.producers.add(producer); - } - - for (int i = 0; i < consumerCount; i++) { - Destination destination = dests[i % destCount]; - RemoteConsumer consumer = createConsumer(i, destination); - sendBroker.consumers.add(consumer); - } - - // Create MultiBroker connections: - // if (multibroker) { - // Pipe pipe = new Pipe(); - // sendBroker.createBrokerConnection(rcvBroker, pipe); - // rcvBroker.createBrokerConnection(sendBroker, pipe.connect()); - // } - } - - private RemoteConsumer createConsumer(int i, Destination destination) { - RemoteConsumer consumer = new RemoteConsumer(); - consumer.setBroker(rcvBroker); - consumer.setDestination(destination); - consumer.setName("consumer" + (i + 1)); - consumer.setTotalConsumerRate(totalConsumerRate); - consumer.setDispatcher(dispatcher); - return consumer; - } - - private RemoteProducer createProducer(int id, Destination destination) { - RemoteProducer producer = new RemoteProducer(); - producer.setBroker(sendBroker); - producer.setProducerId(id + 1); - producer.setName("producer" + (id + 1)); - producer.setDestination(destination); - producer.setMessageIdGenerator(msgIdGenerator); - producer.setTotalProducerRate(totalProducerRate); - producer.setDispatcher(dispatcher); - return producer; + IDispatcher clientDispatcher = null; + if (SEPARATE_CLIENT_DISPATCHER) { + clientDispatcher = createDispatcher("ClientDispatcher"); + clientDispatcher.start(); + } else { + clientDispatcher = dispatcher; + } + + // Configure Client: + client.setDispatcher(clientDispatcher); + client.setNumPriorities(PRIORITY_LEVELS); + client.setSendBrokerURI(sendBroker.getUri()); + client.setReceiveBrokerURI(rcvBroker.getUri()); + client.setPerformanceSamples(PERFORMANCE_SAMPLES); + client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY); + client.setThreadsPerDispatcher(threadsPerDispatcher); + client.setUseInputQueues(USE_INPUT_QUEUES); + client.setPtp(ptp); + client.setTestName(getName()); + + client.createConnections(); } private MockQueue createQueue(MockBroker broker, Destination destination) { @@ -446,23 +323,37 @@ broker.setName(name); broker.setUri(uri); broker.setDispatcher(dispatcher); + broker.setUseInputQueues(USE_INPUT_QUEUES); return broker; } + private void runTestCase() throws Exception { + // Start 'em up. + startServices(); + try { + client.runTest(); + } finally { + stopServices(); + } + } + private void stopServices() throws Exception { + for (MockBroker broker : brokers) { broker.stopServices(); } + + client.getDispatcher().shutdown(); if (dispatcher != null) { dispatcher.shutdown(); } } private void startServices() throws Exception { + for (MockBroker broker : brokers) { broker.startServices(); } - //SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS)); } } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=752957&r1=752956&r2=752957&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Thu Mar 12 18:04:49 2009 @@ -87,7 +87,7 @@ public void oneway(Object command) throws IOException { try { - if( wireFormat!=null ) { + if (wireFormat != null) { pipe.write(wireFormat.marshal(command)); } else { pipe.write(command); @@ -110,11 +110,12 @@ pipe.setReadReadyListener(this); return true; } else { - if( wireFormat!=null ) { - listener.onCommand(wireFormat.unmarshal((ByteSequence)o)); + if (wireFormat != null) { + listener.onCommand(wireFormat.unmarshal((ByteSequence) o)); } else { listener.onCommand(o); } + return false; } } catch (IOException e) { listener.onException(e); @@ -192,6 +193,10 @@ public void setWireFormat(WireFormat wireFormat) { this.wireFormat = wireFormat; } + + public void setDispatchPriority(int priority) { + readContext.updatePriority(priority); + } } private class PipeTransportServer implements TransportServer { @@ -243,7 +248,7 @@ rc.setRemoteAddress(remoteAddress); PipeTransport serverSide = new PipeTransport(pipe.connect()); serverSide.setRemoteAddress(remoteAddress); - if( wireFormatFactory!=null ) { + if (wireFormatFactory != null) { rc.setWireFormat(wireFormatFactory.createWireFormat()); serverSide.setWireFormat(wireFormatFactory.createWireFormat()); } @@ -268,7 +273,7 @@ PipeTransportServer server = new PipeTransportServer(); server.setConnectURI(uri); server.setName(node); - if( options.containsKey("wireFormat") ) { + if (options.containsKey("wireFormat")) { server.setWireFormatFactory(createWireFormatFactory(options)); } servers.put(node, server); Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=752957&r1=752956&r2=752957&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Thu Mar 12 18:04:49 2009 @@ -1,336 +0,0 @@ -package org.apache.activemq.flow; - -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.flow.Commands.Destination; -import org.apache.activemq.flow.Commands.FlowControl; -import org.apache.activemq.flow.Commands.Destination.DestinationBuffer; -import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean; -import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer; -import org.apache.activemq.flow.ISinkController.FlowControllable; -import org.apache.activemq.flow.MockBroker.DeliveryTarget; -import org.apache.activemq.queue.SingleFlowRelay; -import org.apache.activemq.transport.DispatchableTransport; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportListener; - -public class RemoteConnection implements TransportListener, DeliveryTarget { - - protected Transport transport; - protected MockBroker broker; - - protected final Object inboundMutex = new Object(); - protected IFlowController inboundController; - - protected SingleFlowRelay outputQueue; - protected IFlowController outboundController; - protected ProtocolLimiter outboundLimiter; - protected Flow ouboundFlow; - - protected String name; - - private int priorityLevels; - - private final int outputWindowSize = 1000; - private final int outputResumeThreshold = 900; - - private final int inputWindowSize = 1000; - private final int inputResumeThreshold = 500; - - private IDispatcher dispatcher; - private final AtomicBoolean stopping = new AtomicBoolean(); - protected Flow outputFlow; - protected boolean blockingTransport = false; - ExecutorService blockingWriter; - - public void setBroker(MockBroker broker) { - this.broker = broker; - } - - public void setTransport(Transport transport) { - this.transport = transport; - } - - public void start() throws Exception { - transport.setTransportListener(this); - transport.start(); - } - - public void stop() throws Exception { - stopping.set(true); - if (transport != null) { - transport.stop(); - } - if (blockingWriter != null) { - blockingWriter.shutdown(); - } - } - - public void onCommand(Object command) { - try { - // System.out.println("Got Command: " + command); - // First command in should be the name of the connection - if (name == null) { - name = (String) command; - initialize(); - } else if (command.getClass() == Message.class) { - Message msg = (Message) command; - inboundController.add(msg, null); - } else if (command.getClass() == DestinationBuffer.class) { - // This is a subscription request - Destination destination = (Destination) command; - - broker.subscribe(destination, this); - } else if (command.getClass() == FlowControlBuffer.class) { - // This is a subscription request - FlowControl fc = (FlowControl) command; - synchronized (outputQueue) { - outboundLimiter.onProtocolMessage(fc); - } - } else { - onException(new Exception("Unrecognized command: " + command)); - } - } catch (Exception e) { - onException(e); - } - } - - protected void initialize() { - // Setup the input processing.. - Flow flow = new Flow(name, false); - WindowLimiter limiter = new WindowLimiter(false, flow, inputWindowSize, inputResumeThreshold); - - inboundController = new FlowController(new FlowControllable() { - public void flowElemAccepted(ISourceController controller, Message elem) { - messageReceived(controller, elem); - } - - @Override - public String toString() { - return name; - } - - public IFlowSink getFlowSink() { - return null; - } - - public IFlowSource getFlowSource() { - return null; - } - }, flow, limiter, inboundMutex); - - ouboundFlow = new Flow(name, false); - outboundLimiter = new WindowLimiter(true, ouboundFlow, outputWindowSize, outputResumeThreshold); - outputQueue = new SingleFlowRelay(ouboundFlow, name + "-outbound", outboundLimiter); - outboundController = outputQueue.getFlowController(ouboundFlow); - - if (transport instanceof DispatchableTransport) { - outputQueue.setDrain(new IFlowDrain() { - - public void drain(Message message, ISourceController controller) { - write(message); - } - }); - - } else { - blockingTransport = true; - blockingWriter = Executors.newSingleThreadExecutor(); - outputQueue.setDrain(new IFlowDrain() { - public void drain(final Message message, ISourceController controller) { - write(message); - }; - }); - /* - * // Setup output processing final Executor writer = - * Executors.newSingleThreadExecutor(); FlowControllable - * controllable = new FlowControllable() { public void - * flowElemAccepted( final ISourceController controller, - * final Message elem) { writer.execute(new Runnable() { public void - * run() { if (!stopping.get()) { try { transport.oneway(elem); - * controller.elementDispatched(elem); } catch (IOException e) { - * onException(e); } } } }); } - * - * public IFlowSink getFlowSink() { return null; } - * - * public IFlowSource getFlowSource() { return null; } }; - * - * if (priorityLevels <= 1) { outboundController = new - * FlowController(controllable, flow, limiter, - * outboundMutex); } else { PrioritySizeLimiter pl = new - * PrioritySizeLimiter( outputWindowSize, - * outputResumeThreshold, priorityLevels); - * pl.setPriorityMapper(Message.PRIORITY_MAPPER); outboundController - * = new PriorityFlowController( controllable, flow, pl, - * outboundMutex); } - */ - } - // outputQueue.setDispatcher(dispatcher); - - } - - private final void write(final Object o) { - synchronized (outputQueue) { - if (!blockingTransport) { - try { - transport.oneway(o); - } catch (IOException e) { - onException(e); - } - } else { - try { - blockingWriter.execute(new Runnable() { - public void run() { - if (!stopping.get()) { - try { - transport.oneway(o); - } catch (IOException e) { - onException(e); - } - } - } - }); - } catch (RejectedExecutionException re) { - //Must be shutting down. - } - } - } - } - - protected void messageReceived(ISourceController controller, Message elem) { - broker.router.route(controller, elem); - inboundController.elementDispatched(elem); - } - - public void onException(IOException error) { - onException((Exception) error); - } - - public void onException(Exception error) { - if (!stopping.get() && !broker.isStopping()) { - System.out.println("RemoteConnection error: " + error); - error.printStackTrace(); - } - } - - public void transportInterupted() { - } - - public void transportResumed() { - } - - public String getName() { - return name; - } - - public int getPriorityLevels() { - return priorityLevels; - } - - public void setPriorityLevels(int priorityLevels) { - this.priorityLevels = priorityLevels; - } - - public IDispatcher getDispatcher() { - return dispatcher; - } - - public void setDispatcher(IDispatcher dispatcher) { - this.dispatcher = dispatcher; - if (transport instanceof DispatchableTransport) { - DispatchableTransport dt = ((DispatchableTransport) transport); - if (name != null) { - dt.setName(name); - } - dt.setDispatcher(getDispatcher()); - } - } - - public MockBroker getBroker() { - return broker; - } - - public int getOutputWindowSize() { - return outputWindowSize; - } - - public int getOutputResumeThreshold() { - return outputResumeThreshold; - } - - public int getInputWindowSize() { - return inputWindowSize; - } - - public int getInputResumeThreshold() { - return inputResumeThreshold; - } - - public IFlowSink getSink() { - return outputQueue; - } - - public boolean match(Message message) { - return true; - } - - private interface ProtocolLimiter extends IFlowLimiter { - public void onProtocolMessage(FlowControl m); - } - - private class WindowLimiter extends SizeLimiter implements ProtocolLimiter { - final Flow flow; - final boolean clientMode; - private int available; - - public WindowLimiter(boolean clientMode, Flow flow, int capacity, int resumeThreshold) { - super(capacity, resumeThreshold); - this.clientMode = clientMode; - this.flow = flow; - } - - public void reserve(E elem) { - super.reserve(elem); - if (!clientMode) { - // System.out.println(RemoteConnection.this.name + " Reserved " - // + this); - } - } - - public void releaseReserved(E elem) { - super.reserve(elem); - if (!clientMode) { - // System.out.println(RemoteConnection.this.name + - // " Released Reserved " + this); - } - } - - protected void remove(int size) { - super.remove(size); - if (!clientMode) { - available += size; - if (available >= capacity - resumeThreshold) { - FlowControlBean fc = new FlowControlBean(); - fc.setCredit(available); - write(fc.freeze()); - // System.out.println(RemoteConnection.this.name + - // " Send Release " + available + this); - available = 0; - } - } - } - - public void onProtocolMessage(FlowControl m) { - remove(m.getCredit()); - } - - public int getElementSize(Message m) { - return m.getFlowLimiterSize(); - } - } - -} Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=752957&r1=752956&r2=752957&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Thu Mar 12 18:04:49 2009 @@ -9,7 +9,7 @@ import org.apache.activemq.transport.DispatchableTransport; import org.apache.activemq.transport.TransportFactory; -public class RemoteConsumer extends RemoteConnection{ +public class RemoteConsumer extends ClientConnection { private final MetricCounter consumerRate = new MetricCounter(); @@ -18,52 +18,35 @@ private Destination destination; private String selector; - private boolean schedualWait; - + private boolean schedualWait = true; + public void start() throws Exception { consumerRate.name("Consumer " + name + " Rate"); totalConsumerRate.add(consumerRate); - - URI uri = broker.getConnectURI(); - transport = TransportFactory.compositeConnect(uri); - if(transport instanceof DispatchableTransport) - { - DispatchableTransport dt = ((DispatchableTransport)transport); - dt.setName(name); - dt.setDispatcher(getDispatcher()); - schedualWait = true; - } - transport.setTransportListener(this); - transport.start(); - - // Let the remote side know our name. - transport.oneway(name); - // Sending the destination acts as the subscribe. - transport.oneway(destination); - super.initialize(); + super.start(); + // subscribe: + write(destination); } - + protected void messageReceived(final ISourceController controller, final Message elem) { - if( schedualWait ) { + if (schedualWait) { if (thinkTime > 0) { - getDispatcher().schedule(new Runnable(){ + getDispatcher().schedule(new Runnable() { public void run() { consumerRate.increment(); controller.elementDispatched(elem); } - + }, thinkTime, TimeUnit.MILLISECONDS); - - } - else - { + + } else { consumerRate.increment(); controller.elementDispatched(elem); } } else { - if( thinkTime>0 ) { + if (thinkTime > 0) { try { Thread.sleep(thinkTime); } catch (InterruptedException e) { @@ -74,6 +57,10 @@ } } + public MetricCounter getRate() { + return consumerRate; + } + public void setName(String name) { this.name = name; } @@ -112,4 +99,5 @@ public void setSelector(String selector) { this.selector = selector; - }} + } +} Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=752957&r1=752956&r2=752957&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Thu Mar 12 18:04:49 2009 @@ -1,6 +1,5 @@ package org.apache.activemq.flow; -import java.net.URI; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.dispatch.IDispatcher.DispatchContext; @@ -9,11 +8,8 @@ import org.apache.activemq.flow.ISinkController.FlowUnblockListener; import org.apache.activemq.metric.MetricAggregator; import org.apache.activemq.metric.MetricCounter; -import org.apache.activemq.transport.DispatchableTransport; -import org.apache.activemq.transport.TransportFactory; - -public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener{ +public class RemoteProducer extends ClientConnection implements Dispatchable, FlowUnblockListener { private final MetricCounter rate = new MetricCounter(); @@ -29,104 +25,84 @@ private DispatchContext dispatchContext; private String filler; - private int payloadSize = 20; - + private int payloadSize = 0; + IFlowController outboundController; + public void start() throws Exception { - - if( payloadSize>0 ) { + + if (payloadSize > 0) { StringBuilder sb = new StringBuilder(payloadSize); - for( int i=0; i < payloadSize; ++i) { - sb.append((char)('a'+(i%26))); + for (int i = 0; i < payloadSize; ++i) { + sb.append((char) ('a' + (i % 26))); } filler = sb.toString(); } - + rate.name("Producer " + name + " Rate"); totalProducerRate.add(rate); - URI uri = broker.getConnectURI(); - transport = TransportFactory.compositeConnect(uri); - transport.setTransportListener(this); - if(transport instanceof DispatchableTransport) - { - DispatchableTransport dt = ((DispatchableTransport)transport); - dt.setName(name + "-client-transport"); - dt.setDispatcher(getDispatcher()); - } - super.setTransport(transport); - - super.initialize(); - transport.start(); - // Let the remote side know our name. - transport.oneway(name); + super.start(); + outboundController = outputQueue.getFlowController(outboundFlow); dispatchContext = getDispatcher().register(this, name + "-client"); dispatchContext.requestDispatch(); } - - public void stop() throws Exception - { - dispatchContext.close(false); - super.stop(); - } - - public void onFlowUnblocked(ISinkController controller) { - dispatchContext.requestDispatch(); - } - - public boolean dispatch() { - while(true) - { - - if(next == null) - { - int priority = this.priority; - if (priorityMod > 0) { - priority = counter % priorityMod == 0 ? 0 : priority; - } - - next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority); - if (property != null) { - next.setProperty(property); - } - } - - //If flow controlled stop until flow control is lifted. - if(outboundController.isSinkBlocked()) - { - if(outboundController.addUnblockListener(this)) - { - return true; - } - } - - getSink().add(next, null); - rate.increment(); - next = null; - } - } + + public void stop() throws Exception { + dispatchContext.close(false); + super.stop(); + } + + public void onFlowUnblocked(ISinkController controller) { + dispatchContext.requestDispatch(); + } + + public boolean dispatch() { + while (true) { + + if (next == null) { + int priority = this.priority; + if (priorityMod > 0) { + priority = counter % priorityMod == 0 ? 0 : priority; + } + + next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority); + if (property != null) { + next.setProperty(property); + } + } + + // If flow controlled stop until flow control is lifted. + if (outboundController.isSinkBlocked()) { + if (outboundController.addUnblockListener(this)) { + return true; + } + } + + getSink().add(next, null); + rate.increment(); + next = null; + return false; + } + } private String createPayload() { - if( payloadSize>=0 ) { + if (payloadSize >= 0) { StringBuilder sb = new StringBuilder(payloadSize); sb.append(name); sb.append(':'); sb.append(++counter); sb.append(':'); int length = sb.length(); - if( length <= payloadSize ) { - sb.append(filler.subSequence(0, payloadSize-length)); + if (length <= payloadSize) { + sb.append(filler.subSequence(0, payloadSize - length)); return sb.toString(); } else { - return sb.substring(0, payloadSize); + return sb.substring(0, payloadSize); } } else { - return name+":"+(++counter); + return name + ":" + (++counter); } } - - public void setName(String name) { - this.name = name; - } public AtomicLong getMessageIdGenerator() { return messageIdGenerator; @@ -195,5 +171,9 @@ public void setPayloadSize(int messageSize) { this.payloadSize = messageSize; } -} + @Override + protected void messageReceived(ISourceController controller, Message elem) { + controller.elementDispatched(elem); + } +}