Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E40B3200B5E for ; Tue, 26 Jul 2016 20:30:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E2A9F160A69; Tue, 26 Jul 2016 18:30:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 19AA7160A7D for ; Tue, 26 Jul 2016 20:30:21 +0200 (CEST) Received: (qmail 93967 invoked by uid 500); 26 Jul 2016 18:30:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 93900 invoked by uid 99); 26 Jul 2016 18:30:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jul 2016 18:30:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 001DAE0A7D; Tue, 26 Jul 2016 18:30:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 26 Jul 2016 18:30:21 -0000 Message-Id: <8348e7d6e8a44e4392da416f4e064f46@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] activemq-artemis git commit: ARTEMIS-655 - [AMQP] On transacted session.commit() of receiver client, messages are read, but queue is not cleared out archived-at: Tue, 26 Jul 2016 18:30:23 -0000 ARTEMIS-655 - [AMQP] On transacted session.commit() of receiver client, messages are read, but queue is not cleared out Making sure that when a transaction state of accepted is returned we actually ack the message https://issues.apache.org/jira/browse/ARTEMIS-655 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/85ede22c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/85ede22c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/85ede22c Branch: refs/heads/master Commit: 85ede22c3cbb5f258d46fe5063c76a20d3b045c1 Parents: 7311c56 Author: Andy Taylor Authored: Tue Jul 26 12:46:11 2016 +0100 Committer: Clebert Suconic Committed: Tue Jul 26 14:30:11 2016 -0400 ---------------------------------------------------------------------- .../plug/ProtonSessionIntegrationCallback.java | 2 +- .../server/ProtonServerSenderContext.java | 28 +++++- .../tests/integration/proton/ProtonTest.java | 92 ++++++++++++++++++++ 3 files changed, 120 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/85ede22c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index b2d029f..b00474d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -311,7 +311,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se public void ack(Object brokerConsumer, Object message) throws Exception { recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID()); + ((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID()); } finally { resetContext(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/85ede22c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index 5fd24d9..40a4548 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -24,11 +24,13 @@ import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -294,7 +296,31 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null) { - if (remoteState instanceof Accepted) { + // If we are transactional then we need ack if the msg has been accepted + if (remoteState instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) remoteState; + if (txState.getOutcome() != null) { + Outcome outcome = txState.getOutcome(); + if (outcome instanceof Accepted) { + if (!delivery.remotelySettled()) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(txState.getTxnId()); + + delivery.disposition(txAccepted); + } + //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order + // from dealer, a perf hit but a must + try { + sessionSPI.ack(brokerConsumer, message); + } + catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + } + } + } + else if (remoteState instanceof Accepted) { //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // from dealer, a perf hit but a must try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/85ede22c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 2c68dde..98f0e0f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -230,6 +230,98 @@ public class ProtonTest extends ActiveMQTestBase { } @Test + public void testCommitProducer() throws Throwable { + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.commit(); + session.close(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 10); + } + + @Test + public void testRollbackProducer() throws Throwable { + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.rollback(); + session.close(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 0); + } + + @Test + public void testCommitConsumer() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) cons.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("Message:" + i, message.getText()); + } + session.commit(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 0); + } + + @Test + public void testRollbackConsumer() throws Throwable { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = createQueue(address); + System.out.println("queue:" + queue.getQueueName()); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message); + } + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) cons.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("Message:" + i, message.getText()); + } + session.rollback(); + Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); + Assert.assertEquals(q.getMessageCount(), 10); + } + + @Test public void testResourceLimitExceptionOnAddressFull() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol setAddressFullBlockPolicy();