activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [13/29] git commit: Add additional tests for connection loss exception handling.
Date Thu, 20 Mar 2014 17:44:24 GMT
Add additional tests for connection loss exception handling.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9587ebba
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9587ebba
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9587ebba

Branch: refs/heads/activemq-5.9
Commit: 9587ebba5350070c0ecc3ce0e2952cf4a7d9bd71
Parents: 95c90b5
Author: Timothy Bish <tabish121@gmai.com>
Authored: Thu Jan 30 15:24:19 2014 -0500
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Thu Mar 20 13:10:21 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/JMSClientTest.java  | 180 +++++++++++++++++--
 1 file changed, 168 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9587ebba/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 878ebc3..af4f6f8 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -239,9 +239,8 @@ public class JMSClientTest extends AmqpTestSupport {
         connection.close();
     }
 
-    //should through exception IllegalStateException:The session is closed
     @Test(timeout=30000)
-    public void testBrokerRestartPersistentQueueException() throws Exception {
+    public void testProducerThrowsWhenBrokerStops() throws Exception {
 
         Connection connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -252,20 +251,64 @@ public class JMSClientTest extends AmqpTestSupport {
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
         Message m = session.createTextMessage("Sample text");
-        producer.send(m);
 
-        restartBroker();
+        Thread stopper = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(5);
+                    stopBroker();
+                } catch (Exception ex) {}
+            }
+        });
+        stopper.start();
 
         try {
-            session.createConsumer(queue);
+            for (int i = 0; i < 10; ++i) {
+                producer.send(m);
+                TimeUnit.SECONDS.sleep(1);
+            }
             fail("Should have thrown an IllegalStateException");
         } catch (Exception ex) {
-            LOG.info("Caught exception on receive: {}", ex);
+            LOG.info("Caught exception on send: {}", ex);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.toString());
+        connection.start();
+
+        Thread stopper = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(5);
+                    stopBroker();
+                } catch (Exception ex) {}
+            }
+        });
+        stopper.start();
+
+        try {
+            for (int i = 0; i < 10; ++i) {
+                MessageProducer producer = session.createProducer(queue);
+                assertNotNull(producer);
+                TimeUnit.SECONDS.sleep(1);
+            }
+            fail("Should have thrown an IllegalStateException");
+        } catch (Exception ex) {
+            LOG.info("Caught exception on create producer: {}", ex);
         }
     }
 
     @Test(timeout=30000)
-    public void testProducerThrowsWhenBrokerRestarted() throws Exception {
+    public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
 
         Connection connection = createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -276,27 +319,106 @@ public class JMSClientTest extends AmqpTestSupport {
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
         Message m = session.createTextMessage("Sample text");
+        producer.send(m);
 
-        Thread restart = new Thread(new Runnable() {
+        stopBroker();
+        try {
+            session.createConsumer(queue);
+            fail("Should have thrown an IllegalStateException");
+        } catch (Exception ex) {
+            LOG.info("Caught exception on receive: {}", ex);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.toString());
+        connection.start();
+
+        MessageConsumer consumer=session.createConsumer(queue);
+        Thread stopper = new Thread(new Runnable() {
 
             @Override
             public void run() {
                 try {
                     TimeUnit.SECONDS.sleep(5);
-                    restartBroker();
+                    stopBroker();
                 } catch (Exception ex) {}
             }
         });
-        restart.start();
+        stopper.start();
 
         try {
             for (int i = 0; i < 10; ++i) {
-                producer.send(m);
+                consumer.receiveNoWait();
                 TimeUnit.SECONDS.sleep(1);
             }
             fail("Should have thrown an IllegalStateException");
         } catch (Exception ex) {
-            LOG.info("Caught exception on send: {}", ex);
+            LOG.info("Caught exception on receiveNoWait: {}", ex);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.toString());
+        connection.start();
+
+        MessageConsumer consumer=session.createConsumer(queue);
+        Thread stopper = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(5);
+                    stopBroker();
+                } catch (Exception ex) {}
+            }
+        });
+        stopper.start();
+
+        try {
+            for (int i = 0; i < 10; ++i) {
+                consumer.receive(1000);
+            }
+            fail("Should have thrown an IllegalStateException");
+        } catch (Exception ex) {
+            LOG.info("Caught exception on receive(1000): {}", ex);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testConsumerReceiveReturnsBrokerStops() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.toString());
+        connection.start();
+
+        MessageConsumer consumer=session.createConsumer(queue);
+        Thread stopper = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(5);
+                    stopBroker();
+                } catch (Exception ex) {}
+            }
+        });
+        stopper.start();
+
+        try {
+            Message m = consumer.receive();
+            assertNull(m);
+        } catch (Exception ex) {
+            LOG.info("Caught exception on receive(1000): {}", ex);
         }
     }
 
@@ -538,6 +660,40 @@ public class JMSClientTest extends AmqpTestSupport {
         }));
     }
 
+    @Test(timeout=30000)
+    public void testExecptionListenerCalledOnBrokerStop() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.toString());
+        connection.start();
+
+        final CountDownLatch called = new CountDownLatch(1);
+
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                LOG.info("Exception listener called: ", exception);
+                called.countDown();
+            }
+        });
+
+        Thread stopper = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(5);
+                    stopBroker();
+                } catch (Exception ex) {}
+            }
+        });
+        stopper.start();
+
+        assertTrue("No exception listener event fired.", called.await(15, TimeUnit.SECONDS));
+    }
+
     private Connection createConnection() throws JMSException {
         return createConnection(name.toString(), false);
     }


Mime
View raw message