activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1078799 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Mon, 07 Mar 2011 14:58:27 GMT
Author: gtully
Date: Mon Mar  7 14:58:26 2011
New Revision: 1078799

URL: http://svn.apache.org/viewvc?rev=1078799&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3206 - Unsubscribed durable sub can leave dangling
message reference in kahaDB, visible after a restart
resolved by correctly updating the persitent index when a durable sub with an outstanding
backlog is unsubscribed. Additional test that vaidates fix

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1078799&r1=1078798&r2=1078799&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Mon Mar  7 14:58:26 2011
@@ -500,7 +500,7 @@ public class KahaDBPersistenceAdapter im
 
     @Override
     public String toString() {
-        String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
+        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
         return "KahaDBPersistenceAdapter[" + path + "]";
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1078799&r1=1078798&r2=1078799&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Mon Mar  7 14:58:26 2011
@@ -1133,10 +1133,10 @@ public class MessageDatabase extends Ser
 
     void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location)
throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        final String subscriptionKey = command.getSubscriptionKey();
 
         // If set then we are creating it.. otherwise we are destroying the sub
         if (command.hasSubscriptionInfo()) {
-            String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.put(tx, subscriptionKey, command);
             long ackLocation=NOT_ACKED;
             if (!command.getRetroactive()) {
@@ -1147,7 +1147,6 @@ public class MessageDatabase extends Ser
             sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
         } else {
             // delete the sub...
-            String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.remove(tx, subscriptionKey);
             sd.subscriptionAcks.remove(tx, subscriptionKey);
             removeAckLocationsForSub(tx, sd, subscriptionKey);
@@ -1206,7 +1205,7 @@ public class MessageDatabase extends Ser
             	if( gcCandidateSet.isEmpty() ) {
                 	break;
                 }
-                
+
                 // Use a visitor to cut down the number of pages that we load
                 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>()
{
                     int last=-1;
@@ -1234,7 +1233,7 @@ public class MessageDatabase extends Ser
 							return !subset.isEmpty();
                     	}
                     }
-    
+
                     public void visit(List<Location> keys, List<Long> values)
{
                     	for (Location l : keys) {
                             int fileId = l.getDataFileId();
@@ -1242,9 +1241,8 @@ public class MessageDatabase extends Ser
                         		gcCandidateSet.remove(fileId);
                                 last = fileId;
                             }
-						}                        
+                        }
                     }
-    
                 });
                 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
             }
@@ -1669,7 +1667,7 @@ public class MessageDatabase extends Ser
     }
 
     private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey)
throws IOException {
-        if (!sd.ackPositions.isEmpty(tx)) {        
+        if (!sd.ackPositions.isEmpty(tx)) {
             Long end = sd.ackPositions.getLast(tx).getKey();
             for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end;
sequence++) {
                 removeAckLocation(tx, sd, subscriptionKey, sequence);
@@ -1704,6 +1702,9 @@ public class MessageDatabase extends Ser
                         sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                         sd.orderIndex.remove(tx, entry.getKey());
                     }
+                } else {
+                    // update
+                    sd.ackPositions.put(tx, sequenceId, hs);
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1078799&r1=1078798&r2=1078799&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Mon Mar  7 14:58:26 2011
@@ -32,13 +32,17 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.apache.kahadb.journal.Journal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
-    public Boolean usePrioritySupport = Boolean.TRUE;
+    public boolean usePrioritySupport = Boolean.TRUE;
+    public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private BrokerService broker;
     private ActiveMQTopic topic;
     private Vector<Exception> exceptions = new Vector<Exception>();
@@ -97,6 +101,9 @@ public class DurableSubscriptionOfflineT
         if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
             // ensure it kicks in during tests
             ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
+        } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            // have lots of journal files
+            ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
         }
         broker.start();
     }
@@ -1049,6 +1056,71 @@ public class DurableSubscriptionOfflineT
         con.close();
     }
 
+    // use very small journal to get lots of files to cleanup
+    public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
+        this.addCombinationValues("journalMaxFileLength",
+                new Object[]{new Integer(64*1024)});
+    }
+
+    // https://issues.apache.org/jira/browse/AMQ-3206
+    public void testCleanupDeletedSubAfterRestart() throws Exception {
+        Connection con = createConnection("cli1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", null, true);
+        session.close();
+        con.close();
+
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        final int toSend = 500;
+        final String payload = new byte[40*1024].toString();
+        int sent = 0;
+        for (int i = sent; i < toSend; i++) {
+            Message message = session.createTextMessage(payload);
+            message.setStringProperty("filter", "false");
+            message.setIntProperty("ID", i);
+            producer.send(topic, message);
+            sent++;
+        }
+        con.close();
+        LOG.info("sent: " + sent);
+
+        // kill off cli1
+        con = createConnection("cli1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.unsubscribe("SubsId");
+
+        destroyBroker();
+        createBroker(false);
+
+        con = createConnection("cli2");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null,
true);
+        final Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+        assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                LOG.info("Want: " + toSend  + ", current: " + listener.count);
+                return listener.count == toSend;
+            }
+        }));
+        session.close();
+        con.close();
+
+        destroyBroker();
+        createBroker(false);
+        KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
+    }
+
     public static class Listener implements MessageListener {
         int count = 0;
         String id = null;



Mime
View raw message