activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1026543 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Sat, 23 Oct 2010 01:36:13 GMT
Author: gtully
Date: Sat Oct 23 01:36:13 2010
New Revision: 1026543

URL: http://svn.apache.org/viewvc?rev=1026543&view=rev
Log:
additional tests and refactoring of fix for https://issues.apache.org/activemq/browse/AMQ-2985,
not updating subscriptionAck with an unmatched message resolves recovery of unmatched selector
durables, reveting the previous change

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1026543&r1=1026542&r2=1026543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Sat Oct 23 01:36:13 2010
@@ -631,19 +631,22 @@ public class KahaDBStore extends Message
                         }
                     }
                 } else {
-                    doAcknowledge(context, subscriptionKey, messageId);
+                    doAcknowledge(context, subscriptionKey, messageId, ack);
                 }
             } else {
-                doAcknowledge(context, subscriptionKey, messageId);
+                doAcknowledge(context, subscriptionKey, messageId, ack);
             }
         }
 
-        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId
messageId)
+        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId
messageId, MessageAck ack)
                 throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toString());
+            if (ack != null && ack.isUnmatchedAck()) {
+                command.setAck(UNMATCHED);
+            }
             store(command, false, null, null);
         }
 
@@ -737,7 +740,7 @@ public class KahaDBStore extends Message
                                 selectorExpression = SelectorParser.parse(selector);
                             }
                             sd.orderIndex.resetCursorPosition();
-                            sd.orderIndex.setBatch(tx, (selectorExpression == null ? cursorPos
: -1));
+                            sd.orderIndex.setBatch(tx, cursorPos);
                             for (Iterator<Entry<Long, MessageKeys>> iterator
= sd.orderIndex.iterator(tx); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
@@ -766,14 +769,13 @@ public class KahaDBStore extends Message
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener
listener)
                 throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos
: -1));
+                        sd.orderIndex.setBatch(tx, cursorPos);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
@@ -790,7 +792,6 @@ public class KahaDBStore extends Message
         public void recoverNextMessages(String clientId, String subscriptionName, final int
maxReturned,
                 final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
@@ -800,7 +801,7 @@ public class KahaDBStore extends Message
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
                             long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos
: -1));
+                            sd.orderIndex.setBatch(tx, pos);
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);
@@ -1228,7 +1229,7 @@ public class KahaDBStore extends Message
                     // apply any acks we have
                     synchronized (this.subscriptionKeys) {
                         for (String key : this.subscriptionKeys) {
-                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
+                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(),
null);
 
                         }
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1026543&r1=1026542&r2=1026543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Sat Oct 23 01:36:13 2010
@@ -88,6 +88,7 @@ public class MessageDatabase extends Ser
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME,
"0"));
 
+    protected static final Buffer UNMATCHED = new Buffer(new byte[]{});
     private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
     private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
@@ -1037,11 +1038,14 @@ public class MessageDatabase extends Ser
                 String subscriptionKey = command.getSubscriptionKey();
                 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
 
+                if (command.getAck() == UNMATCHED) {
+                    sd.subscriptionAcks.put(tx, subscriptionKey, prev);    
+                }
+                // The following method handles deleting un-referenced messages.
+                removeAckLocation(tx, sd, subscriptionKey, prev);
+
                 // Add it to the new location set.
                 addAckLocation(sd, sequence, subscriptionKey);
-
-                // The following method handles deleting un-referenced messages.
-                removeAckLocation(tx, sd, subscriptionKey, sequence);
             }
 
         }
@@ -1506,16 +1510,24 @@ public class MessageDatabase extends Ser
             if (hs != null) {
                 hs.remove(subscriptionKey);
                 if (hs.isEmpty()) {
+                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
                     sd.ackPositions.remove(sequenceId);
 
-                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
-                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
-                    // Do the actual delete.
-                    for (Entry<Long, MessageKeys> entry : deletes) {
-                        sd.locationIndex.remove(tx, entry.getValue().location);
-                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
-                        sd.orderIndex.remove(tx, entry.getKey());
+                    // Did we just empty out the first set in the
+                    // ordered list of ack locations? Then it's time to
+                    // delete some messages.
+                    if (hs == firstSet) {
+
+                        // Find all the entries that need to get deleted.
+                        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
+                        sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+                        // Do the actual deletes.
+                        for (Entry<Long, MessageKeys> entry : deletes) {
+                            sd.locationIndex.remove(tx, entry.getValue().location);
+                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
+                            sd.orderIndex.remove(tx,entry.getKey());
+                        }
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1026543&r1=1026542&r2=1026543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Sat Oct 23 01:36:13 2010
@@ -43,8 +43,12 @@ public class DurableSubscriptionOfflineT
 
     @Override
     protected Connection createConnection() throws Exception {
+        return createConnection("cliName");
+    }
+
+    protected Connection createConnection(String name) throws Exception {
         Connection con = super.createConnection();
-        con.setClientID("cliName");
+        con.setClientID(name);
         con.start();
         return con;
     }
@@ -190,9 +194,8 @@ public class DurableSubscriptionOfflineT
          session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageProducer producer = session.createProducer(null);
 
-         final int numMessages = 10;
          int sent = 0;
-         for (int i = 0; i < numMessages; i++) {
+         for (int i = 0; i < 10; i++) {
              sent++;
              Message message = session.createMessage();
              message.setStringProperty("filter", "true");
@@ -217,7 +220,7 @@ public class DurableSubscriptionOfflineT
          con.close();
 
          LOG.info("Consumed: " + listener.count);
-         assertEquals(numMessages, listener.count);
+         assertEquals(sent, listener.count);
 
          // consume messages again, should not get any
          con = createConnection();
@@ -233,6 +236,60 @@ public class DurableSubscriptionOfflineT
 
          assertEquals(0, listener.count);
      }
+
+    public void testOfflineSubscription4() throws Exception {
+        // create durable subscription 1
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // create durable subscription 2
+        Connection con2 = createConnection("cliId2");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter
= 'true'", true);
+        Listener listener2 = new Listener();
+        consumer2.setMessageListener(listener2);
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test online subs
+        Thread.sleep(3 * 1000);
+        session2.close();
+        con2.close();
+
+        assertEquals(sent, listener2.count);
+
+        // consume messages
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter
= 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals("offline consumer got all", sent, listener.count);
+    }
     
     public static class Listener implements MessageListener {
         int count = 0;



Mime
View raw message