activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1304020 - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
Date Thu, 22 Mar 2012 20:01:41 GMT
Author: gtully
Date: Thu Mar 22 20:01:40 2012
New Revision: 1304020

URL: http://svn.apache.org/viewvc?rev=1304020&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3768: ClassCastException when running some Durable
Consumer test cases. root cause of the classcast was the reuse of a freed node that was still
referenced as the head page id of a listindex. The fix is to not modify the head page id of
a listindex when removing and coalescing linked pages, the head page remains valid for the
duration of a subscription. Eventually got a test case that could easlily reproduce

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1304020&r1=1304019&r2=1304020&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Thu Mar 22 20:01:40 2012
@@ -33,6 +33,8 @@ import org.apache.activemq.broker.jmx.Du
 import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -1244,6 +1246,52 @@ public class DurableSubscriptionOfflineT
         assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
     }
 
+    // https://issues.apache.org/jira/browse/AMQ-3768
+    public void testPageReuse() throws Exception {
+        Connection con = null;
+        Session session = null;
+
+        final int numConsumers = 115;
+        for (int i=0; i<=numConsumers;i++) {
+            con = createConnection("cli" + i);
+            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId", null, true);
+            session.close();
+            con.close();
+        }
+
+
+        // populate ack locations
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+        Message message = session.createTextMessage(new byte[10].toString());
+        producer.send(topic, message);
+        con.close();
+
+        // we have a split, remove all but the last so that
+        // the head pageid changes in the acklocations listindex
+        for (int i=0; i<=numConsumers -1; i++) {
+            con = createConnection("cli" + i);
+            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.unsubscribe("SubsId");
+            session.close();
+            con.close();
+        }
+
+        destroyBroker();
+        createBroker(false);
+
+        // create a bunch more subs to reuse the freed page and get us in a knot
+        for (int i=1; i<=numConsumers;i++) {
+            con = createConnection("cli" + i);
+            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId", filter, true);
+            session.close();
+            con.close();
+        }
+    }
+
     public static class Listener implements MessageListener {
         int count = 0;
         String id = null;

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1304020&r1=1304019&r2=1304020&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java Thu Mar 22
20:01:40 2012
@@ -304,14 +304,10 @@ public class ListIndex<Key,Value> implem
 
     ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
         Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
-        try {
-            ListNode<Key, Value> node = page.get();
-            node.setPage(page);
-            node.setContainingList(this);
-            return node;
-        } catch (ClassCastException e) {
-            throw e;
-        }
+        ListNode<Key, Value> node = page.get();
+        node.setPage(page);
+        node.setContainingList(this);
+        return node;
     }
 
     ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws
IOException {

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1304020&r1=1304019&r2=1304020&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java Thu Mar 22 20:01:40
2012
@@ -198,10 +198,22 @@ public final class ListNode<Key,Value> {
                     if (currentNode.isHead() && currentNode.isTail()) {
                         // store empty list
                     } else if (currentNode.isHead()) {
-                        // new head
+                        // merge next node into existing headNode
+                        // as we don't want to change our headPageId b/c
+                        // that is our identity
+                        ListNode<Key,Value> headNode = currentNode;
+                        nextEntry = getFromNextNode(); // will move currentNode
+
+                        if (currentNode.isTail()) {
+                            targetList.setTailPageId(headNode.getPageId());
+                        }
+                        // copy next/currentNode into head
+                        headNode.setEntries(currentNode.entries);
+                        headNode.setNext(currentNode.getNext());
+                        headNode.store(tx);
                         toRemoveNode = currentNode;
-                        nextEntry = getFromNextNode();
-                        targetList.setHeadPageId(currentNode.getPageId());
+                        currentNode = headNode;
+
                     } else if (currentNode.isTail()) {
                         toRemoveNode = currentNode;
                         previousNode.setNext(ListIndex.NOT_SET);
@@ -262,7 +274,7 @@ public final class ListNode<Key,Value> {
         @SuppressWarnings({ "unchecked", "rawtypes" })
         public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
             ListNode<Key,Value> node = new ListNode<Key,Value>();
-            node.next = is.readLong();
+            node.setNext(is.readLong());
             final short size = is.readShort();
             for (short i = 0; i < size; i++) {
                 node.entries.addLast(



Mime
View raw message