Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5170D200B9B for ; Tue, 27 Sep 2016 18:46:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 50216160AB9; Tue, 27 Sep 2016 16:46:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 970DF160AEC for ; Tue, 27 Sep 2016 18:46:14 +0200 (CEST) Received: (qmail 24600 invoked by uid 500); 27 Sep 2016 16:46:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 24211 invoked by uid 99); 27 Sep 2016 16:46:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Sep 2016 16:46:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 536B2ED321; Tue, 27 Sep 2016 16:46:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Date: Tue, 27 Sep 2016 16:46:23 -0000 Message-Id: <98c5578eeacd4296908650fb8c4ed61c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/14] activemq git commit: NO-JIRA: Add some more variants of the .NET transaction tests archived-at: Tue, 27 Sep 2016 16:46:16 -0000 NO-JIRA: Add some more variants of the .NET transaction tests Adds ability to not settle accepted messages on the client to enable creation of tests that are equivalent to the AmqpNetLite client's transaction tests which hold settlement and expect the resource to handle it on successful discharge. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0bb76c7f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0bb76c7f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0bb76c7f Branch: refs/heads/activemq-5.14.x Commit: 0bb76c7fb42d49c50e69265e1c97c463f5fdbc58 Parents: 9211661 Author: Timothy Bish Authored: Mon Sep 19 17:36:58 2016 -0400 Committer: Timothy Bish Committed: Tue Sep 27 12:15:40 2016 -0400 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpMessage.java | 31 +++- .../transport/amqp/client/AmqpReceiver.java | 44 +++++- .../amqp/interop/AmqpTransactionTest.java | 153 +++++++++++++++++++ 3 files changed, 221 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 8b378e1..2b1b874 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -132,15 +132,28 @@ public class AmqpMessage { * @throws Exception if an error occurs during the accept. */ public void accept() throws Exception { + accept(true); + } + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param settle + * true if the client should also settle the delivery when sending the accept. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(boolean settle) throws Exception { if (receiver == null) { throw new IllegalStateException("Can't accept non-received message."); } - receiver.accept(delivery); + receiver.accept(delivery, settle); } /** - * Accepts the message marking it as consumed on the remote peer. + * Accepts the message marking it as consumed on the remote peer. This method + * will automatically settle the accepted delivery. * * @param session * The session that is used to manage acceptance of the message. @@ -148,11 +161,23 @@ public class AmqpMessage { * @throws Exception if an error occurs during the accept. */ public void accept(AmqpSession txnSession) throws Exception { + accept(txnSession, true); + } + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @param session + * The session that is used to manage acceptance of the message. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept(AmqpSession txnSession, boolean settle) throws Exception { if (receiver == null) { throw new IllegalStateException("Can't accept non-received message."); } - receiver.accept(delivery, txnSession); + receiver.accept(delivery, txnSession, settle); } /** http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index 999e033..3543ae3 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -414,20 +414,34 @@ public class AmqpReceiver extends AmqpAbstractResource { } /** - * Accepts a message that was dispatched under the given Delivery instance. + * Accepts a message that was dispatched under the given Delivery instance and settles the delivery. * * @param delivery * the Delivery instance to accept. * * @throws IOException if an error occurs while sending the accept. */ - public void accept(final Delivery delivery) throws IOException { - accept(delivery, this.session); + public void accept(Delivery delivery) throws IOException { + accept(delivery, this.session, true); } /** * Accepts a message that was dispatched under the given Delivery instance. * + * @param delivery + * the Delivery instance to accept. + * @param settle + * true if the receiver should settle the delivery or just send the disposition. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(Delivery delivery, boolean settle) throws IOException { + accept(delivery, this.session, settle); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance and settles the delivery. + * * This method allows for the session that is used in the accept to be specified by the * caller. This allows for an accepted message to be involved in a transaction that is * being managed by some other session other than the one that created this receiver. @@ -440,6 +454,26 @@ public class AmqpReceiver extends AmqpAbstractResource { * @throws IOException if an error occurs while sending the accept. */ public void accept(final Delivery delivery, final AmqpSession session) throws IOException { + accept(delivery, session, true); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * This method allows for the session that is used in the accept to be specified by the + * caller. This allows for an accepted message to be involved in a transaction that is + * being managed by some other session other than the one that created this receiver. + * + * @param delivery + * the Delivery instance to accept. + * @param session + * the session under which the message is being accepted. + * @param settle + * true if the receiver should settle the delivery or just send the disposition. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException { checkClosed(); if (delivery == null) { @@ -469,11 +503,13 @@ public class AmqpReceiver extends AmqpAbstractResource { txState.setOutcome(Accepted.getInstance()); txState.setTxnId(txnId); delivery.disposition(txState); - delivery.settle(); session.getTransactionContext().registerTxConsumer(AmqpReceiver.this); } } else { delivery.disposition(Accepted.getInstance()); + } + + if (settle) { delivery.settle(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java index 7cf6026..994a2e7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Ignore; import org.junit.Test; /** @@ -574,4 +575,156 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection.close(); } + + // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker. + + @Ignore("Fails due to no support for TX enrollment without settlement.") + @Test(timeout = 60000) + public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + ArrayList messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + messages.get(i).accept(txnSession, false); + } + txnSession.commit(); + + // Rollback the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession, false); + } + txnSession.rollback(); + + // After rollback message should still be acquired so we read last sent message. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // The final message should still be pending. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.accept(); + } + + // We should have now drained the Queue + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNull(message); + + connection.close(); + } + + @Ignore("Fails due to no support for TX enrollment without settlement.") + @Test(timeout = 60000) + public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception { + final int NUM_MESSAGES = 10; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(2); + AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); + + // Accept the first one in a TXN and send a new message in that TXN as well + txnSession.begin(); + { + message1.accept(txnSession, false); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES); + + sender.send(message, txnSession.getTransactionId()); + } + txnSession.commit(); + + // Accept the second one in a TXN and send a new message in that TXN as well but rollback + txnSession.begin(); + { + message2.accept(txnSession, false); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", NUM_MESSAGES + 1); + sender.send(message, txnSession.getTransactionId()); + } + txnSession.rollback(); + + message2.release(); + + // Should be two message available for dispatch given that we sent and committed one, and + // releases another we had previously received. + receiver.flow(2); + for (int i = 1; i <= NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(i, message.getApplicationProperty("msgId")); + message.accept(); + } + + // Should be nothing left. + receiver.flow(1); + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + + connection.close(); + } }