activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1022890 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/ test/java/org/apache/activemq/store/jdbc/
Date Fri, 15 Oct 2010 12:29:40 GMT
Author: gtully
Date: Fri Oct 15 12:29:39 2010
New Revision: 1022890

URL: http://svn.apache.org/viewvc?rev=1022890&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2980 for kahaDB

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Oct 15 12:29:39 2010
@@ -717,7 +717,7 @@ public class KahaDBStore extends Message
         public int getMessageCount(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                     public Integer execute(Transaction tx) throws IOException {
@@ -727,8 +727,8 @@ public class KahaDBStore extends Message
                             // The subscription might not exist.
                             return 0;
                         }
-                        MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
-
+                        sd.orderIndex.resetCursorPosition();
+                        sd.orderIndex.setBatch(tx, cursorPos);
                         int counter = 0;
                         try {
                             String selector = info.getSelector();
@@ -736,7 +736,7 @@ public class KahaDBStore extends Message
                             if (selector != null) {
                                 selectorExpression = SelectorParser.parse(selector);
                             }
-                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
+                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
                                 if (selectorExpression != null) {
@@ -757,7 +757,7 @@ public class KahaDBStore extends Message
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
         }
 
@@ -786,15 +786,19 @@ public class KahaDBStore extends Message
         public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                 final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            indexLock.readLock().lock();
+            indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
+                        sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
                             long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            moc = new MessageOrderCursor(pos+1);
+                            sd.orderIndex.setBatch(tx, pos);
+                            moc = sd.orderIndex.cursor;
+                        } else {
+                            sd.orderIndex.cursor.sync(moc);
                         }
 
                         Entry<Long, MessageKeys> entry = null;
@@ -813,11 +817,14 @@ public class KahaDBStore extends Message
                         if (entry != null) {
                             MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                             sd.subscriptionCursors.put(subscriptionKey, copy);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("updated moc: " + copy + ", recovered: " + counter);
+                            }
                         }
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Oct 15 12:29:39 2010
@@ -1875,6 +1875,18 @@ public class MessageDatabase extends Ser
                 lowPriorityCursorPosition++;
             }
         }
+
+        public String toString() {
+           return "MessageOrderCursor:[def:" + defaultCursorPosition
+                   + ", low:" + lowPriorityCursorPosition
+                   + ", high:" +  highPriorityCursorPosition + "]";
+        }
+
+        public void sync(MessageOrderCursor other) {
+            this.defaultCursorPosition=other.defaultCursorPosition;
+            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
+            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
+        }
     }
     
     class MessageOrderIndex{
@@ -2010,11 +2022,11 @@ public class MessageDatabase extends Ser
         
         void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
                 throws IOException {
-            getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
-            if (highPriorityIndex != null) {
+            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
+                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
+            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
                 getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
-            }
-            if (lowPriorityIndex != null) {
+            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
                 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
             }
         }
@@ -2073,7 +2085,6 @@ public class MessageDatabase extends Ser
             final Iterator<Entry<Long, MessageKeys>>highIterator;
             final Iterator<Entry<Long, MessageKeys>>defaultIterator;
             final Iterator<Entry<Long, MessageKeys>>lowIterator;
-            Long lastKey;
             
             
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Fri Oct 15 12:29:39 2010
@@ -27,6 +27,7 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -49,7 +50,8 @@ abstract public class MessagePriorityTes
     Session sess;
     
     public boolean useCache;
-    
+    public int prefetchVal = 500;
+
     int MSG_NUM = 1000;
     int HIGH_PRI = 7;
     int LOW_PRI = 3;
@@ -59,6 +61,7 @@ abstract public class MessagePriorityTes
     protected void setUp() throws Exception {
         broker = new BrokerService();
         broker.setBrokerName("priorityTest");
+        broker.setAdvisorySupport(false);
         adapter = createPersistenceAdapter(true);
         broker.setPersistenceAdapter(adapter);
         PolicyEntry policy = new PolicyEntry();
@@ -71,6 +74,10 @@ abstract public class MessagePriorityTes
         broker.waitUntilStarted();
         
         factory = new ActiveMQConnectionFactory("vm://priorityTest");
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setAll(prefetchVal);
+        factory.setPrefetchPolicy(prefetch);
+        factory.setWatchTopicAdvisories(false);
         conn = factory.createConnection();
         conn.setClientID("priority");
         conn.start();
@@ -159,6 +166,10 @@ abstract public class MessagePriorityTes
         LOG.info("Sending  " + text);
         return msg;
     }
+
+    public void initCombosForTestDurableSubs() {
+        addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/4)});
+    }
     
     public void testDurableSubs() throws Exception {
         ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
@@ -176,11 +187,45 @@ abstract public class MessagePriorityTes
         
         sub = sess.createDurableSubscriber(topic, "priority");
         for (int i = 0; i < MSG_NUM * 2; i++) {
-            Message msg = sub.receive(1000);
+            Message msg = sub.receive(5000);
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
         }
         
     }
+
+    public void initCombosForTestDurableSubsReconnect() {
+        addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
+    }
+    
+    public void testDurableSubsReconnect() throws Exception {
+        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+
+        lowPri.start();
+        highPri.start();
+
+        lowPri.join();
+        highPri.join();
+
+
+        final int closeFrequency = MSG_NUM/4;
+        sub = sess.createDurableSubscriber(topic, subName);
+        for (int i = 0; i < MSG_NUM * 2; i++) {
+            Message msg = sub.receive(5000);
+            assertNotNull("Message " + i + " was null", msg);
+            assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
+            if (i>0 && i%closeFrequency==0) {
+                LOG.info("Closing durable sub.. on: " + i);
+                sub.close();
+                sub = sess.createDurableSubscriber(topic, subName);
+            }
+        }
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Fri Oct 15 12:29:39 2010
@@ -40,4 +40,9 @@ public class JDBCMessagePriorityTest ext
         return suite(JDBCMessagePriorityTest.class);
     }
 
+    // pending fix...
+    @Override
+    public void testDurableSubsReconnect() throws Exception {
+        // TODO: fix jdbc durable sub recovery 
+    }
 }



Mime
View raw message