activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [8/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6444
Date Wed, 28 Sep 2016 19:26:19 GMT
https://issues.apache.org/jira/browse/AMQ-6444

Ensure that unsettled TX messages remain acquired and not redelivered to
the receiver.   Add several tests that demonstrate that a received
message can be released, rejected, accepted or modified after a TX
rollback if it was not settled.
(cherry picked from commit 0dd806f43f3bee9372ee9b9481089d417c265dfe)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aa32a0f7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aa32a0f7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aa32a0f7

Branch: refs/heads/activemq-5.14.x
Commit: aa32a0f7925c4981aca9a4369b5e95f3336cde94
Parents: aebb365
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Sep 28 14:56:36 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Sep 28 15:08:49 2016 -0400

----------------------------------------------------------------------
 .../amqp/protocol/AmqpAbstractReceiver.java     |   5 +-
 .../transport/amqp/protocol/AmqpLink.java       |  11 +-
 .../transport/amqp/protocol/AmqpSender.java     |  64 +++----
 .../transport/amqp/protocol/AmqpSession.java    |  15 +-
 .../protocol/AmqpTransactionCoordinator.java    |  10 +-
 .../amqp/interop/AmqpTransactionTest.java       | 178 +++++++++++++++++--
 .../JMSMappingOutboundTransformerTest.java      |  10 +-
 7 files changed, 232 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
index 7ed2f92..9ed465a 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp.protocol;
 
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -78,11 +79,11 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver>
{
     }
 
     @Override
-    public void commit() throws Exception {
+    public void commit(LocalTransactionId txnId) throws Exception {
     }
 
     @Override
-    public void rollback() throws Exception {
+    public void rollback(LocalTransactionId txnId) throws Exception {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
index d245769..0c75f48 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpLink.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.protocol;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 
@@ -60,17 +61,23 @@ public interface AmqpLink extends AmqpResource {
      * Handle work necessary on commit of transacted resources associated with
      * this Link instance.
      *
+     * @param txnId
+     *      The Transaction ID being committed.
+     *
      * @throws Exception if an error occurs while performing the commit.
      */
-    void commit() throws Exception;
+    void commit(LocalTransactionId txnId) throws Exception;
 
     /**
      * Handle work necessary on rollback of transacted resources associated with
      * this Link instance.
      *
+     * @param txnId
+     *      The Transaction ID being rolled back.
+     *
      * @throws Exception if an error occurs while performing the rollback.
      */
-    void rollback() throws Exception;
+    void rollback(LocalTransactionId txnId) throws Exception;
 
     /**
      * @return the ActiveMQDestination that this link is servicing.

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 2531c1a..149b2e8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -78,7 +78,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer();
     private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
     private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
-    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
+    private final LinkedList<Delivery> dispatchedInTx = new LinkedList<Delivery>();
 
     private final ConsumerInfo consumerInfo;
     private AbstractSubscription subscription;
@@ -208,26 +208,26 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         } else if (endpointCredit >= 0) {
 
             if (endpointCredit == 0 && currentCreditRequest != 0) {
-
                 prefetchExtension.set(0);
                 currentCreditRequest = 0;
                 logicalDeliveryCount = 0;
                 LOG.trace("Flow: credit 0 for sub:" + subscription);
-
             } else {
-
                 int deltaToAdd = endpointCredit;
                 int logicalCredit = currentCreditRequest - logicalDeliveryCount;
                 if (logicalCredit > 0) {
                     deltaToAdd -= logicalCredit;
                 } else {
-                    // reset delivery counter - dispatch from broker concurrent with credit=0
flow can go negative
+                    // reset delivery counter - dispatch from broker concurrent with credit=0
+                    // flow can go negative
                     logicalDeliveryCount = 0;
                 }
+
                 if (deltaToAdd > 0) {
                     currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
                     subscription.wakeupDestinationsForDispatch();
-                    // force dispatch of matched/pending for topics (pending messages accumulate
in the sub and are dispatched on update of prefetch)
+                    // force dispatch of matched/pending for topics (pending messages accumulate
+                    // in the sub and are dispatched on update of prefetch)
                     subscription.setPrefetchSize(0);
                     LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
                 }
@@ -246,14 +246,20 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             if (txState.getOutcome() != null) {
                 Outcome outcome = txState.getOutcome();
                 if (outcome instanceof Accepted) {
+                    TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(),
toLong(txState.getTxnId()));
+
+                    // Store the message sent in this TX we might need to re-send on rollback
+                    // and we need to ACK it on commit.
+                    session.enlist(txId);
+                    dispatchedInTx.addFirst(delivery);
+
                     if (!delivery.remotelySettled()) {
                         TransactionalState txAccepted = new TransactionalState();
                         txAccepted.setOutcome(Accepted.getInstance());
-                        txAccepted.setTxnId(((TransactionalState) state).getTxnId());
+                        txAccepted.setTxnId(txState.getTxnId());
 
                         delivery.disposition(txAccepted);
                     }
-                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
                 }
             }
         } else {
@@ -294,12 +300,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     }
 
     @Override
-    public void commit() throws Exception {
+    public void commit(LocalTransactionId txnId) throws Exception {
         if (!dispatchedInTx.isEmpty()) {
-            for (MessageDispatch md : dispatchedInTx) {
-                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1);
-                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
-                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
+            for (final Delivery delivery : dispatchedInTx) {
+                MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
+
+                MessageAck pendingTxAck = new MessageAck(dispatch, MessageAck.INDIVIDUAL_ACK_TYPE,
1);
+                pendingTxAck.setFirstMessageId(dispatch.getMessage().getMessageId());
+                pendingTxAck.setTransactionId(txnId);
 
                 LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
 
@@ -310,6 +318,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                             Throwable exception = ((ExceptionResponse) response).getException();
                             exception.printStackTrace();
                             getEndpoint().close();
+                        } else {
+                            delivery.settle();
                         }
                         session.pumpProtonToSocket();
                     }
@@ -321,15 +331,22 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     }
 
     @Override
-    public void rollback() throws Exception {
+    public void rollback(LocalTransactionId txnId) throws Exception {
         synchronized (outbound) {
 
             LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
 
-            for (MessageDispatch dispatch : dispatchedInTx) {
-                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
+            for (Delivery delivery : dispatchedInTx) {
+                // Only settled deliveries should be re-dispatched, unsettled deliveries
+                // remain acquired on the remote end and can be accepted again in a new
+                // TX or released or rejected etc.
+                MessageDispatch dispatch = (MessageDispatch) delivery.getContext();
                 dispatch.getMessage().setTransactionId(null);
-                outbound.addFirst(dispatch);
+
+                if (delivery.remotelySettled()) {
+                    dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
+                    outbound.addFirst(dispatch);
+                }
             }
 
             dispatchedInTx.clear();
@@ -507,19 +524,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             ack.setMessageCount(1);
             ack.setAckType((byte) ackType);
             ack.setDestination(md.getDestination());
-
-            DeliveryState remoteState = delivery.getRemoteState();
-            if (remoteState != null && remoteState instanceof TransactionalState)
{
-                TransactionalState txState = (TransactionalState) remoteState;
-                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(),
toLong(txState.getTxnId()));
-                ack.setTransactionId(txId);
-
-                // Store the message sent in this TX we might need to re-send on rollback
-                session.enlist(txId);
-                md.getMessage().setTransactionId(txId);
-                dispatchedInTx.addFirst(md);
-            }
-
             LOG.trace("Sending Ack to ActiveMQ: {}", ack);
 
             sendToActiveMQ(ack, new ResponseHandler() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 4cb5f37..1c91962 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -35,6 +35,7 @@ import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveInfo;
@@ -123,11 +124,14 @@ public class AmqpSession implements AmqpResource {
     /**
      * Commits all pending work for all resources managed under this session.
      *
+     * @param txId
+     *      The specific TransactionId that is being committed.
+     *
      * @throws Exception if an error occurs while attempting to commit work.
      */
-    public void commit() throws Exception {
+    public void commit(LocalTransactionId txId) throws Exception {
         for (AmqpSender consumer : consumers.values()) {
-            consumer.commit();
+            consumer.commit(txId);
         }
 
         enlisted = false;
@@ -136,11 +140,14 @@ public class AmqpSession implements AmqpResource {
     /**
      * Rolls back any pending work being down under this session.
      *
+     * @param txId
+     *      The specific TransactionId that is being rolled back.
+     *
      * @throws Exception if an error occurs while attempting to roll back work.
      */
-    public void rollback() throws Exception {
+    public void rollback(LocalTransactionId txId) throws Exception {
         for (AmqpSender consumer : consumers.values()) {
-            consumer.rollback();
+            consumer.rollback(txId);
         }
 
         enlisted = false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
index 40bcda5..95cd5e3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -98,7 +98,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
             TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
             session.getConnection().registerTransaction(txId, this);
             sendToActiveMQ(txInfo, null);
-            LOG.trace("started transaction {}", txId.getValue());
+            LOG.trace("started transaction {}", txId);
 
             Declared declared = new Declared();
             declared.setTxnId(new Binary(toBytes(txId.getValue())));
@@ -110,18 +110,18 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver
{
             final byte operation;
 
             if (discharge.getFail()) {
-                LOG.trace("rollback transaction {}", txId.getValue());
+                LOG.trace("rollback transaction {}", txId);
                 operation = TransactionInfo.ROLLBACK;
             } else {
-                LOG.trace("commit transaction {}", txId.getValue());
+                LOG.trace("commit transaction {}", txId);
                 operation = TransactionInfo.COMMIT_ONE_PHASE;
             }
 
             for (AmqpSession txSession : txSessions) {
                 if (operation == TransactionInfo.ROLLBACK) {
-                    txSession.rollback();
+                    txSession.rollback(txId);
                 } else {
-                    txSession.commit();
+                    txSession.commit(txId);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 0815f8a..f61cbc3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -32,7 +32,11 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.junit.Ignore;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
 import org.junit.Test;
 
 /**
@@ -89,7 +93,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(1, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -114,7 +117,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(0, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -146,7 +148,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(0, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -194,7 +195,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-
     @Test(timeout = 60000)
     public void testReceiveMessageWithRollback() throws Exception {
         AmqpClient client = createAmqpClient();
@@ -223,7 +223,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(1, queue.getQueueSize());
 
-        sender.close();
         connection.close();
     }
 
@@ -421,6 +420,163 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         assertEquals(0, queue.getQueueSize());
     }
 
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeAccepted() throws Exception
{
+        doTestAcceptedButNotSettledInTXRemainsAquired(Accepted.getInstance());
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeReleased() throws Exception
{
+        doTestAcceptedButNotSettledInTXRemainsAquired(Released.getInstance());
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeRejected() throws Exception
{
+        doTestAcceptedButNotSettledInTXRemainsAquired(new Rejected());
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsFailed() throws
Exception {
+        Modified outcome = new Modified();
+        outcome.setDeliveryFailed(true);
+        doTestAcceptedButNotSettledInTXRemainsAquired(outcome);
+    }
+
+    @Test(timeout = 60000)
+    public void testAcceptedButNotSettledInTXRemainsAquiredCanBeModifiedAsUndeliverable()
throws Exception {
+        Modified outcome = new Modified();
+        outcome.setDeliveryFailed(true);
+        outcome.setUndeliverableHere(true);
+        doTestAcceptedButNotSettledInTXRemainsAquired(outcome);
+    }
+
+    private void doTestAcceptedButNotSettledInTXRemainsAquired(Outcome outcome) throws Exception
{
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        session.begin();
+
+        receiver.flow(10);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept(false);
+
+        session.rollback();
+
+        // Message should remain acquired an not be redelivered.
+        assertEquals(1, queue.getQueueSize());
+        assertNull(receiver.receive(2, TimeUnit.SECONDS));
+
+        if (outcome instanceof Released || outcome instanceof Rejected) {
+            // Receiver should be able to release the still acquired message and the
+            // broker should redispatch it to the client again.
+            received.release();
+            received = receiver.receive(3, TimeUnit.SECONDS);
+            assertNotNull(received);
+            received.accept();
+            received = receiver.receive(2, TimeUnit.SECONDS);
+            assertNull(received);
+            assertEquals(0, queue.getQueueSize());
+        } else if (outcome instanceof Accepted) {
+            // Receiver should be able to accept the still acquired message and the
+            // broker should then mark it as consumed.
+            received.accept();
+            received = receiver.receive(2, TimeUnit.SECONDS);
+            assertNull(received);
+            assertEquals(0, queue.getQueueSize());
+        } else if (outcome instanceof Modified) {
+            // Depending on the undeliverable here state the message will either be
+            // redelivered or DLQ'd
+            Modified modified = (Modified) outcome;
+            received.modified(Boolean.TRUE.equals(modified.getDeliveryFailed()), Boolean.TRUE.equals(modified.getUndeliverableHere()));
+            if (Boolean.TRUE.equals(modified.getUndeliverableHere())) {
+                received = receiver.receive(2, TimeUnit.SECONDS);
+                assertNull(received);
+                assertEquals(0, queue.getQueueSize());
+            } else {
+                received = receiver.receive(3, TimeUnit.SECONDS);
+                assertNotNull(received);
+                received.accept();
+                received = receiver.receive(2, TimeUnit.SECONDS);
+                assertNull(received);
+                assertEquals(0, queue.getQueueSize());
+            }
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testTransactionallyAcquiredMessageCanBeTransactionallyConsumed() throws Exception
{
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        session.begin();
+
+        receiver.flow(10);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept(false);
+
+        session.rollback();
+
+        // Message should remain acquired an not be redelivered.
+        assertEquals(1, queue.getQueueSize());
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        // Consume under TX but settle this time
+        session.begin();
+        received.accept(false);
+        session.rollback();
+
+        // Should still be acquired
+        assertEquals(1, queue.getQueueSize());
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        // Consume under TX and settle but rollback, message should be redelivered.
+        session.begin();
+        received.accept();
+        session.rollback();
+
+        assertEquals(1, queue.getQueueSize());
+        received = receiver.receive(1, TimeUnit.SECONDS);
+        assertNotNull(received);
+
+        // Consume under TX and commit it this time.
+        session.begin();
+        received.accept(false);
+        session.commit();
+
+        // Check that it is now consumed and no more message available
+        assertTrue(received.getWrappedDelivery().remotelySettled());
+        assertEquals(0, queue.getQueueSize());
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
+
     //----- Tests Ported from AmqpNetLite client -----------------------------//
 
     @Test(timeout = 60000)
@@ -621,9 +777,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this
broker.
-
-    @Ignore("Fails due to no support for TX enrollment without settlement.")
     @Test(timeout = 60000)
     public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement()
throws Exception {
         final int NUM_MESSAGES = 10;
@@ -701,7 +854,6 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    @Ignore("Fails due to no support for TX enrollment without settlement.")
     @Test(timeout = 60000)
     public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws
Exception {
         final int NUM_MESSAGES = 10;
@@ -756,12 +908,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         message2.release();
 
-        // Should be two message available for dispatch given that we sent and committed
one, and
+        // Should be ten message available for dispatch given that we sent and committed
one, and
         // releases another we had previously received.
-        receiver.flow(2);
+        receiver.flow(10);
         for (int i = 1; i <= NUM_MESSAGES; ++i) {
             AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
-            assertNotNull(message);
+            assertNotNull("Expected a message for: " + i, message);
             assertEquals(i, message.getApplicationProperty("msgId"));
             message.accept();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/aa32a0f7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index d0d31cc..ee69650 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -480,7 +480,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
         outbound.onSend();
         outbound.storeContent();
 
@@ -502,7 +502,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws
Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
@@ -525,7 +525,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception
{
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();
@@ -571,7 +571,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection()
throws Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
         outbound.onSend();
         outbound.storeContent();
@@ -594,7 +594,7 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws
Exception {
-        ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
+        ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         outbound.onSend();
         outbound.storeContent();


Mime
View raw message