activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1033581 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/MessageDatabase.java test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Date Wed, 10 Nov 2010 16:44:23 GMT
Author: gtully
Date: Wed Nov 10 16:44:23 2010
New Revision: 1033581

URL: http://svn.apache.org/viewvc?rev=1033581&view=rev
Log:
fix issue from new test for https://issues.apache.org/activemq/browse/AMQ-2985 - when acking
as unmatched, the matching messages that lie in between the sequences need to be added to the
ack locations to ensure they don't get deleted when other consumers are done with them. test
now enabled for kahaDB

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1033581&r1=1033580&r2=1033581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Wed Nov 10 16:44:23 2010
@@ -1056,6 +1056,12 @@ public class MessageDatabase extends Ser
 
                 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
 
+                if (ackSequenceToStore != sequence) {
+                    // unmatched, need to add ack locations for the intermediate sequences
+                    for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
+                        addAckLocation(sd, matchedGapSequence, subscriptionKey);
+                    }
+                }
                 // The following method handles deleting un-referenced messages.
                 removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1033581&r1=1033580&r2=1033581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Wed Nov 10 16:44:23 2010
@@ -30,6 +30,7 @@ import javax.jms.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import java.io.File;
+import java.util.Vector;
 
 public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
 
@@ -37,6 +38,7 @@ public class DurableSubscriptionOfflineT
     public Boolean usePrioritySupport = Boolean.TRUE;
     private BrokerService broker;
     private ActiveMQTopic topic;
+    private Vector<Exception> exceptions = new Vector<Exception>();
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory("vm://" + getName(true));
@@ -59,6 +61,7 @@ public class DurableSubscriptionOfflineT
     }
     
     protected void setUp() throws Exception {
+        exceptions.clear();
         topic = (ActiveMQTopic) createDestination();
         createBroker();
         super.setUp();
@@ -389,7 +392,7 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ /*PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
+                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
@@ -463,13 +466,13 @@ public class DurableSubscriptionOfflineT
         assertEquals("offline consumer got all", sent, listener.count);
     }    
 
-    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
+    public void x_initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{ /* PersistenceAdapterChoice.KahaDB,*/ PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
-    public void testMixOfOnLineAndOfflineSubsGetAllMAtched() throws Exception {
+    public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -549,7 +552,7 @@ public class DurableSubscriptionOfflineT
         con = createConnection("offCli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener = new Listener("offCli1");
+        Listener listener = new FilterCheckListener();
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);
@@ -562,7 +565,7 @@ public class DurableSubscriptionOfflineT
         Connection con3 = createConnection("offCli2");
         Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true);
-        Listener listener3 = new Listener();
+        Listener listener3 = new FilterCheckListener();
         consumer3.setMessageListener(listener3);
 
         Thread.sleep(3 * 1000);
@@ -570,6 +573,7 @@ public class DurableSubscriptionOfflineT
         con3.close();
 
         assertEquals(filtered, listener3.count);
+        assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty());
     }
 
     public void testRemovedDurableSubDeletes() throws Exception {
@@ -627,9 +631,31 @@ public class DurableSubscriptionOfflineT
             count++;
             if (id != null) {
                 try {
-                    LOG.error(id + ", " + message.getJMSMessageID());
+                    LOG.info(id + ", " + message.getJMSMessageID());
                 } catch (Exception ignored) {}
             }
         }
     }
+
+    public class FilterCheckListener extends Listener  {
+
+        public void onMessage(Message message) {
+            count++;
+
+            try {
+                Object b = message.getObjectProperty("$b");
+                if (b != null) {
+                    boolean c = message.getBooleanProperty("$c");
+                    assertTrue("", c);
+                }
+                else {
+                    String d = message.getStringProperty("$d");
+                    assertTrue("", "D1".equals(d) || "D2".equals(d));
+                }
+            }
+            catch (JMSException e) {
+                exceptions.add(e);
+            }
+        }
+    }
 }



Mime
View raw message