activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [11/14] activemq git commit: NO-JIRA: Add some more variants of the .NET transaction tests
Date Tue, 27 Sep 2016 16:46:23 GMT
NO-JIRA: Add some more variants of the .NET transaction tests

Adds ability to not settle accepted messages on the client to enable
creation of tests that are equivalent to the AmqpNetLite client's
transaction tests which hold settlement and expect the resource to
handle it on successful discharge.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0bb76c7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0bb76c7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0bb76c7f

Branch: refs/heads/activemq-5.14.x
Commit: 0bb76c7fb42d49c50e69265e1c97c463f5fdbc58
Parents: 9211661
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Sep 19 17:36:58 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Sep 27 12:15:40 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  31 +++-
 .../transport/amqp/client/AmqpReceiver.java     |  44 +++++-
 .../amqp/interop/AmqpTransactionTest.java       | 153 +++++++++++++++++++
 3 files changed, 221 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 8b378e1..2b1b874 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -132,15 +132,28 @@ public class AmqpMessage {
      * @throws Exception if an error occurs during the accept.
      */
     public void accept() throws Exception {
+        accept(true);
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param settle
+     *      true if the client should also settle the delivery when sending the accept.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(boolean settle) throws Exception {
         if (receiver == null) {
             throw new IllegalStateException("Can't accept non-received message.");
         }
 
-        receiver.accept(delivery);
+        receiver.accept(delivery, settle);
     }
 
     /**
-     * Accepts the message marking it as consumed on the remote peer.
+     * Accepts the message marking it as consumed on the remote peer.  This method
+     * will automatically settle the accepted delivery.
      *
      * @param session
      *      The session that is used to manage acceptance of the message.
@@ -148,11 +161,23 @@ public class AmqpMessage {
      * @throws Exception if an error occurs during the accept.
      */
     public void accept(AmqpSession txnSession) throws Exception {
+        accept(txnSession, true);
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param session
+     *      The session that is used to manage acceptance of the message.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(AmqpSession txnSession, boolean settle) throws Exception {
         if (receiver == null) {
             throw new IllegalStateException("Can't accept non-received message.");
         }
 
-        receiver.accept(delivery, txnSession);
+        receiver.accept(delivery, txnSession, settle);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 999e033..3543ae3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -414,20 +414,34 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver>
{
     }
 
     /**
-     * Accepts a message that was dispatched under the given Delivery instance.
+     * Accepts a message that was dispatched under the given Delivery instance and settles
the delivery.
      *
      * @param delivery
      *        the Delivery instance to accept.
      *
      * @throws IOException if an error occurs while sending the accept.
      */
-    public void accept(final Delivery delivery) throws IOException {
-        accept(delivery, this.session);
+    public void accept(Delivery delivery) throws IOException {
+        accept(delivery, this.session, true);
     }
 
     /**
      * Accepts a message that was dispatched under the given Delivery instance.
      *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param settle
+     *        true if the receiver should settle the delivery or just send the disposition.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(Delivery delivery, boolean settle) throws IOException {
+        accept(delivery, this.session, settle);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance and settles
the delivery.
+     *
      * This method allows for the session that is used in the accept to be specified by the
      * caller.  This allows for an accepted message to be involved in a transaction that
is
      * being managed by some other session other than the one that created this receiver.
@@ -440,6 +454,26 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver>
{
      * @throws IOException if an error occurs while sending the accept.
      */
     public void accept(final Delivery delivery, final AmqpSession session) throws IOException
{
+        accept(delivery, session, true);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.
+     *
+     * This method allows for the session that is used in the accept to be specified by the
+     * caller.  This allows for an accepted message to be involved in a transaction that
is
+     * being managed by some other session other than the one that created this receiver.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param session
+     *        the session under which the message is being accepted.
+     * @param settle
+     *        true if the receiver should settle the delivery or just send the disposition.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(final Delivery delivery, final AmqpSession session, final boolean
settle) throws IOException {
         checkClosed();
 
         if (delivery == null) {
@@ -469,11 +503,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver>
{
                                 txState.setOutcome(Accepted.getInstance());
                                 txState.setTxnId(txnId);
                                 delivery.disposition(txState);
-                                delivery.settle();
                                 session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
                             }
                         } else {
                             delivery.disposition(Accepted.getInstance());
+                        }
+
+                        if (settle) {
                             delivery.settle();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 7cf6026..994a2e7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -574,4 +575,156 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         connection.close();
     }
+
+    // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this
broker.
+
+    @Ignore("Fails due to no support for TX enrollment without settlement.")
+    @Test(timeout = 60000)
+    public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement()
throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+        receiver.flow((NUM_MESSAGES + 2) * 2);
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+        }
+
+        // Commit half the consumed messages
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession, false);
+        }
+        txnSession.commit();
+
+        // Rollback the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession, false);
+        }
+        txnSession.rollback();
+
+        // After rollback message should still be acquired so we read last sent message.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        // Commit the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.commit();
+
+        // The final message should still be pending.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // We should have now drained the Queue
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        receiver.flow(1);
+        assertNull(message);
+
+        connection.close();
+    }
+
+    @Ignore("Fails due to no support for TX enrollment without settlement.")
+    @Test(timeout = 60000)
+    public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws
Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2);
+        AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+        // Accept the first one in a TXN and send a new message in that TXN as well
+        txnSession.begin();
+        {
+            message1.accept(txnSession, false);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Accept the second one in a TXN and send a new message in that TXN as well but
rollback
+        txnSession.begin();
+        {
+            message2.accept(txnSession, false);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        message2.release();
+
+        // Should be two message available for dispatch given that we sent and committed
one, and
+        // releases another we had previously received.
+        receiver.flow(2);
+        for (int i = 1; i <= NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(i, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // Should be nothing left.
+        receiver.flow(1);
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
 }


Mime
View raw message