activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-655 - [AMQP] On transacted session.commit() of receiver client, messages are read, but queue is not cleared out
Date Tue, 26 Jul 2016 18:30:21 GMT
ARTEMIS-655 - [AMQP] On transacted session.commit() of receiver client, messages are read, but queue is not cleared out

Making sure that when a transaction state of accepted is returned we actually ack the message

https://issues.apache.org/jira/browse/ARTEMIS-655


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/85ede22c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/85ede22c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/85ede22c

Branch: refs/heads/master
Commit: 85ede22c3cbb5f258d46fe5063c76a20d3b045c1
Parents: 7311c56
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Tue Jul 26 12:46:11 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jul 26 14:30:11 2016 -0400

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |  2 +-
 .../server/ProtonServerSenderContext.java       | 28 +++++-
 .../tests/integration/proton/ProtonTest.java    | 92 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/85ede22c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index b2d029f..b00474d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -311,7 +311,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
    public void ack(Object brokerConsumer, Object message) throws Exception {
       recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualAcknowledge(null, ((ServerMessage) message).getMessageID());
+         ((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID());
       }
       finally {
          resetContext();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/85ede22c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 5fd24d9..40a4548 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -24,11 +24,13 @@ import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -294,7 +296,31 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
       DeliveryState remoteState = delivery.getRemoteState();
 
       if (remoteState != null) {
-         if (remoteState instanceof Accepted) {
+         // If we are transactional then we need ack if the msg has been accepted
+         if (remoteState instanceof TransactionalState) {
+            TransactionalState txState = (TransactionalState) remoteState;
+            if (txState.getOutcome() != null) {
+               Outcome outcome = txState.getOutcome();
+               if (outcome instanceof Accepted) {
+                  if (!delivery.remotelySettled()) {
+                     TransactionalState txAccepted = new TransactionalState();
+                     txAccepted.setOutcome(Accepted.getInstance());
+                     txAccepted.setTxnId(txState.getTxnId());
+
+                     delivery.disposition(txAccepted);
+                  }
+                  //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
+                  // from dealer, a perf hit but a must
+                  try {
+                     sessionSPI.ack(brokerConsumer, message);
+                  }
+                  catch (Exception e) {
+                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+                  }
+               }
+            }
+         }
+         else if (remoteState instanceof Accepted) {
             //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
             // from dealer, a perf hit but a must
             try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/85ede22c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 2c68dde..98f0e0f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -230,6 +230,98 @@ public class ProtonTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testCommitProducer() throws Throwable {
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.commit();
+      session.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 10);
+   }
+
+   @Test
+   public void testRollbackProducer() throws Throwable {
+
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.rollback();
+      session.close();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 0);
+   }
+
+   @Test
+   public void testCommitConsumer() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.close();
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("Message:" + i, message.getText());
+      }
+      session.commit();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 0);
+   }
+
+   @Test
+   public void testRollbackConsumer() throws Throwable {
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = createQueue(address);
+      System.out.println("queue:" + queue.getQueueName());
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("Message:" + i);
+         p.send(message);
+      }
+      session.close();
+
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer cons = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage message = (TextMessage) cons.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("Message:" + i, message.getText());
+      }
+      session.rollback();
+      Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
+      Assert.assertEquals(q.getMessageCount(), 10);
+   }
+
+   @Test
    public void testResourceLimitExceptionOnAddressFull() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
       setAddressFullBlockPolicy();


Mime
View raw message