activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cepo...@apache.org
Subject svn commit: r1461153 - /activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
Date Tue, 26 Mar 2013 14:30:37 GMT
Author: ceposta
Date: Tue Mar 26 14:30:36 2013
New Revision: 1461153

URL: http://svn.apache.org/r1461153
Log:
Updated to take into account the scenario from the mailing list where a durable sub would connect/reconnect
every second message

Modified:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java?rev=1461153&r1=1461152&r2=1461153&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java Tue Mar 26 14:30:36 2013
@@ -28,10 +28,15 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.test.TestSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  * 
  */
@@ -39,6 +44,8 @@ public class DurableConsumerCloseAndReco
     protected static final long RECEIVE_TIMEOUT = 5000L;
     private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class);
 
+    BrokerService brokerService;
+
     protected Connection connection;
     private Session session;
     private MessageConsumer consumer;
@@ -46,29 +53,64 @@ public class DurableConsumerCloseAndReco
     private Destination destination;
     private int messageCount;
 
+    private String vmConnectorURI;
+
     
     @Override
     protected void setUp() throws Exception {
+        createBroker();
         super.setUp();
-        deleteAllMessages();
     }
 
     @Override
     protected void tearDown() throws Exception {
+        stopBroker();
         super.tearDown();
-        deleteAllMessages();
     }
 
-    private void deleteAllMessages() throws Exception {
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
-        Connection dummyConnection = fac.createConnection();
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(vmConnectorURI);
+    }
+
+    protected void createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(false);
+        brokerService.setPersistent(false);
+        KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
+        brokerService.setPersistenceAdapter(store);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        vmConnectorURI = brokerService.getVmConnectorURI().toString();
+    }
+
+    protected void stopBroker() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    public void testDurableSubscriberReconnectMultipleTimes() throws Exception {
+        Connection dummyConnection = createConnection();
         dummyConnection.start();
+
+        makeConsumer(Session.AUTO_ACKNOWLEDGE);
+        closeConsumer();
+
+        publish(30);
+
+        int counter = 1;
+        for (int i = 0; i < 15; i++) {
+            makeConsumer(Session.AUTO_ACKNOWLEDGE);
+            Message message = consumer.receive(RECEIVE_TIMEOUT);
+            assertTrue("Should have received a message!", message != null);
+            LOG.info("Received message " + counter++);
+            message = consumer.receive(RECEIVE_TIMEOUT);
+            assertTrue("Should have received a message!", message != null);
+            LOG.info("Received message " + counter++);
+            closeConsumer();
+        }
+
         dummyConnection.close();
     }
-    
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false");
-    }
 
     public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
         // force the server to stay up across both connection tests
@@ -84,10 +126,11 @@ public class DurableConsumerCloseAndReco
     }
 
     protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {
+        // default to client ack for consumer
         makeConsumer();
         closeConsumer();
 
-        publish();
+        publish(1);
 
         // wait a few moments for the close to really occur
         Thread.sleep(1000);
@@ -117,7 +160,7 @@ public class DurableConsumerCloseAndReco
         closeConsumer();
 
         LOG.info("Lets publish one more message now");
-        publish();
+        publish(1);
 
         makeConsumer();
         message = consumer.receive(RECEIVE_TIMEOUT);
@@ -127,7 +170,7 @@ public class DurableConsumerCloseAndReco
         closeConsumer();
     }
 
-    protected void publish() throws Exception {
+    protected void publish(int numMessages) throws Exception {
         connection = createConnection();
         connection.start();
 
@@ -136,8 +179,10 @@ public class DurableConsumerCloseAndReco
 
         producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-        TextMessage msg = session.createTextMessage("This is a test: " + messageCount++);
-        producer.send(msg);
+        for (int i = 0; i < numMessages; i++) {
+            TextMessage msg = session.createTextMessage("This is a test: " + messageCount++);
+            producer.send(msg);
+        }
 
         producer.close();
         producer = null;
@@ -157,6 +202,7 @@ public class DurableConsumerCloseAndReco
     }
 
     protected void closeConsumer() throws JMSException {
+        LOG.info("Closing the consumer");
         consumer.close();
         consumer = null;
         closeSession();
@@ -170,10 +216,14 @@ public class DurableConsumerCloseAndReco
     }
 
     protected void makeConsumer() throws Exception {
+        makeConsumer(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    protected void makeConsumer(int ackMode) throws Exception {
         String durableName = getName();
         String clientID = getSubject();
-        LOG.info("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName);
-        createSession(clientID);
+        LOG.info("Creating a durable subscriber for clientID: " + clientID + " and durable name: " + durableName);
+        createSession(clientID, ackMode);
         consumer = createConsumer(durableName);
     }
 
@@ -185,12 +235,12 @@ public class DurableConsumerCloseAndReco
         }
     }
 
-    protected void createSession(String clientID) throws Exception {
+    protected void createSession(String clientID, int ackMode) throws Exception {
         connection = createConnection();
         connection.setClientID(clientID);
         connection.start();
 
-        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        session = connection.createSession(false, ackMode);
         destination = createDestination();
     }
 }



Mime
View raw message