activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6386
Date Fri, 05 Aug 2016 16:05:15 GMT
Repository: activemq
Updated Branches:
  refs/heads/master be032c982 -> 5e5b673af


https://issues.apache.org/jira/browse/AMQ-6386

Add test to help diagnose the issue with cross protocol AMQP -> STOMP
message acking.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5e5b673a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5e5b673a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5e5b673a

Branch: refs/heads/master
Commit: 5e5b673afa04b9ab9096d6c435af4cf99fb7c788
Parents: be032c9
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Aug 5 12:04:51 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Aug 5 12:04:51 2016 -0400

----------------------------------------------------------------------
 activemq-amqp/pom.xml                           |   6 +
 .../transport/amqp/AmqpAndStompInteropTest.java | 226 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |   2 +-
 3 files changed, 233 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5e5b673a/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 93e4fe2..d5e2ca2 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -168,6 +168,12 @@
       <version>${netty-all-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.stompjms</groupId>
+      <artifactId>stompjms-client</artifactId>
+      <version>${stompjms-version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e5b673a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java
new file mode 100644
index 0000000..ce98db3
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.fusesource.stomp.jms.StompJmsConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore
+public class AmqpAndStompInteropTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAndStompInteropTest.class);
+
+    @Rule
+    public TestName name = new TestName();
+
+    protected BrokerService broker;
+    private TransportConnector amqpConnector;
+    private TransportConnector stompConnector;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setSchedulerSupport(false);
+
+        amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
+        stompConnector = broker.addConnector("stomp://0.0.0.0:0");
+
+        return broker;
+    }
+
+    @Test(timeout = 30000)
+    public void testSendFromAMQPToSTOMP() throws Exception {
+        sendMessageToQueueUsingAmqp();
+        readMessageFromQueueUsingStomp();
+    }
+
+    @Test(timeout = 30000)
+    public void testSendFromSTOMPToAMQP() throws Exception {
+        sendMessageToQueueUsingStomp();
+        readMessageFromQueueUsingAmqp();
+    }
+
+    @Test(timeout = 30000)
+    public void testSendFromSTOMPToSTOMP() throws Exception {
+        sendMessageToQueueUsingStomp();
+        readMessageFromQueueUsingStomp();
+    }
+
+    @Test(timeout = 30000)
+    public void testSendFromAMQPToAMQP() throws Exception {
+        sendMessageToQueueUsingAmqp();
+        readMessageFromQueueUsingAmqp();
+    }
+
+    private String getQueueName() {
+        return name.getMethodName() + "-Queue";
+    }
+
+    private void sendMessageToQueueUsingAmqp() throws Exception {
+        Connection connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+
+        try {
+            TextMessage message = session.createTextMessage("test-message-amqp-source");
+            producer.send(message);
+
+            LOG.info("Send AMQP message with Message ID -> {}", message.getJMSMessageID());
+        } finally {
+            connection.close();
+        }
+    }
+
+    private void sendMessageToQueueUsingStomp() throws Exception {
+        Connection connection = createStompConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+
+        try {
+            TextMessage message = session.createTextMessage("test-message-stomp-source");
+            producer.send(message);
+
+            LOG.info("Send STOMP message with Message ID -> {}", message.getJMSMessageID());
+        } finally {
+            connection.close();
+        }
+    }
+
+    private void readMessageFromQueueUsingAmqp() throws Exception {
+        Connection connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        connection.start();
+
+        Message received = consumer.receive(2000);
+        assertNotNull(received);
+
+        LOG.info("Read from AMQP -> message ID = {}", received.getJMSMessageID());
+
+        assertTrue(received instanceof TextMessage);
+
+        TextMessage textMessage = (TextMessage) received;
+        assertNotNull(textMessage.getText());
+    }
+
+    private void readMessageFromQueueUsingStomp() throws Exception {
+        Connection connection = createStompConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        connection.start();
+
+        Message received = consumer.receive(2000);
+        assertNotNull(received);
+
+        LOG.info("Read from STOMP -> message ID = {}", received.getJMSMessageID());
+
+        assertTrue(received instanceof TextMessage);
+
+        TextMessage textMessage = (TextMessage) received;
+        assertNotNull(textMessage.getText());
+    }
+
+    private Connection createStompConnection() throws Exception {
+
+        String stompURI = "tcp://localhost:" + stompConnector.getConnectUri().getPort();
+
+        final StompJmsConnectionFactory factory = new StompJmsConnectionFactory();
+
+        factory.setBrokerURI(stompURI);
+        factory.setUsername("admin");
+        factory.setPassword("password");
+
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+
+        connection.start();
+        return connection;
+    }
+
+    private Connection createAmqpConnection() throws Exception {
+
+        String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort();
+
+        final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
+
+        factory.setUsername("admin");
+        factory.setPassword("password");
+
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+
+        connection.start();
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/5e5b673a/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index f88b152..d25017d 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.transport.amqp=TRACE
+log4j.logger.org.apache.activemq.transport.amqp=DEBUG
 log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
 log4j.logger.org.fusesource=INFO
 


Mime
View raw message