activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: Adds a basic request / response test using temp topic and temp queue for reply to destination.
Date Fri, 07 Mar 2014 23:08:07 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 37eb6b0c6 -> ddf0b2a30


Adds a basic request / response test using temp topic and temp queue for
reply to destination.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ddf0b2a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ddf0b2a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ddf0b2a3

Branch: refs/heads/trunk
Commit: ddf0b2a309125ebf5ff3e3a517ef50831810fab1
Parents: 37eb6b0
Author: Timothy Bish <tabish121@gmai.com>
Authored: Fri Mar 7 18:07:58 2014 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Fri Mar 7 18:07:58 2014 -0500

----------------------------------------------------------------------
 .../amqp/JmsClientRequestResponseTest.java      | 223 +++++++++++++++++++
 1 file changed, 223 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ddf0b2a3/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
new file mode 100644
index 0000000..9f7b393
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java
@@ -0,0 +1,223 @@
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener
{
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class);
+
+    @Rule public TestName name = new TestName();
+
+    private Connection requestorConnection;
+    private Destination requestDestination;
+    private Session requestorSession;
+
+    private Connection responderConnection;
+    private MessageProducer responseProducer;
+    private Session responderSession;
+    private Destination replyDestination;
+
+    private final List<JMSException> failures = new Vector<JMSException>();
+    private boolean dynamicallyCreateProducer;
+    private final boolean useAsyncConsumer = true;
+    private Thread syncThread;
+
+    /**
+     * Closes both client connections, waits up to five seconds for the synchronous
+     * consumer thread (when one was started) and then runs the parent tear down.
+     *
+     * Every step runs even if an earlier one throws, and a connection that was never
+     * created (for example because setup failed early) is skipped instead of raising
+     * a NullPointerException that would hide the original test failure.
+     *
+     * @throws Exception if closing a connection, joining the thread or the parent tear down fails.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (requestorConnection != null) {
+                requestorConnection.close();
+            }
+        } finally {
+            try {
+                if (responderConnection != null) {
+                    responderConnection.close();
+                }
+            } finally {
+                try {
+                    if (syncThread != null) {
+                        syncThread.join(5000);
+                    }
+                } finally {
+                    // Always give the parent class its chance to clean up.
+                    super.tearDown();
+                }
+            }
+        }
+    }
+
+    private void doSetupConnections(boolean topic) throws Exception {
+        responderConnection = createConnection(name.getMethodName() + "-responder");
+        responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        if (topic) {
+            requestDestination = responderSession.createTopic(name.getMethodName());
+        } else {
+            requestDestination = responderSession.createQueue(name.getMethodName());
+        }
+        responseProducer = responderSession.createProducer(null);
+
+        final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination);
+        if (useAsyncConsumer) {
+            requestConsumer.setMessageListener(this);
+        } else {
+            syncThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    syncConsumeLoop(requestConsumer);
+                }
+            });
+            syncThread.start();
+        }
+        responderConnection.start();
+
+        requestorConnection = createConnection(name.getMethodName() + "-requestor");
+        requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        if (topic) {
+            replyDestination = requestorSession.createTemporaryTopic();
+        } else {
+            replyDestination = requestorSession.createTemporaryQueue();
+        }
+        requestorConnection.start();
+    }
+
+    /**
+     * Runs the request / response exchange using a queue for requests and a
+     * temporary queue for replies.
+     *
+     * @throws Exception if setup or the exchange fails.
+     */
+    @Test(timeout=60000)
+    public void testRequestResponseToTempQueue() throws Exception {
+        final boolean useTopic = false;
+        doSetupConnections(useTopic);
+        doTestRequestResponse();
+    }
+
+    /**
+     * Runs the request / response exchange using a topic for requests and a
+     * temporary topic for replies.
+     *
+     * @throws Exception if setup or the exchange fails.
+     */
+    @Test(timeout=60000)
+    public void testRequestResponseToTempTopic() throws Exception {
+        final boolean useTopic = true;
+        doSetupConnections(useTopic);
+        doTestRequestResponse();
+    }
+
+    private void doTestRequestResponse() throws Exception {
+
+        MessageProducer requestProducer = requestorSession.createProducer(requestDestination);
+        MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination);
+
+        TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest");
+        requestMessage.setJMSReplyTo(replyDestination);
+        requestProducer.send(requestMessage);
+
+        LOG.info("Sent request to destination: {}", requestDestination.toString());
+
+        Message msg = replyConsumer.receive(10000);
+
+        if (msg instanceof TextMessage) {
+            TextMessage replyMessage = (TextMessage)msg;
+            LOG.info("Received reply.");
+            LOG.info(replyMessage.toString());
+            assertTrue("Wrong message content", replyMessage.getText().startsWith("response"));
+        } else {
+            fail("Should have received a reply by now");
+        }
+        replyConsumer.close();
+
+        assertEquals("Should not have had any failures: " + failures, 0, failures.size());
+    }
+
+    /**
+     * Can be overridden in subclasses to test against a different transport such as NIO.
+     *
+     * This implementation returns the {@code port} value, which is declared outside
+     * this class (presumably by the parent test support class).
+     *
+     * @return the port to connect to on the Broker.
+     */
+    protected int getBrokerPort() {
+        return port;
+    }
+
+    private Connection createConnection(String clientId) throws JMSException {
+        return createConnection(clientId, false, false);
+    }
+
+    /**
+     * Creates and starts a connection to the broker on localhost at the port returned
+     * by {@link #getBrokerPort()}, using the "admin" / "password" credentials.
+     *
+     * If configuring or starting the new connection fails, the connection is closed
+     * before the exception is rethrown so that it does not leak.
+     *
+     * @param clientId
+     *        the client ID to set; skipped when null or empty.
+     * @param syncPublish
+     *        value handed to the factory's sync publish setting.
+     * @param useSsl
+     *        value handed to the factory's SSL flag.
+     *
+     * @return a started connection with an exception listener installed.
+     *
+     * @throws JMSException if the connection cannot be created, configured or started.
+     */
+    protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
+
+        int brokerPort = getBrokerPort();
+        LOG.debug("Creating connection on port {}", brokerPort);
+        final ConnectionFactoryImpl factory =
+            new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
+
+        factory.setSyncPublish(syncPublish);
+        factory.setTopicPrefix("topic://");
+        factory.setQueuePrefix("queue://");
+
+        final Connection connection = factory.createConnection();
+        try {
+            if (clientId != null && !clientId.isEmpty()) {
+                connection.setClientID(clientId);
+            }
+            connection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException exception) {
+                    // Route through the logger so the trace lands in the test log.
+                    LOG.warn("Connection reported an exception", exception);
+                }
+            });
+            connection.start();
+        } catch (JMSException e) {
+            // The caller never sees this connection, so release it here.
+            try {
+                connection.close();
+            } catch (JMSException closeError) {
+                LOG.debug("Failed to close connection after setup error", closeError);
+            }
+            throw e;
+        }
+        return connection;
+    }
+
+    protected void syncConsumeLoop(MessageConsumer requestConsumer) {
+        try {
+            Message message = requestConsumer.receive(5000);
+            if (message != null) {
+                onMessage(message);
+            } else {
+                LOG.error("No message received");
+            }
+        } catch (JMSException e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Handles an incoming request by sending a text reply of the form
+     * "response for: &lt;request text&gt;" to the request's JMSReplyTo destination,
+     * with the request's message ID as the reply's correlation ID.
+     *
+     * Requests that are not text messages, or that carry no reply destination, are
+     * recorded as failures through {@link #onException(JMSException)} rather than
+     * surfacing as a ClassCastException or NullPointerException on the listener thread.
+     *
+     * @param message
+     *        the request message delivered by the consumer.
+     */
+    @Override
+    public void onMessage(Message message) {
+        try {
+            if (!(message instanceof TextMessage)) {
+                throw new JMSException("Expected a TextMessage request but received: " + message);
+            }
+            TextMessage requestMessage = (TextMessage) message;
+
+            LOG.info("Received request.");
+            LOG.info(requestMessage.toString());
+
+            Destination replyTo = requestMessage.getJMSReplyTo();
+            if (replyTo == null) {
+                throw new JMSException("Request did not carry a JMSReplyTo destination");
+            }
+            if (replyTo instanceof Topic) {
+                LOG.info("Reply destination is: {}", ((Topic) replyTo).getTopicName());
+            } else if (replyTo instanceof Queue) {
+                LOG.info("Reply destination is: {}", ((Queue) replyTo).getQueueName());
+            } else {
+                LOG.info("Reply destination is: {}", replyTo);
+            }
+
+            TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText());
+            replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
+
+            if (dynamicallyCreateProducer) {
+                // Use a short lived producer and close it, leaving the shared
+                // responseProducer field untouched.
+                MessageProducer dynamicProducer = responderSession.createProducer(replyTo);
+                try {
+                    dynamicProducer.send(replyMessage);
+                } finally {
+                    dynamicProducer.close();
+                }
+            } else {
+                responseProducer.send(replyTo, replyMessage);
+            }
+
+            LOG.info("Sent reply.");
+            LOG.info(replyMessage.toString());
+        } catch (JMSException e) {
+            onException(e);
+        }
+    }
+
+    /**
+     * Logs the given exception, including its stack trace, and records it in the
+     * failures list that the test asserts on.
+     *
+     * @param e
+     *        the exception to log and record.
+     */
+    protected void onException(JMSException e) {
+        // Hand the throwable to the logger instead of printing to stderr so the
+        // stack trace ends up alongside the rest of the test output.
+        LOG.info("Caught: " + e, e);
+        failures.add(e);
+    }
+}


Mime
View raw message