activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1036524 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/ test...
Date Thu, 18 Nov 2010 16:44:51 GMT
Author: gtully
Date: Thu Nov 18 16:44:50 2010
New Revision: 1036524

URL: http://svn.apache.org/viewvc?rev=1036524&view=rev
Log:
Resolve an issue with KahaDB durable subscriptions that use selectors after a restart: persist the ack locations
and bump the KahaDB version to 3, with automatic upgrade from version 1 or 2. https://issues.apache.org/activemq/browse/AMQ-2985

Added:
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log
  (with props)
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1036524&r1=1036523&r2=1036524&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Nov 18 16:44:50 2010
@@ -53,8 +53,8 @@ public abstract class AbstractStoreCurso
     
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
-            super.start();
             clear();
+            super.start();      
             resetBatch();
             this.size = getStoreSize();
             this.storeHasMessages=this.size > 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1036524&r1=1036523&r2=1036524&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Thu Nov 18 16:44:50 2010
@@ -928,7 +928,7 @@ public class KahaDBStore extends Message
                         ActiveMQDestination dest = convert(entry.getKey());
                         if (dest.isTopic()) {
                             StoredDestination loadedStore = getStoredDestination(convert(dest),
tx);
-                            if (loadedStore.ackPositions.isEmpty()) {
+                            if (loadedStore.ackPositions.isEmpty(tx)) {
                                 isEmptyTopic = true;
                             }
                         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1036524&r1=1036523&r2=1036524&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Nov 18 16:44:50 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -100,7 +101,7 @@ public class MessageDatabase extends Ser
     static final long NOT_ACKED = -1;
     static final long UNMATCHED_SEQ = -2;
 
-    static final int VERSION = 2;
+    static final int VERSION = 3;
 
 
     protected class Metadata {
@@ -165,9 +166,7 @@ public class MessageDatabase extends Ser
             } else {
                 os.writeBoolean(false);
             }
-            if (version > 1) {
-               os.writeInt(version);
-            }
+            os.writeInt(VERSION);
         }
     }
 
@@ -255,8 +254,6 @@ public class MessageDatabase extends Ser
                     metadata.destinations.load(tx);
                 }
             });
-            pageFile.flush();
-            
             // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
             // Perhaps we should just keep an index of file
             storedDestinations.clear();
@@ -269,6 +266,7 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
+            pageFile.flush();            
         }finally {
             this.indexLock.writeLock().unlock();
         }
@@ -985,7 +983,7 @@ public class MessageDatabase extends Ser
 
         // Skip adding the message to the index if this is a topic and there are
         // no subscriptions.
-        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
+        if (sd.subscriptions != null && sd.ackPositions.isEmpty(tx)) {
             return;
         }
 
@@ -1055,18 +1053,19 @@ public class MessageDatabase extends Ser
                 }
 
                 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
-
-                if (ackSequenceToStore != sequence) {
-                    // unmatched, need to add ack locations for the intermediate sequences
-                    for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence
< sequence; matchedGapSequence++) {
-                        addAckLocation(sd, matchedGapSequence, subscriptionKey);
+                if (prev != null) {
+                    if (ackSequenceToStore != sequence) {
+                        // unmatched, need to add ack locations for the intermediate sequences
+                        for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence
< sequence; matchedGapSequence++) {
+                            addAckLocation(tx, sd, matchedGapSequence, subscriptionKey);
+                        }
                     }
+                    // The following method handles deleting un-referenced messages.
+                    removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
                 }
-                // The following method handles deleting un-referenced messages.
-                removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
 
                 // Add it to the new location set.
-                addAckLocation(sd, sequence, subscriptionKey);
+                addAckLocation(tx, sd, sequence, subscriptionKey);
             }
 
         }
@@ -1107,6 +1106,10 @@ public class MessageDatabase extends Ser
             sd.subscriptionAcks.clear(tx);
             sd.subscriptionAcks.unload(tx);
             tx.free(sd.subscriptionAcks.getPageId());
+
+            sd.ackPositions.clear(tx);
+            sd.ackPositions.unload(tx);
+            tx.free(sd.ackPositions.getPageId());
         }
 
         String key = key(command.getDestination());
@@ -1127,7 +1130,7 @@ public class MessageDatabase extends Ser
             }
 
             sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
-            addAckLocation(sd, ackLocation, subscriptionKey);
+            addAckLocation(tx, sd, ackLocation, subscriptionKey);
         } else {
             // delete the sub...
             String subscriptionKey = command.getSubscriptionKey();
@@ -1326,13 +1329,13 @@ public class MessageDatabase extends Ser
         BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
         BTreeIndex<String, Long> subscriptionAcks;
         HashMap<String, MessageOrderCursor> subscriptionCursors;
-        TreeMap<Long, HashSet<String>> ackPositions;
+        BTreeIndex<Long, HashSet<String>> ackPositions;
     }
 
     protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination>
{
 
         public StoredDestination readPayload(DataInput dataIn) throws IOException {
-            StoredDestination value = new StoredDestination();
+            final StoredDestination value = new StoredDestination();
             value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile,
dataIn.readLong());
             value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
             value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
@@ -1340,11 +1343,40 @@ public class MessageDatabase extends Ser
             if (dataIn.readBoolean()) {
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile,
dataIn.readLong());
                 value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
+                if (metadata.version >= 3) {
+                    value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile,
dataIn.readLong());
+                } else {
+                    // upgrade
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile,
tx.allocate());
+                            value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
+                            value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+                            value.ackPositions.load(tx);
+                        }
+                    });
+                }
             }
             if (metadata.version >= 2) {
                 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile,
dataIn.readLong());
                 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile,
dataIn.readLong());
+            } else {
+                    // upgrade
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile,
tx.allocate());
+                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                            value.orderIndex.lowPriorityIndex.load(tx);
+
+                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long,
MessageKeys>(pageFile, tx.allocate());
+                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                            value.orderIndex.highPriorityIndex.load(tx);
+                        }
+                    });
             }
+
             return value;
         }
 
@@ -1356,13 +1388,12 @@ public class MessageDatabase extends Ser
                 dataOut.writeBoolean(true);
                 dataOut.writeLong(value.subscriptions.getPageId());
                 dataOut.writeLong(value.subscriptionAcks.getPageId());
+                dataOut.writeLong(value.ackPositions.getPageId());
             } else {
                 dataOut.writeBoolean(false);
             }
-            if (metadata.version >= 2) {
-                dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
-                dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
-            }
+            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
+            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
         }
     }
 
@@ -1452,6 +1483,7 @@ public class MessageDatabase extends Ser
             if (topic) {
                 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile,
tx.allocate());
                 rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
+                rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile,
tx.allocate());
             }
             metadata.destinations.put(tx, key, rc);
         }
@@ -1481,18 +1513,24 @@ public class MessageDatabase extends Ser
             rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
             rc.subscriptionAcks.load(tx);
 
-            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
+            rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
+            rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+            rc.ackPositions.load(tx);
+
             rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
 
-            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx);
iterator.hasNext();) {
-                Entry<String, Long> entry = iterator.next();
-                addAckLocation(rc, extractSequenceId(entry.getValue()), entry.getKey());
+            if (metadata.version < 3) {
+                // on upgrade need to fill ackLocation
+                for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx);
iterator.hasNext();) {
+                    Entry<String, Long> entry = iterator.next();
+                    addAckLocation(tx, rc, extractSequenceId(entry.getValue()), entry.getKey());
+                }
             }
             
             if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
-                if (!rc.ackPositions.isEmpty()) {
-                    Long lastAckedMessageId = rc.ackPositions.lastKey();
+                if (!rc.ackPositions.isEmpty(tx)) {
+                    Long lastAckedMessageId = rc.ackPositions.getLast(tx).getKey();
                     if (lastAckedMessageId != NOT_ACKED) {
                         rc.orderIndex.nextMessageId = lastAckedMessageId+1;
                     }
@@ -1500,6 +1538,11 @@ public class MessageDatabase extends Ser
             }
 
         }
+
+        if (metadata.version < 3) {
+            // store again after upgrade
+            metadata.destinations.put(tx, key, rc);
+        }        
         return rc;
     }
 
@@ -1508,13 +1551,14 @@ public class MessageDatabase extends Ser
      * @param messageSequence
      * @param subscriptionKey
      */
-    private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey)
{
-        HashSet<String> hs = sd.ackPositions.get(messageSequence);
+    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence,
String subscriptionKey) throws IOException {
+        HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
         if (hs == null) {
             hs = new HashSet<String>();
-            sd.ackPositions.put(messageSequence, hs);
         }
         hs.add(subscriptionKey);
+        // every ack location addition needs to be a btree modification to get it stored
+        sd.ackPositions.put(tx, messageSequence, hs);
     }
 
     /**
@@ -1527,12 +1571,12 @@ public class MessageDatabase extends Ser
     private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey,
Long sequenceId) throws IOException {
         // Remove the sub from the previous location set..
         if (sequenceId != null) {
-            HashSet<String> hs = sd.ackPositions.get(sequenceId);
+            HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
             if (hs != null) {
                 hs.remove(subscriptionKey);
                 if (hs.isEmpty()) {
-                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
-                    sd.ackPositions.remove(sequenceId);
+                    HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
+                    sd.ackPositions.remove(tx, sequenceId);
 
                     // Did we just empty out the first set in the
                     // ordered list of ack locations? Then it's time to
@@ -1942,15 +1986,12 @@ public class MessageDatabase extends Ser
             defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
             defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
             defaultPriorityIndex.load(tx);
-            if (metadata.version >= 2) {
-                lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                lowPriorityIndex.load(tx);
-
-                highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                highPriorityIndex.load(tx);
-            }
+            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            lowPriorityIndex.load(tx);
+            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            highPriorityIndex.load(tx);
         }
         
         void allocate(Transaction tx) throws IOException {
@@ -2193,6 +2234,33 @@ public class MessageDatabase extends Ser
         }
     }
     
-    
-    
+    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>>
{
+        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
+
+        public void writePayload(HashSet<String> object, DataOutput dataOut) throws
IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            oout.writeObject(object);
+            oout.flush();
+            oout.close();
+            byte[] data = baos.toByteArray();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+        }
+
+        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
+            int dataLen = dataIn.readInt();
+            byte[] data = new byte[dataLen];
+            dataIn.readFully(data);
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            ObjectInputStream oin = new ObjectInputStream(bais);
+            try {
+                return (HashSet<String>) oin.readObject();
+            } catch (ClassNotFoundException cfe) {
+	            IOException ioe = new IOException("Failed to read HashSet<String>: " +
cfe);
+	            ioe.initCause(cfe);
+	            throw ioe;
+	        }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=1036524&r1=1036523&r2=1036524&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
Thu Nov 18 16:44:50 2010
@@ -34,6 +34,7 @@ import java.io.FileNotFoundException;
 public class KahaDBVersionTest extends TestCase {
 
     final static File VERSION_1_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
+    final static File VERSION_2_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
     protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception
{
 
         BrokerService broker = new BrokerService();
@@ -47,21 +48,28 @@ public class KahaDBVersionTest extends T
         
     public void XtestCreateStore() throws Exception {
         KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-        File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
+        File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX");
         IOHelper.deleteFile(dir);
         kaha.setDirectory(dir);
         kaha.setJournalMaxFileLength(1024*1024);
         BrokerService broker = createBroker(kaha);
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); 
      
         Connection connection = cf.createConnection();
         connection.setClientID("test");
         connection.start();
+        producerSomeMessages(connection);
+        connection.close();
+        broker.stop();
+    }
+
+    private void producerSomeMessages(Connection connection) throws Exception {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic("test.topic");
         Queue queue = session.createQueue("test.queue");
         MessageConsumer consumer = session.createDurableSubscriber(topic,"test");
         consumer.close();
         MessageProducer producer = session.createProducer(topic);
+        producer.setPriority(9);
         for (int i =0; i < 1000; i++) {
             Message msg = session.createTextMessage("test message:"+i);
             producer.send(msg);
@@ -71,45 +79,56 @@ public class KahaDBVersionTest extends T
             Message msg = session.createTextMessage("test message:"+i);
             producer.send(msg);
         }
-        connection.stop();
-        broker.stop();
     }
-    
-    public void testVersionConversion() throws Exception{
-        File testDir = new File("target/activemq-data/kahadb/versionDB");
-        IOHelper.deleteFile(testDir);
-        IOHelper.copyFile(VERSION_1_DB, testDir);
-        
-        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-        kaha.setDirectory(testDir);
-        kaha.setJournalMaxFileLength(1024*1024);
-        BrokerService broker = createBroker(kaha);
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
-        Connection connection = cf.createConnection();
-        connection.setClientID("test");
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Topic topic = session.createTopic("test.topic");
-        Queue queue = session.createQueue("test.queue");
-        MessageConsumer queueConsumer = session.createConsumer(queue);
-        for (int i = 0; i < 1000; i++) {
-            TextMessage msg  = (TextMessage) queueConsumer.receive(10000);
-            //System.err.println(msg.getText());
-            assertNotNull(msg);
-        }
-        MessageConsumer topicConsumer = session.createDurableSubscriber(topic,"test");
-        for (int i = 0; i < 1000; i++) {
-            TextMessage msg  = (TextMessage) topicConsumer.receive(10000);
-            //System.err.println(msg.getText());
-            assertNotNull(msg);
-        }
-        broker.stop();
-        
+
+    public void testVersion1Conversion() throws Exception{
+          doConvertRestartCycle(VERSION_1_DB);
     }
 
-    
+    public void testVersion2Conversion() throws Exception{
+          doConvertRestartCycle(VERSION_2_DB);
+    }
 
+    public void doConvertRestartCycle(File existingStore) throws Exception {
 
-    
+        File testDir = new File("target/activemq-data/kahadb/versionDB");
+        IOHelper.deleteFile(testDir);
+        IOHelper.copyFile(existingStore, testDir);
 
+        // on repeat store will be upgraded
+        for (int repeats = 0; repeats < 3; repeats++) {
+            KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+            kaha.setDirectory(testDir);
+            kaha.setJournalMaxFileLength(1024 * 1024);
+            BrokerService broker = createBroker(kaha);
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+            Connection connection = cf.createConnection();
+            connection.setClientID("test");
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTopic("test.topic");
+            Queue queue = session.createQueue("test.queue");
+
+            if (repeats > 0) {
+                // upgraded store will be empty so generate some more messages
+                producerSomeMessages(connection);
+            }
+
+            MessageConsumer queueConsumer = session.createConsumer(queue);
+            for (int i = 0; i < 1000; i++) {
+                TextMessage msg = (TextMessage) queueConsumer.receive(10000);
+                //System.err.println(msg.getText());
+                assertNotNull(msg);
+            }
+            MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
+            for (int i = 0; i < 1000; i++) {
+                TextMessage msg = (TextMessage) topicConsumer.receive(10000);
+                //System.err.println(msg.getText());
+                assertNotNull(msg);
+            }
+            connection.close();
+            
+            broker.stop();
+        }
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1036524&r1=1036523&r2=1036524&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Thu Nov 18 16:44:50 2010
@@ -617,6 +617,90 @@ public class DurableSubscriptionOfflineT
         assertEquals(0, listener.count);
     }
 
+    public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
+        // create offline subs 1
+        Connection con = createConnection("offCli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // create offline subs 2
+        con = createConnection("offCli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int filtered = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("sent: " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // restart broker
+        Thread.sleep(3 * 1000);
+        broker.stop();
+        createBroker(false /*deleteAllMessages*/);
+ 
+        // send more messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(null);
+
+        for (int i = 0; i < 10; i++) {
+            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
+            if (filter)
+                filtered++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        LOG.info("after restart, sent: " + filtered);
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        // test offline subs
+        con = createConnection("offCli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter
= 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Connection con3 = createConnection("offCli2");
+        Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter
= 'true'", true);
+        Listener listener3 = new Listener();
+        consumer3.setMessageListener(listener3);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+        session3.close();
+        con3.close();
+
+        assertEquals(filtered, listener.count);
+        assertEquals(filtered, listener3.count);
+    }
 
     public static class Listener implements MessageListener {
         int count = 0;

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log?rev=1036524&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream



Mime
View raw message