activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r754964 [2/2] - in /activemq/sandbox/activemq-flow: ./ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/broker/openwire/ src/main/java/org/apache/activemq/broker/stomp/ src/main/...
Date Mon, 16 Mar 2009 17:25:24 GMT
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
Mon Mar 16 17:25:23 2009
@@ -1,510 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.activemq.broker.openwire;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.BrokerTestBase;
+import org.apache.activemq.broker.RemoteConsumer;
+import org.apache.activemq.broker.RemoteProducer;
 
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.Destination;
-import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.broker.Queue;
-import org.apache.activemq.broker.Router;
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.PriorityDispatcher;
-import org.apache.activemq.metric.MetricAggregator;
-import org.apache.activemq.metric.Period;
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.Mapper;
-
-public class OpenwireBrokerTest extends TestCase {
-
-    protected static final int PERFORMANCE_SAMPLES = 3;
-
-    protected static final int IO_WORK_AMOUNT = 0;
-    protected static final int FANIN_COUNT = 10;
-    protected static final int FANOUT_COUNT = 10;
-
-    protected static final int PRIORITY_LEVELS = 10;
-    protected static final boolean USE_INPUT_QUEUES = true;
-
-    // Set to put senders and consumers on separate brokers.
-    protected boolean multibroker = false;
-
-    // Set to mockup up ptp:
-    protected boolean ptp = false;
-
-    // Set to use tcp IO
-    protected boolean tcp = true;
-    // set to force marshalling even in the NON tcp case.
-    protected boolean forceMarshalling = false;
-
-    protected String sendBrokerBindURI;
-    protected String receiveBrokerBindURI;
-    protected String sendBrokerConnectURI;
-    protected String receiveBrokerConnectURI;
-
-    // Set's the number of threads to use:
-    protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
-    protected boolean usePartitionedQueue = false;
-
-    protected int producerCount;
-    protected int consumerCount;
-    protected int destCount;
-
-    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate
Producer Rate").unit("items");
-    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate
Consumer Rate").unit("items");
-
-    protected Broker sendBroker;
-    protected Broker rcvBroker;
-    protected ArrayList<Broker> brokers = new ArrayList<Broker>();
-    protected IDispatcher dispatcher;
-    protected final AtomicLong msgIdGenerator = new AtomicLong();
-    protected final AtomicBoolean stopping = new AtomicBoolean();
-    
-    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
-    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
-
-    static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer,
MessageDelivery>() {
-        public AsciiBuffer map(MessageDelivery element) {
-            return element.getMsgId();
-        }
-    };
-    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer,
MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            // we modulo 10 to have at most 10 partitions which the producers
-            // gets split across.
-            return (int) (element.getProducerId().hashCode() % 10);
-        }
-    };
+public class OpenwireBrokerTest extends BrokerTestBase {
 
     @Override
-    protected void setUp() throws Exception {
-        dispatcher = createDispatcher();
-        dispatcher.start();
-        if (tcp) {
-            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
-            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
-            sendBrokerConnectURI = "tcp://localhost:10000";
-            receiveBrokerConnectURI = "tcp://localhost:20000";
-        } else {
-            if (forceMarshalling) {
-                sendBrokerBindURI = "pipe://SendBroker";
-                receiveBrokerBindURI = "pipe://ReceiveBroker";
-            } else {
-                sendBrokerBindURI = "pipe://SendBroker";
-                receiveBrokerBindURI = "pipe://ReceiveBroker";
-            }
-            sendBrokerConnectURI = sendBrokerBindURI;
-            receiveBrokerConnectURI = receiveBrokerBindURI;
-        }
+    protected RemoteProducer cerateProducer() {
+        return new OpenwireRemoteProducer();
     }
 
-    protected IDispatcher createDispatcher() {
-        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY,
asyncThreadPoolSize);
+    @Override
+    protected RemoteConsumer createConsumer() {
+        return new OpenwireRemoteConsumer();
     }
     
-    public void test_1_1_0() throws Exception {
-        producerCount = 1;
-        destCount = 1;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_1_1_1() throws Exception {
-        producerCount = 1;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_10_1_10() throws Exception {
-        producerCount = FANIN_COUNT;
-        consumerCount = FANOUT_COUNT;
-        destCount = 1;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_10_1_1() throws Exception {
-        producerCount = FANIN_COUNT;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_1_1_10() throws Exception {
-        producerCount = 1;
-        destCount = 1;
-        consumerCount = FANOUT_COUNT;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_2_2_2() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_10_10_10() throws Exception {
-        producerCount = 10;
-        destCount = 10;
-        consumerCount = 10;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-
-    /**
-     * Tests 2 producers sending to 1 destination with 2 consumres, but with
-     * consumers set to select only messages from each producer. 1 consumers is
-     * set to slow, the other producer should be able to send quickly.
-     * 
-     * @throws Exception
-     */
-    public void test_2_2_2_SlowConsumer() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
-
-        createConnections();
-        consumers.get(0).setThinkTime(50);
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    public void test_2_2_2_Selector() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
-
-        createConnections();
-
-        // Add properties to match producers to their consumers
-        for (int i = 0; i < consumerCount; i++) {
-            String property = "match" + i;
-            consumers.get(i).setSelector(property);
-            producers.get(i).setProperty(property);
-        }
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
-    /**
-     * Test sending with 1 high priority sender. The high priority sender should
-     * have higher throughput than the other low priority senders.
-     * 
-     * @throws Exception
-     */
-    public void test_2_1_1_HighPriorityProducer() throws Exception {
-
-        producerCount = 2;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-        RemoteProducer producer = producers.get(0);
-        producer.setPriority(1);
-        producer.getRate().setName("High Priority Producer Rate");
-
-        consumers.get(0).setThinkTime(1);
-
-        // Start 'em up.
-        startServices();
-        try {
-
-            System.out.println("Checking rates for test: " + getName());
-            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-                Period p = new Period();
-                Thread.sleep(1000 * 5);
-                System.out.println(producer.getRate().getRateSummary(p));
-                System.out.println(totalProducerRate.getRateSummary(p));
-                System.out.println(totalConsumerRate.getRateSummary(p));
-                totalProducerRate.reset();
-                totalConsumerRate.reset();
-            }
-
-        } finally {
-            stopServices();
-        }
-    }
-
-    /**
-     * Test sending with 1 high priority sender. The high priority sender should
-     * have higher throughput than the other low priority senders.
-     * 
-     * @throws Exception
-     */
-    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
-        producerCount = 2;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-        RemoteProducer producer = producers.get(0);
-        producer.setPriority(1);
-        producer.setPriorityMod(3);
-        producer.getRate().setName("High Priority Producer Rate");
-
-        consumers.get(0).setThinkTime(1);
-
-        // Start 'em up.
-        startServices();
-        try {
-
-            System.out.println("Checking rates for test: " + getName());
-            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-                Period p = new Period();
-                Thread.sleep(1000 * 5);
-                System.out.println(producer.getRate().getRateSummary(p));
-                System.out.println(totalProducerRate.getRateSummary(p));
-                System.out.println(totalConsumerRate.getRateSummary(p));
-                totalProducerRate.reset();
-                totalConsumerRate.reset();
-            }
-
-        } finally {
-            stopServices();
-        }
-    }
-
-    private void reportRates() throws InterruptedException {
-        System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp"
: "topic"));
-        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-            Period p = new Period();
-            Thread.sleep(1000 * 5);
-            System.out.println(totalProducerRate.getRateSummary(p));
-            System.out.println(totalConsumerRate.getRateSummary(p));
-            totalProducerRate.reset();
-            totalConsumerRate.reset();
-        }
-    }
-
-    private void createConnections() throws IOException, URISyntaxException {
-
-        if (multibroker) {
-            sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
-            rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
-            brokers.add(sendBroker);
-            brokers.add(rcvBroker);
-        } else {
-            sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
-            brokers.add(sendBroker);
-        }
-
-        Destination[] dests = new Destination[destCount];
-
-        for (int i = 0; i < destCount; i++) {
-            Destination.SingleDestination bean = new Destination.SingleDestination();
-            bean.setName(new AsciiBuffer("dest" + (i + 1)));
-            bean.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
-            dests[i] = bean;
-            if (ptp) {
-                Queue queue = createQueue(sendBroker, dests[i]);
-                sendBroker.addQueue(queue);
-                if (multibroker) {
-                    queue = createQueue(rcvBroker, dests[i]);
-                    rcvBroker.addQueue(queue);
-                }
-            }
-        }
-
-        for (int i = 0; i < producerCount; i++) {
-            Destination destination = dests[i % destCount];
-            RemoteProducer producer = createProducer(i, destination);
-            producers.add(producer);
-        }
-
-        for (int i = 0; i < consumerCount; i++) {
-            Destination destination = dests[i % destCount];
-            RemoteConsumer consumer = createConsumer(i, destination);
-            consumers.add(consumer);
-        }
-
-        // Create MultiBroker connections:
-        // if (multibroker) {
-        // Pipe<Message> pipe = new Pipe<Message>();
-        // sendBroker.createBrokerConnection(rcvBroker, pipe);
-        // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-        // }
-    }
-
-    private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException
{
-        RemoteConsumer consumer = new RemoteConsumer() {
-            public void onException(Exception error) {
-                if( !stopping.get() ) {
-                    System.err.println("Consumer Async Error:");
-                    error.printStackTrace();
-                }
-            }
-        };
-        consumer.setUri(new URI(rcvBroker.getConnectUri()));
-        consumer.setDestination(destination);
-        consumer.setName("consumer" + (i + 1));
-        consumer.setTotalConsumerRate(totalConsumerRate);
-        consumer.setDispatcher(dispatcher);
-        return consumer;
-    }
-
-    private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException
{
-        RemoteProducer producer = new RemoteProducer() {
-            public void onException(Exception error) {
-                if( !stopping.get() ) {
-                    System.err.println("Producer Async Error:");
-                    error.printStackTrace();
-                }
-            }
-        };
-        producer.setUri(new URI(sendBroker.getConnectUri()));
-        producer.setProducerId(id + 1);
-        producer.setName("producer" + (id + 1));
-        producer.setDestination(destination);
-        producer.setMessageIdGenerator(msgIdGenerator);
-        producer.setTotalProducerRate(totalProducerRate);
-        producer.setDispatcher(dispatcher);
-        return producer;
-    }
-
-    private Queue createQueue(Broker broker, Destination destination) {
-        Queue queue = new Queue();
-        queue.setBroker(broker);
-        queue.setDestination(destination);
-        queue.setKeyExtractor(KEY_MAPPER);
-        if (usePartitionedQueue) {
-            queue.setPartitionMapper(PARTITION_MAPPER);
-        }
-        return queue;
-    }
-
-    private Broker createBroker(String name, String bindURI, String connectUri) {
-        Broker broker = new Broker();
-        broker.setName(name);
-        broker.setBindUri(bindURI);
-        broker.setConnectUri(connectUri);
-        broker.setDispatcher(dispatcher);
-        return broker;
-    }
-
-    private void stopServices() throws Exception {
-        stopping.set(true);
-        for (Broker broker : brokers) {
-            broker.stop();
-        }
-        for (RemoteProducer connection : producers) {
-            connection.stop();
-        }
-        for (RemoteConsumer connection : consumers) {
-            connection.stop();
-        }
-        if (dispatcher != null) {
-            dispatcher.shutdown();
-        }
-    }
-
-    private void startServices() throws Exception {
-        for (Broker broker : brokers) {
-            broker.start();
-        }
-        for (RemoteConsumer connection : consumers) {
-            connection.start();
-        }
-
-        for (RemoteProducer connection : producers) {
-            connection.start();
-        }
-    }
 
 }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
Mon Mar 16 17:25:23 2009
@@ -0,0 +1,104 @@
+package org.apache.activemq.broker.openwire;
+
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createConsumerInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo;
+
+import java.io.IOException;
+
+import org.apache.activemq.WindowLimiter;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.RemoteConsumer;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.tcp.TcpTransport;
+
+public class OpenwireRemoteConsumer extends RemoteConsumer {
+
+    protected final Object inboundMutex = new Object();
+    private FlowController<MessageDelivery> inboundController;
+
+    private ActiveMQDestination activemqDestination;
+    private ConnectionInfo connectionInfo;
+    private SessionInfo sessionInfo;
+    private ConsumerInfo consumerInfo;
+
+    private Message lastMessage;
+    
+    protected void initialize() {
+        // Setup the input processing..
+        final Flow flow = new Flow("client-"+name+"-inbound", false);
+        inputResumeThreshold = inputWindowSize/2;
+        WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false,
flow, inputWindowSize, inputResumeThreshold) {
+            protected void sendCredit(int credit) {
+                MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit,
MessageAck.STANDARD_ACK_TYPE);
+                write(ack);
+            }
+        };
+        inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>()
{
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller,
MessageDelivery elem) {
+                messageReceived(controller, elem);
+            }
+            public String toString() {
+                return flow.getFlowName();
+            }
+            public IFlowSink<MessageDelivery> getFlowSink() {
+                return null;
+            }
+            public IFlowSource<MessageDelivery> getFlowSource() {
+                return null;
+            }
+        }, flow, limiter, inboundMutex);
+        
+    }
+    
+    protected void setupSubscription() throws Exception, IOException {
+        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+            activemqDestination = new ActiveMQQueue(destination.getName().toString());
+        } else {
+            activemqDestination = new ActiveMQTopic(destination.getName().toString());
+        }
+        
+        connectionInfo = createConnectionInfo(name);
+        transport.oneway(connectionInfo);
+        sessionInfo = createSessionInfo(connectionInfo);
+        transport.oneway(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, activemqDestination);
+        consumerInfo.setPrefetchSize(inputWindowSize);
+        transport.oneway(consumerInfo);
+    }
+    
+    public void onCommand(Object command) {
+        try {
+            if (command.getClass() == WireFormatInfo.class) {
+            } else if (command.getClass() == BrokerInfo.class) {
+                System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+            } else if (command.getClass() == MessageDispatch.class) {
+                MessageDispatch msg = (MessageDispatch) command;
+                lastMessage = msg.getMessage();
+                inboundController.add(new OpenWireMessageDelivery(msg.getMessage()), null);
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
Mon Mar 16 17:25:23 2009
@@ -0,0 +1,106 @@
+package org.apache.activemq.broker.openwire;
+
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createMessage;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createProducerInfo;
+import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.WindowLimiter;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.RemoteProducer;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.SingleFlowRelay;
+
+public class OpenwireRemoteProducer extends RemoteProducer {
+    private ConnectionInfo connectionInfo;
+    private SessionInfo sessionInfo;
+    private ProducerInfo producerInfo;
+    private ActiveMQDestination activemqDestination;
+    private WindowLimiter<MessageDelivery> outboundLimiter;
+
+    protected void setupProducer() throws Exception, IOException {
+        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+            activemqDestination = new ActiveMQQueue(destination.getName().toString());
+        } else {
+            activemqDestination = new ActiveMQTopic(destination.getName().toString());
+        }
+        
+        connectionInfo = createConnectionInfo(name);
+        transport.oneway(connectionInfo);
+        sessionInfo = createSessionInfo(connectionInfo);
+        transport.oneway(sessionInfo);
+        producerInfo = createProducerInfo(sessionInfo);
+        producerInfo.setWindowSize(outputWindowSize);
+        transport.oneway(producerInfo);        
+    }
+    
+    protected void initialize() {
+        Flow flow = new Flow("client-"+name+"-outbound", false);
+        outputResumeThreshold = outputWindowSize/2;
+        outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow, outputWindowSize,
outputResumeThreshold);
+        SingleFlowRelay<MessageDelivery> outboundQueue = new SingleFlowRelay<MessageDelivery>(flow,
flow.getFlowName(), outboundLimiter);
+        this.outboundQueue = outboundQueue;
+        
+        outboundController = outboundQueue.getFlowController(flow);
+        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+            public void drain(MessageDelivery message, ISourceController<MessageDelivery>
controller) {
+                Message msg = message.asType(Message.class);
+                write(msg);
+            }
+        });
+    }
+    
+    public void onCommand(Object command) {
+        try {
+            if (command.getClass() == WireFormatInfo.class) {
+            } else if (command.getClass() == BrokerInfo.class) {
+                System.out.println("Producer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
+            } else if (command.getClass() == ProducerAck.class) {
+                ProducerAck fc = (ProducerAck) command;
+                synchronized (outboundQueue) {
+                    outboundLimiter.onProtocolCredit(fc.getSize());
+                }
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+    
+    protected void createNextMessage() { // builds the next outgoing message and stores it in `next`
+        int priority = this.priority;
+        if (priorityMod > 0) {
+            priority = counter % priorityMod == 0 ? 0 : priority; // every priorityMod-th counter value is sent at priority 0
+        }
+
+        ActiveMQTextMessage msg = createMessage(producerInfo, activemqDestination, priority, createPayload());
+        if (property != null) {
+            try {
+                msg.setStringProperty(property, property); // property name doubles as its value
+            } catch (JMSException e) {
+                throw new RuntimeException(e); // was constructed but never thrown, silently hiding the failure
+            }
+        }
+        next = new OpenWireMessageDelivery(msg);
+    }
+}
+

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
Mon Mar 16 17:25:23 2009
@@ -0,0 +1,31 @@
+package org.apache.activemq.broker.openwire.stomp;
+
+import org.apache.activemq.broker.BrokerTestBase;
+import org.apache.activemq.broker.RemoteConsumer;
+import org.apache.activemq.broker.RemoteProducer;
+
+public class StompBrokerTest extends BrokerTestBase {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        if (tcp) {
+            sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi&transport.useInactivityMonitor=false";
+            receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi&transport.useInactivityMonitor=false";
+            sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=stomp&useInactivityMonitor=false";
+            receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=stomp&useInactivityMonitor=false";
+        }
+    }
+    
+    @Override
+    protected RemoteProducer cerateProducer() {
+        return new StompRemoteProducer();
+    }
+
+    @Override
+    protected RemoteConsumer createConsumer() {
+        return new StompRemoteConsumer();
+    }
+    
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
Mon Mar 16 17:25:23 2009
@@ -0,0 +1,88 @@
+package org.apache.activemq.broker.openwire.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.RemoteConsumer;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.stomp.StompMessageDelivery;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+/**
+ * {@link RemoteConsumer} that speaks STOMP: it connects, subscribes to a
+ * queue or topic derived from the configured destination, and feeds every
+ * received MESSAGE frame through an inbound {@link FlowController}.
+ */
+public class StompRemoteConsumer extends RemoteConsumer {
+
+    /** Mutex handed to the inbound flow controller. */
+    protected final Object inboundMutex = new Object();
+    /** Flow controller for inbound deliveries; created in {@link #initialize()}. */
+    private FlowController<MessageDelivery> inboundController;
+    /** STOMP destination string ("/queue/..." or "/topic/..."); set in {@link #setupSubscription()}. */
+    private String stompDestination;
+
+    /**
+     * Derives the STOMP destination name, then sends a CONNECT frame followed
+     * by a SUBSCRIBE frame (subscription id "0001", auto acknowledgement).
+     *
+     * @throws Exception if either frame cannot be sent over the transport
+     */
+    protected void setupSubscription() throws Exception, IOException {
+        // Queue-domain destinations map to "/queue/", everything else to "/topic/".
+        String prefix = destination.getDomain().equals(Router.QUEUE_DOMAIN) ? "/queue/" : "/topic/";
+        stompDestination = prefix + destination.getName().toString();
+
+        transport.oneway(new StompFrame(Stomp.Commands.CONNECT));
+
+        HashMap<String, String> subscribeHeaders = new HashMap<String, String>();
+        subscribeHeaders.put(Stomp.Headers.Subscribe.DESTINATION, stompDestination);
+        subscribeHeaders.put(Stomp.Headers.Subscribe.ID, "0001");
+        subscribeHeaders.put(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO);
+        transport.oneway(new StompFrame(Stomp.Commands.SUBSCRIBE, subscribeHeaders));
+    }
+
+    /**
+     * Builds the inbound flow controller. The resume threshold is set to half
+     * of the input window size, and every element accepted by the controller
+     * is forwarded to {@code messageReceived}.
+     */
+    protected void initialize() {
+        // Setup the input processing..
+        final Flow inboundFlow = new Flow("client-" + name + "-inbound", false);
+        inputResumeThreshold = inputWindowSize / 2;
+        SizeLimiter<MessageDelivery> sizeLimiter =
+            new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+
+        FlowControllable<MessageDelivery> controllable = new FlowControllable<MessageDelivery>() {
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller,
+                                         MessageDelivery elem) {
+                messageReceived(controller, elem);
+            }
+
+            // This controllable exposes neither a sink nor a source.
+            public IFlowSink<MessageDelivery> getFlowSink() {
+                return null;
+            }
+
+            public IFlowSource<MessageDelivery> getFlowSource() {
+                return null;
+            }
+
+            public String toString() {
+                return inboundFlow.getFlowName();
+            }
+        };
+
+        inboundController =
+            new FlowController<MessageDelivery>(controllable, inboundFlow, sizeLimiter, inboundMutex);
+    }
+
+    /**
+     * Handles a command arriving from the transport. MESSAGE frames are
+     * wrapped in a {@link StompMessageDelivery} and offered to the inbound
+     * controller, CONNECTED frames are ignored, and anything else is reported
+     * through {@code onException}. Any exception raised while handling the
+     * command is reported through {@code onException} as well.
+     *
+     * @param command the object received from the transport
+     */
+    public void onCommand(Object command) {
+        try {
+            // Exact class match (not instanceof), as only plain StompFrame objects are accepted.
+            if (command.getClass() != StompFrame.class) {
+                onException(new Exception("Unrecognized command: " + command));
+                return;
+            }
+
+            StompFrame frame = (StompFrame) command;
+            String action = frame.getAction();
+
+            if (Stomp.Responses.MESSAGE.equals(action)) {
+                StompMessageDelivery delivery = new StompMessageDelivery(frame, getDestination());
+                // Keep re-offering after each waitForFlowUnblock() until the controller accepts.
+                while (!inboundController.offer(delivery, null)) {
+                    inboundController.waitForFlowUnblock();
+                }
+                return;
+            }
+
+            if (Stomp.Responses.CONNECTED.equals(action)) {
+                // Connection acknowledgement: nothing to do.
+                return;
+            }
+
+            onException(new Exception("Unrecognized stomp command: " + action));
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
Mon Mar 16 17:25:23 2009
@@ -0,0 +1,93 @@
+package org.apache.activemq.broker.openwire.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.RemoteProducer;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.stomp.StompMessageDelivery;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+/**
+ * {@link RemoteProducer} that speaks STOMP: it connects, then emits SEND
+ * frames addressed to a queue or topic derived from the configured
+ * destination.
+ */
+public class StompRemoteProducer extends RemoteProducer {
+
+    /** STOMP destination string ("/queue/..." or "/topic/..."); set in {@link #setupProducer()}. */
+    private String stompDestination;
+
+    /**
+     * Derives the STOMP destination name and sends a CONNECT frame.
+     *
+     * @throws Exception if the frame cannot be sent over the transport
+     */
+    protected void setupProducer() throws Exception, IOException {
+        // Queue-domain destinations map to "/queue/", everything else to "/topic/".
+        String prefix = destination.getDomain().equals(Router.QUEUE_DOMAIN) ? "/queue/" : "/topic/";
+        stompDestination = prefix + destination.getName().toString();
+
+        transport.oneway(new StompFrame(Stomp.Commands.CONNECT));
+    }
+
+    /**
+     * Builds the outbound relay. The resume threshold is set to half of the
+     * output window size, and every drained delivery is converted to a
+     * {@link StompFrame} and passed to {@code write}.
+     */
+    protected void initialize() {
+        Flow outboundFlow = new Flow("client-" + name + "-outbound", false);
+        outputResumeThreshold = outputWindowSize / 2;
+        SizeLimiter<MessageDelivery> outboundLimiter =
+            new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold);
+
+        // Named "relay" so the local no longer shadows the outboundQueue field it is stored in.
+        SingleFlowRelay<MessageDelivery> relay =
+            new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), outboundLimiter);
+        this.outboundQueue = relay;
+
+        outboundController = relay.getFlowController(outboundFlow);
+        relay.setDrain(new IFlowDrain<MessageDelivery>() {
+            public void drain(MessageDelivery message,
+                              ISourceController<MessageDelivery> controller) {
+                write(message.asType(StompFrame.class));
+            }
+        });
+    }
+
+    /**
+     * Handles a command arriving from the transport. CONNECTED frames are
+     * ignored; any other frame or object is reported through
+     * {@code onException}, as is any exception raised while handling it.
+     *
+     * @param command the object received from the transport
+     */
+    public void onCommand(Object command) {
+        try {
+            // Exact class match (not instanceof), as only plain StompFrame objects are accepted.
+            if (command.getClass() != StompFrame.class) {
+                onException(new Exception("Unrecognized command: " + command));
+                return;
+            }
+
+            StompFrame frame = (StompFrame) command;
+            if (Stomp.Responses.CONNECTED.equals(frame.getAction())) {
+                // Connection acknowledgement: nothing to do.
+                return;
+            }
+
+            onException(new Exception("Unrecognized stomp command: " + frame.getAction()));
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Prepares the next delivery: a SEND frame addressed to the STOMP
+     * destination, carrying the generated payload and, when {@code property}
+     * is non-null, one extra header whose key and value are both
+     * {@code property}.
+     */
+    protected void createNextMessage() {
+        // NOTE(review): this method used to derive a local priority from the
+        // priority / priorityMod / counter fields and then discard it. The
+        // dead computation has been removed; the frame carries no priority
+        // header. Confirm whether priority should be propagated.
+        HashMap<String, String> headers = new HashMap<String, String>(5);
+        headers.put(Stomp.Headers.Send.DESTINATION, stompDestination);
+
+        if (property != null) {
+            headers.put(property, property);
+        }
+
+        StompFrame frame = new StompFrame(Stomp.Commands.SEND, headers, toContent(createPayload()));
+        next = new StompMessageDelivery(frame, getDestination());
+    }
+
+    /**
+     * Converts a string to bytes by keeping the low 8 bits of each char.
+     * Characters above U+00FF are therefore truncated rather than encoded.
+     *
+     * @param data the text to convert
+     * @return one byte per char of {@code data}
+     */
+    private byte[] toContent(String data) {
+        final int length = data.length();
+        byte[] content = new byte[length];
+        for (int i = 0; i < length; i++) {
+            content[i] = (byte) (data.charAt(i) & 0xFF);
+        }
+        return content;
+    }
+}
+



Mime
View raw message