activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1027644 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Tue, 26 Oct 2010 16:52:08 GMT
Author: gtully
Date: Tue Oct 26 16:52:08 2010
New Revision: 1027644

URL: http://svn.apache.org/viewvc?rev=1027644&view=rev
Log:
fix regression of https://issues.apache.org/activemq/browse/AMQ-2870 from refactoring of fix
for https://issues.apache.org/activemq/browse/AMQ-2985 - unmatched acks are now stored as
negative sequence ids so that we can differentiate between durables that have acked messages
after receipt and those that have acked because a message was unmatched. Unmatched messages are
now deleted in the normal way.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1027644&r1=1027643&r2=1027644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Tue Oct 26 16:52:08 2010
@@ -718,6 +718,17 @@ public class KahaDBStore extends Message
             }
         }
 
+        // an ack for an unmatched message is stored as a negative sequence id
+        // if sub has been getting unmatched acks, we need to reset
+        protected Long resetForSelectors(SubscriptionInfo info, Long position) {
+            if (info.getSelector() != null) {
+                if (position < NOT_ACKED) {
+                    position = NOT_ACKED;
+                }
+            }
+            return position;
+        }
+
         public int getMessageCount(String clientId, String subscriptionName) throws IOException
{
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
@@ -731,6 +742,7 @@ public class KahaDBStore extends Message
                             // The subscription might not exist.
                             return 0;
                         }
+                        cursorPos = resetForSelectors(info, cursorPos);
 
                         int counter = 0;
                         try {
@@ -740,7 +752,7 @@ public class KahaDBStore extends Message
                                 selectorExpression = SelectorParser.parse(selector);
                             }
                             sd.orderIndex.resetCursorPosition();
-                            sd.orderIndex.setBatch(tx, cursorPos);
+                            sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
                             for (Iterator<Entry<Long, MessageKeys>> iterator
= sd.orderIndex.iterator(tx); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
@@ -769,13 +781,15 @@ public class KahaDBStore extends Message
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener
listener)
                 throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        sd.orderIndex.setBatch(tx, cursorPos);
+                        cursorPos = resetForSelectors(info, cursorPos);
+                        sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
@@ -792,6 +806,7 @@ public class KahaDBStore extends Message
         public void recoverNextMessages(String clientId, String subscriptionName, final int
maxReturned,
                 final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
@@ -800,8 +815,9 @@ public class KahaDBStore extends Message
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
-                            long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            sd.orderIndex.setBatch(tx, pos);
+                            Long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            pos = resetForSelectors(info, pos);
+                            sd.orderIndex.setBatch(tx, extractSequenceId(pos));
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1027644&r1=1027643&r2=1027644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Tue Oct 26 16:52:08 2010
@@ -98,6 +98,8 @@ public class MessageDatabase extends Ser
     static final int CLOSED_STATE = 1;
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
+    static final long UNMATCHED_SEQ = -2;
+
     static final int VERSION = 2;
 
 
@@ -1018,6 +1020,13 @@ public class MessageDatabase extends Ser
         metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
     }
 
+    protected Long extractSequenceId(Long prev) {
+        if (prev < NOT_ACKED) {
+            prev = Math.abs(prev) + UNMATCHED_SEQ;
+        }
+        return prev;
+    }
+
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation)
throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {
@@ -1039,13 +1048,16 @@ public class MessageDatabase extends Ser
             // Make sure it's a valid message id...
             if (sequence != null) {
                 String subscriptionKey = command.getSubscriptionKey();
-                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
-
+                Long ackSequenceToStore = sequence;
                 if (command.getAck() == UNMATCHED) {
-                    sd.subscriptionAcks.put(tx, subscriptionKey, prev);
+                    // store negative sequence to indicate that it was unmatched
+                    ackSequenceToStore = new Long(UNMATCHED_SEQ - sequence);
                 }
+
+                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
+
                 // The following method handles deleting un-referenced messages.
-                removeAckLocation(tx, sd, subscriptionKey, prev);
+                removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
 
                 // Add it to the new location set.
                 addAckLocation(sd, sequence, subscriptionKey);
@@ -1116,7 +1128,7 @@ public class MessageDatabase extends Ser
             sd.subscriptions.remove(tx, subscriptionKey);
             Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
             if( prev!=null ) {
-                removeAckLocation(tx, sd, subscriptionKey, prev);
+                removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
             }
         }
 
@@ -1468,7 +1480,7 @@ public class MessageDatabase extends Ser
 
             for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx);
iterator.hasNext();) {
                 Entry<String, Long> entry = iterator.next();
-                addAckLocation(rc, entry.getValue(), entry.getKey());
+                addAckLocation(rc, extractSequenceId(entry.getValue()), entry.getKey());
             }
             
             if (rc.orderIndex.nextMessageId == 0) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1027644&r1=1027643&r2=1027644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Tue Oct 26 16:52:08 2010
@@ -91,14 +91,14 @@ public class DurableSubscriptionOfflineT
             broker.stop();
     }
 
-    public void initCombosForTestOfflineSubscription() throws Exception {
+    public void x_initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
                 new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
                 new Object[]{ Boolean.TRUE, Boolean.FALSE});
     }
 
-    public void testOfflineSubscription() throws Exception {
+    public void testConsumeOnlyMatchedMessages() throws Exception {
         // create durable subscription
         Connection con = createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -140,7 +140,7 @@ public class DurableSubscriptionOfflineT
         assertEquals(sent, listener.count);
     }
 
-     public void testOfflineSubscription2() throws Exception {
+     public void testConsumeAllMatchedMessages() throws Exception {
          // create durable subscription
          Connection con = createConnection();
          Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -181,7 +181,7 @@ public class DurableSubscriptionOfflineT
          assertEquals(sent, listener.count);
      }
 
-     public void testOfflineSubscription3() throws Exception {
+     public void testVerifyAllConsumedAreAcked() throws Exception {
          // create durable subscription
          Connection con = createConnection();
          Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -237,7 +237,7 @@ public class DurableSubscriptionOfflineT
          assertEquals(0, listener.count);
      }
 
-    public void testOfflineSubscription4() throws Exception {
+    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
         // create durable subscription 1
         Connection con = createConnection("cliId1");
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);



Mime
View raw message