activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: [AMQ-6906] jdbc store; ensure rolled back prepared ack messages are recovered asap
Date Wed, 28 Feb 2018 13:31:33 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 703b7b151 -> efaf9cd77


[AMQ-6906] jdbc store; ensure rolled back prepared ack messages are recovered asap


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/efaf9cd7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/efaf9cd7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/efaf9cd7

Branch: refs/heads/master
Commit: efaf9cd77e35e1f811708a7a38bab348b003eeef
Parents: 703b7b1
Author: gtully <gary.tully@gmail.com>
Authored: Wed Feb 28 13:31:12 2018 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Feb 28 13:31:12 2018 +0000

----------------------------------------------------------------------
 .../region/cursors/AbstractStoreCursor.java     |   9 +
 .../activemq/store/jdbc/JDBCMessageStore.java   |  43 +++++
 .../store/jdbc/JDBCPersistenceAdapter.java      |   1 +
 .../store/jdbc/JdbcMemoryTransactionStore.java  |  34 +++-
 .../activemq/broker/BrokerTestSupport.java      |   2 +-
 .../broker/JdbcXARecoveryBrokerNoCacheTest.java |  46 ++++++
 .../activemq/broker/XARecoveryBrokerTest.java   | 164 ++++++++++++++++++-
 7 files changed, 292 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index aef7528..44b0b27 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -80,6 +80,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
     @Override
     public void rebase() {
         resetSize();
+        MessageId lastAdded = lastCachedIds[SYNC_ADD];
+        if (lastAdded != null) {
+            try {
+                setBatch(lastAdded);
+            } catch (Exception e) {
+                LOG.error("{} - Failed to set batch on rebase", this, e);
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 29db04c..75c0751 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -35,8 +35,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
 
 /**
  *
@@ -68,6 +71,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected ActiveMQMessageAudit audit;
     protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
+    protected final TreeMap<Long, Message> rolledBackAcks = new TreeMap<Long, Message>();
     final long[] perPriorityLastRecovered = new long[10];
 
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws
IOException {
@@ -355,6 +359,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
             if (LOG.isTraceEnabled()) {
                 LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered)
+ ", minPending:" + minPendingSequeunceId());
             }
+
+            maxReturned -= recoverRolledBackAcks(maxReturned, listener);
+
             adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener()
{
 
@@ -363,6 +370,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         msg.getMessageId().setFutureOrSequenceLong(sequenceId);
+                        msg.getMessageId().setEntryLocator(sequenceId);
                         listener.recoverMessage(msg);
                         trackLastRecovered(sequenceId, msg.getPriority());
                         return true;
@@ -386,6 +394,41 @@ public class JDBCMessageStore extends AbstractMessageStore {
 
     }
 
+    public void trackRollbackAck(Message message) {
+        synchronized (rolledBackAcks) {
+            rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
+        }
+    }
+
+    private int recoverRolledBackAcks(int max, MessageRecoveryListener listener) throws Exception
{
+        int recovered = 0;
+        ArrayList<Long> toRemove = new ArrayList<Long>();
+        synchronized (rolledBackAcks) {
+            if (!rolledBackAcks.isEmpty()) {
+                for ( Map.Entry<Long,Message> candidate : rolledBackAcks.entrySet())
{
+                    if (candidate.getKey() <= lastRecovered(candidate.getValue().getPriority()))
{
+                        listener.recoverMessage(candidate.getValue());
+                        recovered++;
+                        toRemove.add(candidate.getKey());
+                        if (recovered == max) {
+                            break;
+                        }
+                    } else {
+                        toRemove.add(candidate.getKey());
+                    }
+                }
+                for (Long key : toRemove) {
+                    rolledBackAcks.remove(key);
+                }
+            }
+        }
+        return recovered;
+    }
+
+    private long lastRecovered(int priority) {
+        return perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0];
+    }
+
     private void trackLastRecovered(long sequenceId, int priority) {
         perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index a6ad870..20430b3 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -781,6 +781,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         try {
             long sequence = (Long)messageId.getEntryLocator();
             getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
+            messageId.setEntryLocator(preparedSequenceId);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason:
" + e, e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 7f42c7f..40bee61 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -20,7 +20,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -34,6 +40,8 @@ import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayInputStream;
 
@@ -179,6 +187,10 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
                             MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
                             // need to unset the txid flag on the existing row
                             ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx,
messageId, (Long)messageId.getEntryLocator());
+
+                            if (removeMessageCommand instanceof RecoveredRemoveMessageCommand)
{
+                                ((JDBCMessageStore) removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand)
removeMessageCommand).getMessage());
+                            }
                         }
                     }
                 } catch (IOException e) {
@@ -205,12 +217,13 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
     }
 
     public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
-        Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new
ByteSequence(message));
+        final Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new
ByteSequence(message));
         msg.getMessageId().setFutureOrSequenceLong(id);
         msg.getMessageId().setEntryLocator(id);
         Tx tx = getPreparedTx(new XATransactionId(xid));
         final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
-        tx.add(new RemoveMessageCommand() {
+        tx.add(new RecoveredRemoveMessageCommand() {
+            MessageStore messageStore = null;
             @Override
             public MessageAck getMessageAck() {
                 return ack;
@@ -221,13 +234,27 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
                 ((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack);
             }
 
+            public Message getMessage() {
+                return msg;
+            }
+
+            @Override
+            public void setMessageStore(MessageStore messageStore) {
+                this.messageStore = messageStore;
+            }
+
             @Override
             public MessageStore getMessageStore() {
-                return null;
+                return messageStore;
             }
 
         });
+    }
+
+    interface RecoveredRemoveMessageCommand extends RemoveMessageCommand {
+        Message getMessage();
 
+        void setMessageStore(MessageStore messageStore);
     }
 
     interface LastAckCommand extends RemoveMessageCommand {
@@ -328,6 +355,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore
{
                 // but the sql is non portable to match BLOB with LIKE etc
                 // so we make up for it when we recover the ack
                 ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
+                ((RecoveredRemoveMessageCommand)removeMessageCommand).setMessageStore(queueStores.get(removeMessageCommand.getMessageAck().getDestination()));
             }
         }
         for (AddMessageCommand addMessageCommand : tx.messages) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
index 4d2250f..d84ff27 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
@@ -340,7 +340,7 @@ public class BrokerTestSupport extends CombinationTestSupport {
                 return;
             }
             if (o instanceof MessageDispatch && ((MessageDispatch)o).getMessage()
!= null) {
-                fail("Received a message: "+((MessageDispatch)o).getMessage().getMessageId());
+                fail("Received a message: "+((MessageDispatch)o).getMessage().getMessageId()
+ " for: " + ((MessageDispatch)o).getMessage().getDestination().getPhysicalName());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerNoCacheTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerNoCacheTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerNoCacheTest.java
new file mode 100644
index 0000000..ff953ff
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerNoCacheTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class JdbcXARecoveryBrokerNoCacheTest extends JdbcXARecoveryBrokerTest {
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policyEntry = super.getDefaultPolicy();
+        policyEntry.setUseCache(false);
+        policyEntry.setMaxPageSize(5);
+        return policyEntry;
+    }
+
+    public static Test suite() {
+        return suite(JdbcXARecoveryBrokerNoCacheTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    @Override
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue("testNoCache");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/efaf9cd7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 9660ef0..f718803 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -859,6 +859,20 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         message = receiveMessage(connection);
         assertNull(message);
         assertNoMessagesLeft(connection);
+        connection.request(consumerInfo.createRemoveCommand());
+
+        LOG.info("Send some more before the rollback");
+        // send some more messages
+        producerInfo = createProducerInfo(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < numMessages*2; i++) {
+            message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        LOG.info("Send some more before the rollback");
 
         // rollback so we get redelivery
         connection.request(createRollbackTransaction(connectionInfo, txid));
@@ -867,26 +881,69 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
 
+        Set<ConsumerInfo> consumerInfoSet = new HashSet<ConsumerInfo>();
         for (ActiveMQDestination dest : destinationList(destination)) {
             // Setup the consumer and receive the message.
             consumerInfo = createConsumerInfo(sessionInfo, dest);
             connection.send(consumerInfo);
+            consumerInfoSet.add(consumerInfo);
+            LOG.info("consume messages for: " + dest.getPhysicalName() + " " + consumerInfo.getConsumerId());
 
             for (int i = 0; i < numMessages; i++) {
                 message = receiveMessage(connection);
                 assertNotNull("unexpected null on:" + i, message);
+                LOG.info(dest.getPhysicalName()  + " ID: " + message.getMessageId());
             }
             MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
             ack.setTransactionId(txid);
-            connection.send(ack);
+            connection.request(ack);
+
+            // clear any pending messages on the stub connection via prefetch
+            while ((message = receiveMessage(connection)) != null) {
+                LOG.info("Pre fetched and unwanted: " + message.getMessageId() + " on " +
message.getDestination().getPhysicalName());
+            }
         }
 
+        LOG.info("commit..");
         // Commit
         connection.request(createCommitTransaction1Phase(connectionInfo, txid));
 
+        // remove consumers 'after' commit b/c of inflight tally issue
+        for (ConsumerInfo info : consumerInfoSet) {
+            connection.request(info.createRemoveCommand());
+        }
+        consumerInfoSet.clear();
+
         // validate recovery complete
         dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+
+        LOG.info("consume additional messages");
+
+        // clear any pending messages on the stub connection via prefetch
+        while ((message = receiveMessage(connection)) != null) {
+            LOG.info("Pre fetched and unwanted: " + message.getMessageId() + " on " + message.getDestination().getPhysicalName());
+        }
+        // consume the additional messages
+        for (ActiveMQDestination dest : destinationList(destination)) {
+
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.request(consumerInfo);
+
+            LOG.info("consume additional messages for: " + dest.getPhysicalName() + " " +
consumerInfo.getConsumerId());
+
+            for (int i = 0; i < numMessages*2; i++) {
+                message = receiveMessage(connection);
+                assertNotNull("unexpected null on:" + i, message);
+                LOG.info(dest.getPhysicalName()  + " ID: " + message.getMessageId());
+                MessageAck ack = createAck(consumerInfo, message, 1, MessageAck.STANDARD_ACK_TYPE);
+                connection.request(ack);
+            }
+            connection.request(consumerInfo.createRemoveCommand());
+        }
+
+        assertNoMessagesLeft(connection);
     }
 
     public void testQueuePersistentPreparedAcksAvailableAfterRollbackPrefetchOne() throws
Exception {
@@ -909,8 +966,6 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
             connection.send(message);
         }
 
-        final int messageCount = expectedMessageCount(numMessages, destination);
-
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
@@ -993,6 +1048,109 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport
{
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
     }
 
+
+    public void testQueuePersistentPreparedAcksAvailableAfterRollback() throws Exception
{
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        int numMessages = 4;
+        for (int i = 0; i < numMessages; i++) {
+            Message message = createMessage(producerInfo, destination);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        // use consumer per destination for the composite dest case
+        // bc the same composite dest is used for sending so there
+        // will be duplicate message ids in the mix which a single
+        // consumer (PrefetchSubscription) cannot handle in a tx
+        // atm. The matching is based on messageId rather than messageId
+        // and destination
+        Set<ConsumerInfo> consumerInfos = new HashSet<ConsumerInfo>();
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
+            consumerInfos.add(consumerInfo);
+        }
+
+        for (ConsumerInfo info : consumerInfos) {
+            connection.send(info);
+        }
+
+        Message message = null;
+        for (ConsumerInfo info : consumerInfos) {
+            for (int i = 0; i < numMessages; i++) {
+                message = receiveMessage(connection);
+                assertNotNull(message);
+                connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE));
+            }
+            MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // reconnect
+        connection.send(connectionInfo.createRemoveCommand());
+        connection = createConnection();
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(),
null, TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        connection.send(sessionInfo);
+
+        LOG.info("add consumers..");
+        for (ConsumerInfo info : consumerInfos) {
+            connection.send(info);
+        }
+
+        // no redelivery, exactly once semantics while prepared
+        message = receiveMessage(connection);
+        assertNull(message);
+        assertNoMessagesLeft(connection);
+
+        // rollback so we get redelivery
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new tx for redelivery");
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (ConsumerInfo info : consumerInfos) {
+            for (int i = 0; i < numMessages; i++) {
+                message = receiveMessage(connection);
+                assertNotNull("unexpected null on:" + i, message);
+                LOG.info("REC " + message.getMessageId());
+                MessageAck ack = createAck(info, message, 1, MessageAck.STANDARD_ACK_TYPE);
+                ack.setTransactionId(txid);
+                connection.send(ack);
+            }
+        }
+
+        // Commit
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+    }
+
     public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback()
{
         addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
     }


Mime
View raw message