activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1026457 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Fri, 22 Oct 2010 19:31:19 GMT
Author: gtully
Date: Fri Oct 22 19:31:18 2010
New Revision: 1026457

URL: http://svn.apache.org/viewvc?rev=1026457&view=rev
Log:
Additional tests and refactoring of the fix for https://issues.apache.org/activemq/browse/AMQ-2985:
the selector scan was off by one, and unmatched acking skipped some messages, leaving them available
to subsequent consumers. Also, acking always left the first consumed message available to
a reconnected durable subscriber.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Oct 22 19:31:18 2010
@@ -1655,7 +1655,8 @@ public class Queue extends BaseDestinati
         if (LOG.isDebugEnabled()) {
             LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
                     + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
-                    + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
+                    + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount()
+                    + ", dequeueCount: " + destinationStatistics.getDequeues().getCount());
         }
 
         if (isLazyDispatch() && !force) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Fri Oct 22 19:31:18 2010
@@ -737,7 +737,7 @@ public class KahaDBStore extends Message
                                 selectorExpression = SelectorParser.parse(selector);
                             }
                             sd.orderIndex.resetCursorPosition();
-                            sd.orderIndex.setBatch(tx, (selectorExpression != null? 0 : cursorPos));
+                            sd.orderIndex.setBatch(tx, (selectorExpression == null ? cursorPos : -1));
                             for (Iterator<Entry<Long, MessageKeys>> iterator
= sd.orderIndex.iterator(tx); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
@@ -773,7 +773,7 @@ public class KahaDBStore extends Message
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : 0));
+                        sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos : -1));
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
@@ -800,7 +800,7 @@ public class KahaDBStore extends Message
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
                             long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : 0));
+                            sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos : -1));
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Oct 22 19:31:18 2010
@@ -1037,11 +1037,11 @@ public class MessageDatabase extends Ser
                 String subscriptionKey = command.getSubscriptionKey();
                 Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
 
-                // The following method handles deleting un-referenced messages.
-                removeAckLocation(tx, sd, subscriptionKey, prev);
-
                 // Add it to the new location set.
                 addAckLocation(sd, sequence, subscriptionKey);
+
+                // The following method handles deleting un-referenced messages.
+                removeAckLocation(tx, sd, subscriptionKey, sequence);
             }
 
         }
@@ -1152,16 +1152,17 @@ public class MessageDatabase extends Ser
             			break;
             		}
             	}
+                LOG.trace("gc candidates after first tx:" + firstTxLocation.getDataFileId()
+ ", " + gcCandidateSet);
             }
 
             // Go through all the destinations to see if any of them can remove GC candidates.
-            for (StoredDestination sd : storedDestinations.values()) {
+            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet())
{
             	if( gcCandidateSet.isEmpty() ) {
                 	break;
                 }
                 
                 // Use a visitor to cut down the number of pages that we load
-                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>()
{
                     int last=-1;
                     public boolean isInterestedInKeysBetween(Location first, Location second)
{
                     	if( first==null ) {
@@ -1199,10 +1200,11 @@ public class MessageDatabase extends Ser
                     }
     
                 });
+                LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
             }
 
             // check we are not deleting file with ack for in-use journal files
-            LOG.debug("gc candidates: " + gcCandidateSet);
+            LOG.trace("gc candidates: " + gcCandidateSet);
             final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
@@ -1219,7 +1221,7 @@ public class MessageDatabase extends Ser
                     if (gcCandidateSet.contains(candidate)) {
                         ackMessageFileMap.remove(candidate);
                     } else {
-                        LOG.debug("not removing data file: " + candidate
+                        LOG.trace("not removing data file: " + candidate
                                 + " as contained ack(s) refer to referenced file: " + referencedFileIds);
                     }
                 }
@@ -1504,24 +1506,16 @@ public class MessageDatabase extends Ser
             if (hs != null) {
                 hs.remove(subscriptionKey);
                 if (hs.isEmpty()) {
-                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
                     sd.ackPositions.remove(sequenceId);
 
-                    // Did we just empty out the first set in the
-                    // ordered list of ack locations? Then it's time to
-                    // delete some messages.
-                    if (hs == firstSet) {
-
-                        // Find all the entries that need to get deleted.
-                        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
-                        sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
-                        // Do the actual deletes.
-                        for (Entry<Long, MessageKeys> entry : deletes) {
-                            sd.locationIndex.remove(tx, entry.getValue().location);
-                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
-                            sd.orderIndex.remove(tx,entry.getKey());
-                        }
+                    ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long,
MessageKeys>>();
+                    sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+                    // Do the actual delete.
+                    for (Entry<Long, MessageKeys> entry : deletes) {
+                        sd.locationIndex.remove(tx, entry.getValue().location);
+                        sd.messageIdIndex.remove(tx, entry.getValue().messageId);
+                        sd.orderIndex.remove(tx, entry.getKey());
                     }
                 }
             }
@@ -2033,19 +2027,10 @@ public class MessageDatabase extends Ser
         
         void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>>
deletes,
                 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException
{
-            for (Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx);
iterator.hasNext();) {
-                Entry<Long, MessageKeys> entry = iterator.next();
-                if (entry.getKey().compareTo(sequenceId) == 0) {
-                    // We don't do the actually delete while we are
-                    // iterating the BTree since
-                    // iterating would fail.
-                    deletes.add(entry);
-                } else {
-                    // no point in iterating the in-order sequences anymore
-                    break;
-                }
-            }
-        } 
+
+            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
+            deletes.add(iterator.next());
+        }
         
         long getNextMessageId(int priority) {
             return nextMessageId++;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Fri Oct 22 19:31:18 2010
@@ -26,10 +26,13 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 
 import javax.jms.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import java.io.File;
 
 public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
 
+    private static final Log LOG = LogFactory.getLog(DurableSubscriptionOfflineTest.class);
     public Boolean usePrioritySupport = Boolean.TRUE;
     private BrokerService broker;
     private ActiveMQTopic topic;
@@ -65,6 +68,7 @@ public class DurableSubscriptionOfflineT
         broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
         broker.setBrokerName(getName(true));
         broker.setDeleteAllMessagesOnStartup(true);
+        broker.getManagementContext().setCreateConnector(false);
 
         if (usePrioritySupport) {
             PolicyEntry policy = new PolicyEntry();
@@ -132,6 +136,104 @@ public class DurableSubscriptionOfflineT
         assertEquals(sent, listener.count);
     }
 
+     public void testOfflineSubscription2() throws Exception {
+         // create durable subscription
+         Connection con = createConnection();
+         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         session.close();
+         con.close();
+
+         // send messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(null);
+
+         int sent = 0;
+         for (int i = 0; i < 10; i++) {
+             sent++;
+             Message message = session.createMessage();
+             message.setStringProperty("filter", "true");
+             producer.send(topic, message);
+         }
+
+         Thread.sleep(1 * 1000);
+
+         session.close();
+         con.close();
+
+         // consume messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         Listener listener = new Listener();
+         consumer.setMessageListener(listener);
+
+         Thread.sleep(3 * 1000);
+
+         session.close();
+         con.close();
+
+         assertEquals(sent, listener.count);
+     }
+
+     public void testOfflineSubscription3() throws Exception {
+         // create durable subscription
+         Connection con = createConnection();
+         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         session.close();
+         con.close();
+
+         // send messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(null);
+
+         final int numMessages = 10;
+         int sent = 0;
+         for (int i = 0; i < numMessages; i++) {
+             sent++;
+             Message message = session.createMessage();
+             message.setStringProperty("filter", "true");
+             producer.send(topic, message);
+         }
+
+         Thread.sleep(1 * 1000);
+
+         session.close();
+         con.close();
+
+         // consume messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         Listener listener = new Listener();
+         consumer.setMessageListener(listener);
+
+         Thread.sleep(3 * 1000);
+
+         session.close();
+         con.close();
+
+         LOG.info("Consumed: " + listener.count);
+         assertEquals(numMessages, listener.count);
+
+         // consume messages again, should not get any
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         listener = new Listener();
+         consumer.setMessageListener(listener);
+
+         Thread.sleep(3 * 1000);
+
+         session.close();
+         con.close();
+
+         assertEquals(0, listener.count);
+     }
+    
     public static class Listener implements MessageListener {
         int count = 0;
 



Mime
View raw message