activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: Some additional JMS Tests focused on Topics. Useful when updating the AMQP JMS Client version as it shows some new problem in the latest SNAPSHOT builds.
Date Wed, 11 Dec 2013 21:58:12 GMT
Updated Branches:
  refs/heads/trunk 78c4e4337 -> e1e8c5b08


Some additional JMS Tests focused on Topics.  Useful when updating the
AMQP JMS Client version as it shows some new problem in the latest
SNAPSHOT builds.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e1e8c5b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e1e8c5b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e1e8c5b0

Branch: refs/heads/trunk
Commit: e1e8c5b083e0c9f6e6c6f4dd25d35ad039fc4ca1
Parents: 78c4e43
Author: Timothy Bish <tabish121@gmai.com>
Authored: Wed Dec 11 16:58:08 2013 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Wed Dec 11 16:58:08 2013 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/JMSClientTest.java  | 182 +++++++++++++++++--
 1 file changed, 168 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e1e8c5b0/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index e35127d..ebed8d6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -31,6 +33,7 @@ import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
@@ -38,8 +41,10 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
+import org.apache.activemq.util.Wait;
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -347,26 +352,175 @@ public class JMSClientTest extends AmqpTestSupport {
 
     @Test(timeout=30000)
     public void testTTL() throws Exception {
-        QueueImpl queue = new QueueImpl("queue://" + name);
+        Connection connection = null;
+        try {
+            QueueImpl queue = new QueueImpl("queue://" + name);
+            connection = createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+            MessageProducer producer = session.createProducer(queue);
+            producer.setTimeToLive(1000);
+            Message toSend = session.createTextMessage("Sample text");
+            producer.send(toSend);
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message received = consumer.receive(5000);
+            assertNotNull(received);
+            LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());
+            producer.setTimeToLive(100);
+            producer.send(toSend);
+            TimeUnit.SECONDS.sleep(2);
+            received = consumer.receive(5000);
+            if (received != null) {
+                LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
+                         new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
+                                        received.getJMSExpiration() - received.getJMSTimestamp()});
+            }
+            assertNull(received);
+        } finally {
+            connection.close();
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testDurableConsumerAsync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Message> received = new AtomicReference<Message>();
+
         Connection connection = createConnection();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        connection.start();
-        MessageProducer producer = session.createProducer(queue);
-        producer.setTimeToLive(1000);
-        Message toSend = session.createTextMessage("Sample text");
-        producer.send(toSend);
-        MessageConsumer consumer = session.createConsumer(queue);
-        Message received = consumer.receive(5000);
-        assertNotNull(received);
-        producer.setTimeToLive(100);
-        producer.send(toSend);
-        TimeUnit.SECONDS.sleep(1);
-        assertNull(consumer.receive(5000));
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    received.set(message);
+                    latch.countDown();
+                }
+            });
+
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
+            assertNotNull("Should have received a message by now.", received.get());
+            assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+        }
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testDurableConsumerSync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic");
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            final AtomicReference<Message> msg = new AtomicReference<Message>();
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    msg.set(consumer.receiveNoWait());
+                    return msg.get() != null;
+                }
+            }));
+
+            assertNotNull("Should have received a message by now.", msg.get());
+            assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+        }
+        connection.close();
+    }
+
+    @Test(timeout=30000)
+    public void testTopicConsumerAsync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Message> received = new AtomicReference<Message>();
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(topic);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    received.set(message);
+                    latch.countDown();
+                }
+            });
+
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
+            assertNotNull("Should have received a message by now.", received.get());
+            assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
+        }
+        connection.close();
+    }
+
+    @Test(timeout=45000)
+    public void testTopicConsumerSync() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+        TopicImpl topic = new TopicImpl("topic://"+name);
+
+        Connection connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = session.createConsumer(topic);
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            producer.send(message);
+
+            final AtomicReference<Message> msg = new AtomicReference<Message>();
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    msg.set(consumer.receiveNoWait());
+                    return msg.get() != null;
+                }
+            }));
+
+            assertNotNull("Should have received a message by now.", msg.get());
+            assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage);
+        }
+        connection.close();
     }
 
     private Connection createConnection() throws JMSException {
         final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
         final Connection connection = factory.createConnection();
+        connection.setClientID(name.toString());
         connection.setExceptionListener(new ExceptionListener() {
             @Override
             public void onException(JMSException exception) {


Mime
View raw message