activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1084550 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/kahadb/ main/java/org/apache/activemq/store/memory/ test/java/org/apache/activemq/usecases/
Date Wed, 23 Mar 2011 11:50:51 GMT
Author: gtully
Date: Wed Mar 23 11:50:50 2011
New Revision: 1084550

URL: http://svn.apache.org/viewvc?rev=1084550&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3238 - Topic-Messages not redelivered to durable
subscription after rollback and reconnect. resolve by making durable sub ack transaction aware,
issue existed across all stores

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
Wed Mar 23 11:50:50 2011
@@ -20,10 +20,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.BaseCommand;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +45,16 @@ class KahaTransaction {
         list.add(tx);
     }
 
+    public void add(KahaMessageStore destination, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) {
+        TxCommand tx = new TxCommand();
+        tx.setCommand(ack);
+        tx.setMessageStoreKey(destination.getId());
+        tx.setClientId(clientId);
+        tx.setSubName(subscriptionName);
+        tx.setMessageId(messageId);
+        list.add(tx);
+    }
+
     Message[] getMessages() {
         List<BaseCommand> result = new ArrayList<BaseCommand>();
         for (int i = 0; i < list.size(); i++) {
@@ -89,6 +102,9 @@ class KahaTransaction {
             MessageStore ms = transactionStore.getStoreById(command.getMessageStoreKey());
             if (command.isRemove()) {
                 ms.removeMessage(null, (MessageAck)command.getCommand());
+            } else if (command.isAck()) {
+                ((TopicMessageStore)ms).acknowledge(null, command.getClientId(), command.getSubscriptionName(),
+                        command.getMessageId(), (MessageAck)command.getCommand());
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
Wed Mar 23 11:50:50 2011
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.kaha.RuntimeStoreException;
@@ -84,6 +85,12 @@ public class KahaTransactionStore implem
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws
IOException {
                 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
             }
+
+            @Override
+            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                            MessageId messageId, MessageAck ack) throws IOException {
+                KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
subscriptionName, messageId, ack);
+            }
         };
     }
 
@@ -98,10 +105,6 @@ public class KahaTransactionStore implem
         }
     }
 
-    /**
-     * @throws XAException
-     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
-     */
     public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable
after) throws IOException {
         if(before != null) {
             before.run();
@@ -182,6 +185,23 @@ public class KahaTransactionStore implem
     	}
     }
 
+    final void acknowledge(final TopicMessageStore destination, String clientId,
+                           String subscriptionName, MessageId messageId, MessageAck ack)
throws IOException {
+        try {
+            if (ack.isInTransaction()) {
+                KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+                tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId,
ack);
+            } else {
+                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
+            }
+        } catch (RuntimeStoreException rse) {
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
+        }
+    }
+
     protected synchronized KahaTransaction getTx(TransactionId key) {
         KahaTransaction result = (KahaTransaction)transactions.get(key);
         if (result == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
Wed Mar 23 11:50:50 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahada
 
 import org.apache.activemq.command.BaseCommand;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.MessageId;
 
 /**
  * Base class for messages/acknowledgements for a transaction
@@ -27,6 +28,9 @@ import org.apache.activemq.command.Comma
 class TxCommand {
     protected Object messageStoreKey;
     protected BaseCommand command;
+    private String clientId;
+    private String subscriptionName;
+    private MessageId messageId;
 
     /**
      * @return Returns the messageStoreKey.
@@ -67,7 +71,34 @@ class TxCommand {
      * @return true if a MessageAck command
      */
     public boolean isRemove() {
-        return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
+        return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK
&& subscriptionName == null;
     }
 
+    public boolean isAck() {
+        return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK
&& subscriptionName != null;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public void setSubName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public MessageId getMessageId() {
+        return messageId;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Wed Mar 23 11:50:50 2011
@@ -642,6 +642,7 @@ public class KahaDBStore extends Message
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toString());
+            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
             if (ack != null && ack.isUnmatchedAck()) {
                 command.setAck(UNMATCHED);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Wed Mar 23 11:50:50 2011
@@ -30,6 +30,7 @@ import javax.transaction.xa.XAException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
@@ -194,6 +195,14 @@ public class KahaDBTransactionStore impl
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws
IOException {
                 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
             }
+
+            @Override
+            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                            MessageId messageId, MessageAck ack) throws IOException {
+                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(),
clientId,
+                        subscriptionName, messageId, ack);
+            }
+
         };
     }
 
@@ -216,9 +225,6 @@ public class KahaDBTransactionStore impl
         return tx;
     }
 
-    /**
-     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
-     */
     public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable
postCommit)
             throws IOException {
         if (txid != null) {
@@ -458,6 +464,31 @@ public class KahaDBTransactionStore impl
         }
     }
 
+    final void acknowledge(ConnectionContext context, final TopicMessageStore destination,
final String clientId, final String subscriptionName,
+                           final MessageId messageId, final MessageAck ack) throws IOException
{
+
+        if (ack.isInTransaction()) {
+            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==
false) {
+                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
+            } else {
+                Tx tx = getTx(ack.getTransactionId());
+                tx.add(new RemoveMessageCommand(context) {
+                    public MessageAck getMessageAck() {
+                        return ack;
+                    }
+
+                    public Future<Object> run(ConnectionContext ctx) throws IOException
{
+                        destination.acknowledge(ctx, clientId, subscriptionName, messageId,
ack);
+                        return AbstractMessageStore.FUTURE;
+                    }
+                });
+            }
+        } else {
+            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
+        }
+    }
+
+
     private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
         return theStore.createTransactionInfo(txid);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Wed Mar 23 11:50:50 2011
@@ -25,6 +25,7 @@ import javax.transaction.xa.XAException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.AbstractMessageStore;
@@ -173,6 +174,13 @@ public class MemoryTransactionStore impl
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws
IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
             }
+
+            @Override
+            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                            MessageId messageId, MessageAck ack) throws IOException {
+                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(),
clientId,
+                        subscriptionName, messageId, ack);
+            }
         };
     }
 
@@ -196,10 +204,6 @@ public class MemoryTransactionStore impl
         return tx;
     }
 
-    /**
-     * @throws XAException
-     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
-     */
     public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable
postCommit) throws IOException {
         if (preCommit != null) {
             preCommit.run();
@@ -307,6 +311,29 @@ public class MemoryTransactionStore impl
         }
     }
 
+    final void acknowledge(final TopicMessageStore destination, final String clientId, final
String subscriptionName,
+                           final MessageId messageId, final MessageAck ack) throws IOException
{
+        if (doingRecover) {
+            return;
+        }
+
+        if (ack.isInTransaction()) {
+            Tx tx = getTx(ack.getTransactionId());
+            tx.add(new RemoveMessageCommand() {
+                public MessageAck getMessageAck() {
+                    return ack;
+                }
+
+                public void run(ConnectionContext ctx) throws IOException {
+                    destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
+                }
+            });
+        } else {
+            destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
+        }
+    }
+
+
     public void delete() {
         inflightTransactions.clear();
         preparedTransactions.clear();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java?rev=1084550&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java
Wed Mar 23 11:50:50 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+
+public class AMQStoreDurableSubscriptionTest extends DurableSubscriptionTestSupport {
+
+    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
+        File dataDir = new File("target/test-data/durableAmq");
+        AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
+        adapter.setDirectory(dataDir);
+        return adapter;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStoreDurableSubscriptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java?rev=1084550&r1=1084549&r2=1084550&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java
Wed Mar 23 11:50:50 2011
@@ -280,6 +280,38 @@ public abstract class DurableSubscriptio
         assertNull(consumer.receive(5000));
     }
 
+    public void testDurableSubscriptionRollbackRedeliver() throws Exception {
+
+        // Create the durable sub.
+        connection.start();
+
+        session = connection.createSession(true, javax.jms.Session.SESSION_TRANSACTED);
+        Topic topic = session.createTopic("TestTopic");
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        Session producerSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+        producer = producerSession.createProducer(topic);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        producer.send(session.createTextMessage("Msg:1"));
+
+        // receive and rollback
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+        session.rollback();
+        consumer.close();
+        session.close();
+
+        session = connection.createSession(true, javax.jms.Session.SESSION_TRANSACTED);
+
+        // Ensure that consumer will receive messages sent and rolled back
+        consumer = session.createDurableSubscriber(topic, "sub1");
+
+        assertTextMessageEquals("Msg:1", consumer.receive(5000));
+        session.commit();
+
+        assertNull(consumer.receive(5000));
+    }
+
     public void xtestInactiveDurableSubscriptionOneConnection() throws Exception {
         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic("TestTopic");

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java?rev=1084550&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java
Wed Mar 23 11:50:50 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.IOException;
+import org.apache.activemq.store.PersistenceAdapter;
+
+public class KahaDBDurableSubscriptionTest extends DurableSubscriptionTestSupport {
+
+    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
+        return null; // use default
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/KahaDBDurableSubscriptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message