activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1028702 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Date Fri, 29 Oct 2010 11:46:31 GMT
Author: gtully
Date: Fri Oct 29 11:46:31 2010
New Revision: 1028702

URL: http://svn.apache.org/viewvc?rev=1028702&view=rev
Log:
additional offline test, and enable the jdbc variants - https://issues.apache.org/activemq/browse/AMQ-2985

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1028702&r1=1028701&r2=1028702&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Fri Oct 29 11:46:31 2010
@@ -69,9 +69,13 @@ public class DurableSubscriptionOfflineT
     }
 
     private void createBroker() throws Exception {
+        createBroker(true);
+    }
+    
+    private void createBroker(boolean deleteAllMessages) throws Exception {
         broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
         broker.setBrokerName(getName(true));
-        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         broker.getManagementContext().setCreateConnector(false);
 
         if (usePrioritySupport) {
@@ -91,7 +95,7 @@ public class DurableSubscriptionOfflineT
             broker.stop();
     }
 
-    public void x_initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
+    public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
                 new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
@@ -290,6 +294,77 @@ public class DurableSubscriptionOfflineT
 
         assertEquals("offline consumer got all", sent, listener.count);
     }
+
+    public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        Connection con2 = createConnection("onlineCli1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        Listener listener2 = new Listener();
+        consumer2.setMessageListener(listener2);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test online subs
+        Thread.sleep(3 * 1000);
+        session2.close();
+        con2.close();
+        assertEquals(sent, listener2.count);
+
+        // restart broker
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+
+        // test offline
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+        Listener listener3 = new Listener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+        session3.close();
+        con3.close();
+
+        assertEquals(sent, listener.count);
+        assertEquals(sent, listener3.count);
+    }
     
     public static class Listener implements MessageListener {
         int count = 0;



Mime
View raw message