activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1038276 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Tue, 23 Nov 2010 18:54:56 GMT
Author: gtully
Date: Tue Nov 23 18:54:55 2010
New Revision: 1038276

URL: http://svn.apache.org/viewvc?rev=1038276&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2985 - issue with duplicates. track acklocations at send time such that out of order acks can remove messages, ensures unmatched messages do not build up. track priority with subscriptionAcks such that a restart or batch reset can select the correct cursor and not wrap around producing duplicates. ensure orderly upgrade of store. additional long running test (4 min) that validates

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1038276&r1=1038275&r2=1038276&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Nov 23 18:54:55 2010
@@ -718,17 +718,6 @@ public class KahaDBStore extends Message
             }
         }
 
-        // an ack for an unmatched message is stored as a negative sequence id
-        // if sub has been getting unmatched acks, we need to reset
-        protected Long resetForSelectors(SubscriptionInfo info, Long position) {
-            if (info.getSelector() != null) {
-                if (position < NOT_ACKED) {
-                    position = NOT_ACKED;
-                }
-            }
-            return position;
-        }
-
         public int getMessageCount(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
@@ -737,12 +726,11 @@ public class KahaDBStore extends Message
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                         if (cursorPos == null) {
                             // The subscription might not exist.
                             return 0;
                         }
-                        cursorPos = resetForSelectors(info, cursorPos);
 
                         int counter = 0;
                         try {
@@ -752,7 +740,7 @@ public class KahaDBStore extends Message
                                 selectorExpression = SelectorParser.parse(selector);
                             }
                             sd.orderIndex.resetCursorPosition();
-                            sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
+                            sd.orderIndex.setBatch(tx, cursorPos);
                             for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
@@ -787,9 +775,8 @@ public class KahaDBStore extends Message
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        cursorPos = resetForSelectors(info, cursorPos);
-                        sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
+                        LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        sd.orderIndex.setBatch(tx, cursorPos);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
@@ -815,9 +802,8 @@ public class KahaDBStore extends Message
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
-                            Long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            pos = resetForSelectors(info, pos);
-                            sd.orderIndex.setBatch(tx, extractSequenceId(pos));
+                            LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            sd.orderIndex.setBatch(tx, pos);
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);
@@ -839,9 +825,6 @@ public class KahaDBStore extends Message
                         if (entry != null) {
                             MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                             sd.subscriptionCursors.put(subscriptionKey, copy);
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("updated moc: " + copy + ", recovered: " + counter);
-                            }
                         }
                     }
                 });

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1038276&r1=1038275&r2=1038276&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Nov 23 18:54:55 2010
@@ -983,7 +983,7 @@ public class MessageDatabase extends Ser
 
         // Skip adding the message to the index if this is a topic and there are
         // no subscriptions.
-        if (sd.subscriptions != null && sd.ackPositions.isEmpty(tx)) {
+        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
             return;
         }
 
@@ -995,6 +995,9 @@ public class MessageDatabase extends Ser
             previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
             if (previous == null) {
                 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
+                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
+                    addAckLocationForNewMessage(tx, sd, id);
+                }
             } else {
                 // If the message ID as indexed, then the broker asked us to
                 // store a DUP
@@ -1018,13 +1021,6 @@ public class MessageDatabase extends Ser
         metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
     }
 
-    protected Long extractSequenceId(Long prev) {
-        if (prev < NOT_ACKED) {
-            prev = Math.abs(prev) + UNMATCHED_SEQ;
-        }
-        return prev;
-    }
-
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {
@@ -1046,26 +1042,13 @@ public class MessageDatabase extends Ser
             // Make sure it's a valid message id...
             if (sequence != null) {
                 String subscriptionKey = command.getSubscriptionKey();
-                Long ackSequenceToStore = sequence;
-                if (command.getAck() == UNMATCHED) {
-                    // store negative sequence to indicate that it was unmatched
-                    ackSequenceToStore = new Long(UNMATCHED_SEQ - sequence);
-                }
-
-                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
-                if (prev != null) {
-                    if (ackSequenceToStore != sequence) {
-                        // unmatched, need to add ack locations for the intermediate sequences
-                        for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
-                            addAckLocation(tx, sd, matchedGapSequence, subscriptionKey);
-                        }
-                    }
-                    // The following method handles deleting un-referenced messages.
-                    removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
+                if (command.getAck() != UNMATCHED) {
+                    sd.orderIndex.get(tx, sequence);
+                    byte priority = sd.orderIndex.lastGetPriority();
+                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
                 }
-
-                // Add it to the new location set.
-                addAckLocation(tx, sd, sequence, subscriptionKey);
+                // The following method handles deleting un-referenced messages.
+                removeAckLocation(tx, sd, subscriptionKey, sequence);
             }
 
         }
@@ -1127,20 +1110,17 @@ public class MessageDatabase extends Ser
             long ackLocation=NOT_ACKED;
             if (!command.getRetroactive()) {
                 ackLocation = sd.orderIndex.nextMessageId-1;
+            } else {
+                addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
             }
-
-            sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
-            addAckLocation(tx, sd, ackLocation, subscriptionKey);
+            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
         } else {
             // delete the sub...
             String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.remove(tx, subscriptionKey);
-            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
-            if( prev!=null ) {
-                removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
-            }
+            sd.subscriptionAcks.remove(tx, subscriptionKey);
+            removeAckLocationsForSub(tx, sd, subscriptionKey);
         }
-
     }
     
     /**
@@ -1318,7 +1298,64 @@ public class MessageDatabase extends Ser
             LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
         }
     }
- 
+
+    class LastAck {
+        long lastAckedSequence;
+        byte priority;
+
+        public LastAck(LastAck source) {
+            this.lastAckedSequence = source.lastAckedSequence;
+            this.priority = source.priority;
+        }
+
+        public LastAck() {
+            this.priority = MessageOrderIndex.HI;
+        }
+
+        public LastAck(long ackLocation) {
+            this.lastAckedSequence = ackLocation;
+            this.priority = MessageOrderIndex.HI;
+        }
+
+        public LastAck(long ackLocation, byte priority) {
+            this.lastAckedSequence = ackLocation;
+            this.priority = priority;
+        }
+
+        public String toString() {
+            return "[" + lastAckedSequence + ":" + priority + "]";
+        }
+    }
+
+    protected class LastAckMarshaller implements Marshaller<LastAck> {
+        
+        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(object.lastAckedSequence);
+            dataOut.writeByte(object.priority);
+        }
+
+        public LastAck readPayload(DataInput dataIn) throws IOException {
+            LastAck lastAcked = new LastAck();
+            lastAcked.lastAckedSequence = dataIn.readLong();
+            if (metadata.version >= 3) {
+                lastAcked.priority = dataIn.readByte();
+            }
+            return lastAcked;
+        }
+
+        public int getFixedSize() {
+            return 9;
+        }
+
+        public LastAck deepCopy(LastAck source) {
+            return new LastAck(source);
+        }
+
+        public boolean isDeepCopySupported() {
+            return true;
+        }
+    }
+
     class StoredDestination {
         
         MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -1327,7 +1364,7 @@ public class MessageDatabase extends Ser
 
         // These bits are only set for Topics
         BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
-        BTreeIndex<String, Long> subscriptionAcks;
+        BTreeIndex<String, LastAck> subscriptionAcks;
         HashMap<String, MessageOrderCursor> subscriptionCursors;
         BTreeIndex<Long, HashSet<String>> ackPositions;
     }
@@ -1342,7 +1379,7 @@ public class MessageDatabase extends Ser
 
             if (dataIn.readBoolean()) {
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
-                value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
+                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
                 if (metadata.version >= 3) {
                     value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
                 } else {
@@ -1482,7 +1519,7 @@ public class MessageDatabase extends Ser
 
             if (topic) {
                 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
-                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
+                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
                 rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
             }
             metadata.destinations.put(tx, key, rc);
@@ -1510,7 +1547,7 @@ public class MessageDatabase extends Ser
             rc.subscriptions.load(tx);
 
             rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
-            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
+            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
             rc.subscriptionAcks.load(tx);
 
             rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
@@ -1520,19 +1557,27 @@ public class MessageDatabase extends Ser
             rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
 
             if (metadata.version < 3) {
-                // on upgrade need to fill ackLocation
-                for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
-                    Entry<String, Long> entry = iterator.next();
-                    addAckLocation(tx, rc, extractSequenceId(entry.getValue()), entry.getKey());
+
+                // on upgrade need to fill ackLocation with available messages past last ack
+                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
+                    Entry<String, LastAck> entry = iterator.next();
+                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
+                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
+                        Long sequence = orderIterator.next().getKey();
+                        addAckLocation(tx, rc, sequence, entry.getKey());
+                    }
+                    // modify so it is upgraded                   
+                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
                 }
             }
             
             if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
-                if (!rc.ackPositions.isEmpty(tx)) {
-                    Long lastAckedMessageId = rc.ackPositions.getLast(tx).getKey();
-                    if (lastAckedMessageId != NOT_ACKED) {
-                        rc.orderIndex.nextMessageId = lastAckedMessageId+1;
+                if (!rc.subscriptionAcks.isEmpty(tx)) {
+                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+                        Entry<String, LastAck> entry = iterator.next();
+                        rc.orderIndex.nextMessageId =
+                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
                     }
                 }
             }
@@ -1546,11 +1591,6 @@ public class MessageDatabase extends Ser
         return rc;
     }
 
-    /**
-     * @param sd
-     * @param messageSequence
-     * @param subscriptionKey
-     */
     private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
         HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
         if (hs == null) {
@@ -1561,6 +1601,34 @@ public class MessageDatabase extends Ser
         sd.ackPositions.put(tx, messageSequence, hs);
     }
 
+    // new sub is interested in potentially all existing messages
+    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
+        for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) {
+            Entry<Long, HashSet<String>> entry = iterator.next();
+            entry.getValue().add(subscriptionKey);
+            sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
+        }
+    }
+
+    // on a new message add, all existing subs are interested in this message
+    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
+        HashSet hs = new HashSet<String>();
+        for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+            Entry<String, LastAck> entry = iterator.next();
+            hs.add(entry.getKey());
+        }
+        sd.ackPositions.put(tx, messageSequence, hs);
+    }
+
+    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        if (!sd.ackPositions.isEmpty(tx)) {        
+            Long end = sd.ackPositions.getLast(tx).getKey();
+            for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) {
+                removeAckLocation(tx, sd, subscriptionKey, sequence);
+            }
+        }
+    }
+
     /**
      * @param tx
      * @param sd
@@ -1578,21 +1646,15 @@ public class MessageDatabase extends Ser
                     HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
                     sd.ackPositions.remove(tx, sequenceId);
 
-                    // Did we just empty out the first set in the
-                    // ordered list of ack locations? Then it's time to
-                    // delete some messages.
-                    if (hs == firstSet) {
-
-                        // Find all the entries that need to get deleted.
-                        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
-                        sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
-                        // Do the actual deletes.
-                        for (Entry<Long, MessageKeys> entry : deletes) {
-                            sd.locationIndex.remove(tx, entry.getValue().location);
-                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
-                            sd.orderIndex.remove(tx,entry.getKey());
-                        }
+                    // Find all the entries that need to get deleted.
+                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
+                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+                    // Do the actual deletes.
+                    for (Entry<Long, MessageKeys> entry : deletes) {
+                        sd.locationIndex.remove(tx, entry.getValue().location);
+                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
+                        sd.orderIndex.remove(tx, entry.getKey());
                     }
                 }
             }
@@ -1905,7 +1967,7 @@ public class MessageDatabase extends Ser
         this.databaseLockedWaitDelay = databaseLockedWaitDelay;
     }
     
-    
+
     class MessageOrderCursor{
         long defaultCursorPosition;
         long lowPriorityCursorPosition;
@@ -1960,7 +2022,11 @@ public class MessageDatabase extends Ser
         }
     }
     
-    class MessageOrderIndex{
+    class MessageOrderIndex {
+        static final byte HI = 9;
+        static final byte LO = 0;
+        static final byte DEF = 4;
+
         long nextMessageId;
         BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
         BTreeIndex<Long, MessageKeys> lowPriorityIndex;
@@ -1969,8 +2035,8 @@ public class MessageDatabase extends Ser
         Long lastDefaultKey;
         Long lastHighKey;
         Long lastLowKey;
-        
-        
+        byte lastGetPriority;
+
         MessageKeys remove(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.remove(tx, key);
             if (result == null && highPriorityIndex!=null) {
@@ -2072,6 +2138,29 @@ public class MessageDatabase extends Ser
                 }
             }
         }
+
+        void setBatch(Transaction tx, LastAck last) throws IOException {
+            setBatch(tx, last.lastAckedSequence);
+            if (cursor.defaultCursorPosition == 0
+                    && cursor.highPriorityCursorPosition == 0
+                    && cursor.lowPriorityCursorPosition == 0) {
+                long next = last.lastAckedSequence + 1;
+                switch (last.priority) {
+                    case DEF:
+                        cursor.defaultCursorPosition = next;
+                        cursor.highPriorityCursorPosition = next;
+                        break;
+                    case HI:
+                        cursor.highPriorityCursorPosition = next;
+                        break;
+                    case LO:
+                        cursor.lowPriorityCursorPosition = next;
+                        cursor.defaultCursorPosition = next;
+                        cursor.highPriorityCursorPosition = next;
+                        break;
+                }
+            }
+        }
         
         void stoppedIterating() {
             if (lastDefaultKey!=null) {
@@ -2116,7 +2205,12 @@ public class MessageDatabase extends Ser
                 result = highPriorityIndex.get(tx, key);
                 if (result == null) {
                     result = lowPriorityIndex.get(tx, key);
+                    lastGetPriority = LO;
+                } else {
+                  lastGetPriority = HI;
                 }
+            } else {
+                lastGetPriority = DEF;
             }
             return result;
         }
@@ -2138,7 +2232,11 @@ public class MessageDatabase extends Ser
         Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
             return new MessageOrderIterator(tx,m);
         }
-        
+
+        public byte lastGetPriority() {
+            return lastGetPriority;
+        }
+
         class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
             Iterator<Entry<Long, MessageKeys>>currentIterator;
             final Iterator<Entry<Long, MessageKeys>>highIterator;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=1038276&r1=1038275&r2=1038276&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Tue Nov 23 18:54:55 2010
@@ -23,6 +23,8 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.util.IOHelper;
 
 import javax.jms.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -33,11 +35,15 @@ import java.io.FileNotFoundException;
  */
 public class KahaDBVersionTest extends TestCase {
 
+    static final Log LOG = LogFactory.getLog(KahaDBVersionTest.class);
     final static File VERSION_1_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
     final static File VERSION_2_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
+
+    BrokerService broker = null;
+
     protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
 
-        BrokerService broker = new BrokerService();
+        broker = new BrokerService();
         broker.setUseJmx(false);
         broker.setPersistenceAdapter(kaha);
         broker.start();
@@ -45,6 +51,11 @@ public class KahaDBVersionTest extends T
 
     }
 
+    protected void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
         
     public void XtestCreateStore() throws Exception {
         KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
@@ -57,12 +68,12 @@ public class KahaDBVersionTest extends T
         Connection connection = cf.createConnection();
         connection.setClientID("test");
         connection.start();
-        producerSomeMessages(connection);
+        producerSomeMessages(connection, 1000);
         connection.close();
         broker.stop();
     }
 
-    private void producerSomeMessages(Connection connection) throws Exception {
+    private void producerSomeMessages(Connection connection, int numToSend) throws Exception {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic("test.topic");
         Queue queue = session.createQueue("test.queue");
@@ -70,15 +81,17 @@ public class KahaDBVersionTest extends T
         consumer.close();
         MessageProducer producer = session.createProducer(topic);
         producer.setPriority(9);
-        for (int i =0; i < 1000; i++) {
+        for (int i =0; i < numToSend; i++) {
             Message msg = session.createTextMessage("test message:"+i);
             producer.send(msg);
         }
+        LOG.info("sent "  + numToSend +" to topic");
         producer = session.createProducer(queue);
-        for (int i =0; i < 1000; i++) {
+        for (int i =0; i < numToSend; i++) {
             Message msg = session.createTextMessage("test message:"+i);
             producer.send(msg);
         }
+        LOG.info("sent " + numToSend +" to queue");
     }
 
     public void testVersion1Conversion() throws Exception{
@@ -94,6 +107,7 @@ public class KahaDBVersionTest extends T
         File testDir = new File("target/activemq-data/kahadb/versionDB");
         IOHelper.deleteFile(testDir);
         IOHelper.copyFile(existingStore, testDir);
+        final int numToSend = 1000;
 
         // on repeat store will be upgraded
         for (int repeats = 0; repeats < 3; repeats++) {
@@ -111,21 +125,27 @@ public class KahaDBVersionTest extends T
 
             if (repeats > 0) {
                 // upgraded store will be empty so generated some more messages
-                producerSomeMessages(connection);
+                producerSomeMessages(connection, numToSend);
             }
 
             MessageConsumer queueConsumer = session.createConsumer(queue);
-            for (int i = 0; i < 1000; i++) {
+            int count = 0;
+            for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
                 TextMessage msg = (TextMessage) queueConsumer.receive(10000);
+                count++;
                 //System.err.println(msg.getText());
                 assertNotNull(msg);
             }
+            LOG.info("Consumed " + count + " from queue");
+            count = 0;
             MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
-            for (int i = 0; i < 1000; i++) {
+            for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
                 TextMessage msg = (TextMessage) topicConsumer.receive(10000);
+                count++;
                 //System.err.println(msg.getText());
                 assertNotNull(msg);
             }
+            LOG.info("Consumed " + count + " from topic");
             connection.close();
             
             broker.stop();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java?rev=1038276&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java Tue Nov 23 18:54:55 2010
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import javax.jms.*;
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+// see https://issues.apache.org/activemq/browse/AMQ-2985
+// this demonstrated receiving old messages eventually along with validating order receipt
+public class DurableSubProcessTest extends org.apache.activemq.TestSupport  {
+    private static final Log LOG = LogFactory.getLog(DurableSubProcessTest.class);
+    public static final long RUNTIME = 4 * 60 * 1000;
+
+    public static final int SERVER_SLEEP = 2 * 1000; // max
+    public static final int CARGO_SIZE = 10; // max
+
+    public static final int MAX_CLIENTS = 7;
+    public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000);
+    public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000);
+    public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000);
+
+    public static final boolean PERSISTENT_BROKER = true;
+    public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
+
+
+    private BrokerService broker;
+    private ActiveMQTopic topic;
+
+    private ClientManager clientManager;
+    private Server server;
+    private HouseKeeper houseKeeper;
+
+    static final Vector<Throwable> exceptions = new Vector<Throwable>(); 
+
+    @Test
+    public void testProcess() {
+        try {
+            server.start();
+            clientManager.start();
+
+            if (ALLOW_SUBSCRIPTION_ABANDONMENT)
+                houseKeeper.start();
+
+            Thread.sleep(RUNTIME);
+            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        }
+        catch (Throwable e) {
+            exit("DurableSubProcessTest.testProcess failed.", e);
+        }
+        LOG.info("DONE.");
+    }
+
+    /**
+     * Creates batch of messages in a transaction periodically.
+     * The last message in the transaction is always a special
+     * message what contains info about the whole transaction.
+     * <p>Notifies the clients about the created messages also.
+     */
+    final class Server extends Thread {
+
+        final String url = "vm://" + DurableSubProcessTest.this.getName() + "?" +
+                "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&" +
+                "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&" +
+                "jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&" +
+                "jms.alwaysSyncSend=true&jms.dispatchAsync=false&" +
+                "jms.watchTopicAdvisories=false&" +
+                "waitForStart=200&create=false";
+        final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+        final Object sendMutex = new Object();
+        final String[] cargos = new String[500];
+
+        int transRover = 0;
+        int messageRover = 0;
+
+        public Server() {
+            super("Server");
+            setDaemon(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    DurableSubProcessTest.sleepRandom(SERVER_SLEEP);
+                    send();
+                }
+            }
+            catch (Throwable e) {
+                exit("Server.run failed", e);
+            }
+        }
+
+        public void send() throws JMSException {
+            // do not create new clients now
+            // ToDo: Test this case later.
+            synchronized (sendMutex) {
+                int trans = ++transRover;
+                boolean relevantTrans = random(2) > 1;
+                ClientType clientType = relevantTrans ? ClientType.randomClientType() : null; // sends this types
+                int count = random(200);
+
+                LOG.info("Sending Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "]");
+
+                Connection con = cf.createConnection();
+                Session sess = con.createSession(true, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer prod = sess.createProducer(null);
+
+                for (int i = 0; i < count; i++) {
+                    Message message = sess.createMessage();
+                    message.setIntProperty("ID", ++messageRover);
+                    String type = clientType != null ? clientType.randomMessageType() : ClientType.randomNonRelevantMessageType();
+                    message.setStringProperty("TYPE", type);
+
+                    if (CARGO_SIZE > 0)
+                        message.setStringProperty("CARGO", getCargo(CARGO_SIZE));
+
+                    prod.send(topic, message);
+                    clientManager.onServerMessage(message);
+                }
+
+                Message message = sess.createMessage();
+                message.setIntProperty("ID", ++messageRover);
+                message.setIntProperty("TRANS", trans);
+                message.setBooleanProperty("COMMIT", true);
+                message.setBooleanProperty("RELEVANT", relevantTrans);
+                prod.send(topic, message);
+                clientManager.onServerMessage(message);
+
+                sess.commit();
+                sess.close();
+                con.close();
+            }
+        }
+
+        private String getCargo(int length) {
+            if (length == 0)
+                return null;
+
+            if (length < cargos.length) {
+                String result = cargos[length];
+                if (result == null) {
+                    result = getCargoImpl(length);
+                    cargos[length] = result;
+                }
+                return result;
+            }
+            return getCargoImpl(length);
+        }
+
+        private String getCargoImpl(int length) {
+            StringBuilder sb = new StringBuilder(length);
+            for (int i = length; --i >=0; ) {
+                sb.append('a');
+            }
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Clients listen on different messages in the topic.
+     * The 'TYPE' property helps the client to select the
+     * proper messages.
+     */
+    private enum ClientType {
+        A ("a", "b", "c"),
+        B ("c", "d", "e"),
+        C ("d", "e", "f"),
+        D ("g", "h");
+
+        public final String[] messageTypes;
+        public final HashSet<String> messageTypeSet;
+        public final String selector;
+
+        ClientType(String... messageTypes) {
+            this.messageTypes = messageTypes;
+            messageTypeSet = new HashSet<String>(Arrays.asList(messageTypes));
+
+            StringBuilder sb = new StringBuilder("TYPE in (");
+            for (int i = 0; i < messageTypes.length; i++) {
+                if (i > 0)
+                    sb.append(", ");
+                sb.append('\'').append(messageTypes[i]).append('\'');
+            }
+            sb.append(')');
+            selector = sb.toString();
+        }
+
+        public static ClientType randomClientType() {
+            return values()[DurableSubProcessTest.random(values().length - 1)];
+        }
+
+        public final String randomMessageType() {
+            return messageTypes[DurableSubProcessTest.random(messageTypes.length - 1)];
+        }
+
+        public static String randomNonRelevantMessageType() {
+            return Integer.toString(DurableSubProcessTest.random(20));
+        }
+
+        public final boolean isRelevant(String messageType) {
+            return messageTypeSet.contains(messageType);
+        }
+
+        @Override
+        public final String toString() {
+            return this.name() /*+ '[' + selector + ']'*/;
+        }
+    }
+
+    /**
+     * Creates new cliens.
+     */
+    private final class ClientManager extends Thread {
+
+        private int clientRover = 0;
+
+        private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
+
+        public ClientManager() {
+            super("ClientManager");
+            setDaemon(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    if (clients.size() < MAX_CLIENTS)
+                        createNewClient();
+
+                    int size = clients.size();
+                    sleepRandom(size * 3 * 1000, size * 6 * 1000);
+                }
+            }
+            catch (Throwable e) {
+                exit("ClientManager.run failed.", e);
+            }
+        }
+
+        private void createNewClient() throws JMSException {
+            ClientType type = ClientType.randomClientType();
+
+            Client client;
+            synchronized (server.sendMutex) {
+                client = new Client(++clientRover, type, CLIENT_LIFETIME, CLIENT_ONLINE, CLIENT_OFFLINE);
+                clients.add(client);
+            }
+            client.start();
+
+            LOG.info(client.toString() + " created. " + this);
+        }
+
+        public void removeClient(Client client) {
+            clients.remove(client);
+        }
+
+        public void onServerMessage(Message message) throws JMSException {
+            for (Client client: clients) {
+                client.onServerMessage(message);
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder("ClientManager[count=");
+            sb.append(clients.size());
+            sb.append(", clients=");
+            boolean sep = false;
+            for (Client client: clients) {
+                if (sep) sb.append(", ");
+                else sep = true;
+                sb.append(client.toString());
+            }
+            sb.append(']');
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Consumes massages from a durable subscription.
+     * Goes online/offline periodically. Checks the incoming messages
+     * against the sent messages of the server.
+     */
+    private final class Client extends Thread {
+
+        String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?" +
+                "jms.watchTopicAdvisories=false&" +
+                "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" +
+                "jms.producerWindowSize=20971520&" +
+                "jms.copyMessageOnSend=false&" +
+                "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" +
+                "useExponentialBackOff=true";
+        final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+        public static final String SUBSCRIPTION_NAME = "subscription";
+
+        private final int id;
+        private final String conClientId;
+
+        private final Random lifetime;
+        private final Random online;
+        private final Random offline;
+
+        private final ClientType clientType;
+        private final String selector;
+
+        private final ConcurrentLinkedQueue<Message> waitingList = new ConcurrentLinkedQueue<Message>();
+
+        public Client(int id, ClientType clientType, Random lifetime, Random online, Random offline) throws JMSException {
+            super("Client" + id);
+            setDaemon(true);
+
+            this.id = id;
+            conClientId = "cli" + id;
+            this.clientType = clientType;
+            selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector;
+
+            this.lifetime = lifetime;
+            this.online = online;
+            this.offline = offline;
+
+            subscribe();
+        }
+
+        @Override
+        public void run() {
+            long end = System.currentTimeMillis() + lifetime.next();
+            try {
+                boolean sleep = false;
+                while (true) {
+                    long max = end - System.currentTimeMillis();
+                    if (max <= 0)
+                        break;
+
+                    if (sleep) offline.sleepRandom();
+                    else sleep = true;
+
+                    process(online.next());
+                }
+
+                if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
+                    unsubscribe();
+                else {
+                    LOG.info("Client abandon the subscription. " + this);
+
+                    // housekeeper should sweep these abandoned subscriptions
+                    houseKeeper.abandonedSubscriptions.add(conClientId);
+                }
+            }
+            catch (Throwable e) {
+                exit(toString() + " failed.", e);
+            }
+
+            clientManager.removeClient(this);
+            LOG.info(toString() + " DONE.");
+        }
+
+        private void process(long millis) throws JMSException {
+            long end = System.currentTimeMillis() + millis;
+            long hardEnd = end + 2000; // wait to finish the transaction.
+            boolean inTransaction = false;
+            int transCount = 0;
+
+            LOG.info(toString() + " ONLINE.");
+            Connection con = openConnection();
+            Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, false);
+            try {
+                do {
+                    long max = end - System.currentTimeMillis();
+                    if (max <= 0) {
+                        if (!inTransaction)
+                            break;
+
+                        max = hardEnd - System.currentTimeMillis();
+                        if (max <= 0)
+                            exit("" + this + " failed: Transaction is not finished.");
+                    }
+
+                    Message message = consumer.receive(max);
+                    if (message == null)
+                        continue;
+
+                    onClientMessage(message);
+
+                    if (message.propertyExists("COMMIT")) {
+                        message.acknowledge();
+
+                        LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + this + ".");
+
+                        inTransaction = false;
+                        transCount = 0;
+                    }
+                    else {
+                        inTransaction = true;
+                        transCount++;
+                    }
+                } while (true);
+            }
+            finally {
+                sess.close();
+                con.close();
+
+                LOG.info(toString() + " OFFLINE.");
+
+                // Check if the messages are in the waiting
+                // list for long time.
+                Message topMessage = waitingList.peek();
+                if (topMessage != null)
+                    checkDeliveryTime(topMessage);
+            }
+        }
+
+        public void onServerMessage(Message message) throws JMSException {
+            if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) {
+                if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT")))
+                    waitingList.add(message);
+            }
+            else {
+                String messageType = message.getStringProperty("TYPE");
+                if (clientType.isRelevant(messageType))
+                    waitingList.add(message);
+            }
+        }
+
+        public void onClientMessage(Message message) {
+            Message serverMessage = waitingList.poll();
+            try {
+                if (serverMessage == null)
+                    exit("" + this + " failed: There is no next server message, but received: " + message);
+
+                Integer receivedId = (Integer) message.getObjectProperty("ID");
+                Integer serverId = (Integer) serverMessage.getObjectProperty("ID");
+                if (receivedId == null || serverId == null)
+                    exit("" + this + " failed: message ID not found.\r\n" +
+                            " received: " + message + "\r\n" +
+                            "   server: " + serverMessage);
+
+                if (!serverId.equals(receivedId))
+                    exit("" + this + " failed: Received wrong message.\r\n" +
+                            " received: " + message + "\r\n" +
+                            "   server: " + serverMessage);
+
+                checkDeliveryTime(message);
+            }
+            catch (Throwable e) {
+                exit("" + this + ".onClientMessage failed.\r\n" +
+                        " received: " + message + "\r\n" +
+                        "   server: " + serverMessage, e);
+            }
+        }
+
+        /**
+         * Checks if the message was not delivered fast enough.
+         */
+        public void checkDeliveryTime(Message message) throws JMSException {
+            long creation = message.getJMSTimestamp();
+            long min = System.currentTimeMillis() - (offline.max + online.min);
+
+            if (min > creation) {
+                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
+                exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) + ", min: " + df.format(new Date(min)) + "\r\n" + message);
+            }
+        }
+
+        private Connection openConnection() throws JMSException {
+            Connection con = cf.createConnection();
+            con.setClientID(conClientId);
+            con.start();
+            return con;
+        }
+
+        private void subscribe() throws JMSException {
+            Connection con = openConnection();
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, true);
+            session.close();
+            con.close();
+        }
+
+        private void unsubscribe() throws JMSException {
+            Connection con = openConnection();
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.unsubscribe(SUBSCRIPTION_NAME);
+            session.close();
+            con.close();
+        }
+
+        @Override
+        public String toString() {
+            return "Client[id=" + id + ", type=" + clientType + "]";
+        }
+    }
+
+    /**
+     * Sweeps out not-used durable subscriptions.
+     */
+    private final class HouseKeeper extends Thread {
+
+        private HouseKeeper() {
+            super("HouseKeeper");
+            setDaemon(true);
+        }
+
+        public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>();
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(60 * 1000);
+                    sweep();
+                }
+                catch (InterruptedException ex) {
+                    break;
+                }
+                catch (Throwable e) {
+                    Exception log = new Exception("HouseKeeper failed.", e);
+                    log.printStackTrace();
+                }
+            }
+        }
+
+        private void sweep() throws Exception {
+            LOG.info("Housekeeper sweeping.");
+
+            int closed = 0;
+            ArrayList<String> sweeped = new ArrayList<String>();
+            try {
+                for (String clientId: abandonedSubscriptions) {
+                    sweeped.add(clientId);
+                    LOG.info("Sweeping out subscription of " + clientId + ".");
+                    broker.getAdminView().destroyDurableSubscriber(clientId, Client.SUBSCRIPTION_NAME);
+                    closed++;
+                }
+            }
+            finally {
+                abandonedSubscriptions.removeAll(sweeped);
+            }
+
+            LOG.info("Housekeeper sweeped out " + closed + " subscriptions.");
+        }
+    }
+
+    public static int random(int max) {
+        return (int) (Math.random() * (max + 1));
+    }
+
+    public static int random(int min, int max) {
+        return random(max - min) + min;
+    }
+
+    public static void sleepRandom(int maxMillis) throws InterruptedException {
+        Thread.sleep(random(maxMillis));
+    }
+
+    public static void sleepRandom(int minMillis, int maxMillis) throws InterruptedException {
+        Thread.sleep(random(minMillis, maxMillis));
+    }
+
+    public static final class Random {
+
+        final int min;
+        final int max;
+
+        Random(int min, int max) {
+            this.min = min;
+            this.max = max;
+        }
+
+        public int next() {
+            return random(min, max);
+        }
+
+        public void sleepRandom() throws InterruptedException {
+            DurableSubProcessTest.sleepRandom(min, max);
+        }
+    }
+
+    public static void exit(String message) {
+        exit(message, null);
+    }
+
+    public static void exit(String message, Throwable e) {
+        Throwable log = new RuntimeException(message, e);
+        log.printStackTrace();
+        LOG.error(message, e);
+        exceptions.add(e);
+        fail(message);
+    }
+
+    protected void setUp() throws Exception {
+        topic = (ActiveMQTopic) createDestination();
+        startBroker();
+
+        clientManager = new ClientManager();
+        server = new Server();
+        houseKeeper = new HouseKeeper();
+
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+
+        destroyBroker();
+    }
+
+    private void startBroker() throws Exception {
+        startBroker(true);
+    }
+
+    private void startBroker(boolean deleteAllMessages) throws Exception {
+        if (broker != null)
+            return;
+
+        broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+        broker.setBrokerName(getName());
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+
+        if (PERSISTENT_BROKER) {
+            broker.setPersistent(true);
+            KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+            persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
+            broker.setPersistenceAdapter(persistenceAdapter);
+        }
+        else
+            broker.setPersistent(false);
+
+        broker.addConnector("tcp://localhost:61656");
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
+        broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
+        broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
+
+        broker.start();
+    }
+
+    private void destroyBroker() throws Exception {
+        if (broker == null)
+            return;
+
+        broker.stop();
+        broker = null;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1038276&r1=1038275&r2=1038276&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Nov 23 18:54:55 2010
@@ -466,7 +466,7 @@ public class DurableSubscriptionOfflineT
         assertEquals("offline consumer got all", sent, listener.count);
     }    
 
-    public void x_initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
+    public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
                 new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
@@ -639,7 +639,7 @@ public class DurableSubscriptionOfflineT
 
         int filtered = 0;
         for (int i = 0; i < 10; i++) {
-            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
+            boolean filter = (int) (Math.random() * 2) >= 1;
             if (filter)
                 filtered++;
 
@@ -664,7 +664,7 @@ public class DurableSubscriptionOfflineT
         producer = session.createProducer(null);
 
         for (int i = 0; i < 10; i++) {
-            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
+            boolean filter = (int) (Math.random() * 2) >= 1;
             if (filter)
                 filtered++;
 
@@ -702,6 +702,198 @@ public class DurableSubscriptionOfflineT
         assertEquals(filtered, listener3.count);
     }
 
+
+    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // create offline subs 2
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (int) (Math.random() * 2) >= 1;
+
+            sent++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        Connection con2 = createConnection("offCli1");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2.unsubscribe("SubsId");
+        session2.close();
+        con2.close();
+
+        // consume all messages
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
+        Listener listener = new Listener("SubsId");
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals("offline consumer got all", sent, listener.count);
+    }
+
+
+    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int filtered = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("sent: " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test offline subs
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe("SubsId");
+        session.close();
+        con.close();
+
+
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(0, listener.count);
+    }
+
+
+    public void testAllConsumed() throws Exception {
+        final String filter = "filter = 'true'";
+        Connection con = createConnection("cli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", filter, true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+            sent++;
+        }
+
+        LOG.info("sent: " + sent);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+        Thread.sleep(3 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+
+        LOG.info("cli2 pull 2");
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        assertNotNull("got message", consumer.receive(2000));
+        assertNotNull("got message", consumer.receive(2000));
+        session.close();
+        con.close();
+
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        sent = 0;
+        for (int i = 0; i < 2; i++) {
+            Message message = session.createMessage();
+            message.setStringProperty("filter", i==1 ? "true" : "false");
+            producer.send(topic, message);
+            sent++;
+        }
+        LOG.info("sent: " + sent);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+ 
+        LOG.info("cli1 again, should get 1 new ones");
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
+        listener = new Listener();
+        consumer.setMessageListener(listener);
+        Thread.sleep(3 * 1000);
+        session.close();
+        con.close();
+
+        assertEquals(1, listener.count);
+    }
+
     public static class Listener implements MessageListener {
         int count = 0;
         String id = null;



Mime
View raw message