activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1023704 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Mon, 18 Oct 2010 09:27:43 GMT
Author: gtully
Date: Mon Oct 18 09:27:43 2010
New Revision: 1023704

URL: http://svn.apache.org/viewvc?rev=1023704&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2985 - the use of selectors means replay
and recovery from the begining of the store. unmatched are removed on initial dispatch

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1023704&r1=1023703&r2=1023704&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Oct 18 09:27:43 2010
@@ -150,6 +150,7 @@ public class DurableTopicSubscription ex
     }
 
     public void deactivate(boolean keepDurableSubsActive) throws Exception {
+        LOG.debug("Dectivating " + this);
         active = false;
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (pending) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1023704&r1=1023703&r2=1023704&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Mon Oct 18 09:27:43 2010
@@ -727,8 +727,7 @@ public class KahaDBStore extends Message
                             // The subscription might not exist.
                             return 0;
                         }
-                        sd.orderIndex.resetCursorPosition();
-                        sd.orderIndex.setBatch(tx, cursorPos);
+
                         int counter = 0;
                         try {
                             String selector = info.getSelector();
@@ -736,6 +735,8 @@ public class KahaDBStore extends Message
                             if (selector != null) {
                                 selectorExpression = SelectorParser.parse(selector);
                             }
+                            sd.orderIndex.resetCursorPosition();
+                            sd.orderIndex.setBatch(tx, (selectorExpression != null? 0 : cursorPos));
                             for (Iterator<Entry<Long, MessageKeys>> iterator
= sd.orderIndex.iterator(tx); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
@@ -764,28 +765,31 @@ public class KahaDBStore extends Message
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener
listener)
                 throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            indexLock.readLock().lock();
+            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
+            indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
moc); iterator
+                        sd.orderIndex.setBatch(tx, (info.getSelector() == null ? cursorPos
: 0));
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
                             listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
+                        sd.orderIndex.resetCursorPosition();
                     }
                 });
             }finally {
-                indexLock.readLock().unlock();
+                indexLock.writeLock().unlock();
             }
         }
 
         public void recoverNextMessages(String clientId, String subscriptionName, final int
maxReturned,
                 final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
@@ -795,7 +799,7 @@ public class KahaDBStore extends Message
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
                             long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            sd.orderIndex.setBatch(tx, pos);
+                            sd.orderIndex.setBatch(tx, (info.getSelector() == null ? pos
: 0));
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1023704&r1=1023703&r2=1023704&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Mon Oct 18 09:27:43 2010
@@ -2035,7 +2035,7 @@ public class MessageDatabase extends Ser
                 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException
{
             for (Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx);
iterator.hasNext();) {
                 Entry<Long, MessageKeys> entry = iterator.next();
-                if (entry.getKey().compareTo(sequenceId) <= 0) {
+                if (entry.getKey().compareTo(sequenceId) == 0) {
                     // We don't do the actually delete while we are
                     // iterating the BTree since
                     // iterating would fail.

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1023704&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Mon Oct 18 09:27:43 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import javax.jms.*;
+import java.io.File;
+
+public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
+
+    private BrokerService broker;
+    private ActiveMQTopic topic;
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://" + getName());
+    }
+
+    @Override
+    protected Connection createConnection() throws Exception {
+        Connection con = super.createConnection();
+        con.setClientID("cliName");
+        con.start();
+        return con;
+    }
+
+    protected void setUp() throws Exception {
+        topic = (ActiveMQTopic) createDestination();
+        createBroker();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        destroyBroker();
+    }
+
+    private void createBroker() throws Exception {
+        broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+        broker.setBrokerName(getName());
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        broker.setPersistent(true);
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File("activemq-data-kaha/" + getName()));
+        broker.setPersistenceAdapter(persistenceAdapter);
+
+        broker.start();
+    }
+
+    private void destroyBroker() throws Exception {
+        if (broker != null)
+            broker.stop();
+    }
+
+    public void testOfflineSubscription() throws Exception {
+        // create durable subscription
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            boolean filter = i % 2 == 1;
+            if (filter)
+                sent++;
+
+            Message message = session.createMessage();
+            message.setStringProperty("filter", filter ? "true" : "false");
+            producer.send(topic, message);
+        }
+
+        session.close();
+        con.close();
+
+        // consume messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter
= 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals(sent, listener.count);
+    }
+
+    public static class Listener implements MessageListener {
+        int count = 0;
+
+        public void onMessage(Message message) {
+            count++;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message