Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5509E10CF7 for ; Mon, 1 Dec 2014 19:16:27 +0000 (UTC) Received: (qmail 54878 invoked by uid 500); 1 Dec 2014 19:16:27 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 54830 invoked by uid 500); 1 Dec 2014 19:16:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 54821 invoked by uid 99); 1 Dec 2014 19:16:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Dec 2014 19:16:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E061B9B0F5F; Mon, 1 Dec 2014 19:16:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5467 Date: Mon, 1 Dec 2014 19:16:26 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 9797d3b95 -> 9edf907ae https://issues.apache.org/jira/browse/AMQ-5467 Apply patch to use individual ack for messages in a TX to avoid unmatched ack when ack range is non-sequential Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9edf907a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9edf907a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9edf907a Branch: refs/heads/trunk Commit: 9edf907aedb8c0337577235467061aade237d72d Parents: 9797d3b Author: Timothy 
Bish Authored: Mon Dec 1 14:16:01 2014 -0500 Committer: Timothy Bish Committed: Mon Dec 1 14:16:01 2014 -0500 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 36 ++++++------- .../transport/amqp/AmqpTestSupport.java | 5 +- .../activemq/transport/amqp/JMSClientTest.java | 55 ++++++++++++++++++-- 3 files changed, 73 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index a08e08b..9a252f2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -1203,29 +1203,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { @Override void doCommit() throws Exception { if (!dispatchedInTx.isEmpty()) { + for (MessageDispatch md : dispatchedInTx) { + MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); + pendingTxAck.setFirstMessageId(md.getMessage().getMessageId()); + pendingTxAck.setTransactionId(md.getMessage().getTransactionId()); - MessageDispatch md = dispatchedInTx.getFirst(); - MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size()); - pendingTxAck.setTransactionId(md.getMessage().getTransactionId()); - pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId()); - - LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck); - - dispatchedInTx.clear(); + LOG.trace("Sending commit Ack to 
ActiveMQ: {}", pendingTxAck); - sendToActiveMQ(pendingTxAck, new ResponseHandler() { - @Override - public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { + sendToActiveMQ(pendingTxAck, new ResponseHandler() { + @Override + public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { if (response.isException()) { - Throwable exception = ((ExceptionResponse) response).getException(); - exception.printStackTrace(); - sender.close(); + if (response.isException()) { + Throwable exception = ((ExceptionResponse) response).getException(); + exception.printStackTrace(); + sender.close(); + } } + pumpProtonToSocket(); } - pumpProtonToSocket(); - } - }); + }); + } + + dispatchedInTx.clear(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 8150bc0..2f2f5af 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -54,6 +54,8 @@ import org.slf4j.LoggerFactory; public class AmqpTestSupport { + public static final String MESSAGE_NUMBER = "MessageNumber"; + @Rule public TestName name = new TestName(); protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class); @@ -249,9 +251,10 @@ public class AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer p = session.createProducer(destination); - for (int i = 0; i < count; i++) { + for (int i = 1; i <= count; i++) { TextMessage message = session.createTextMessage(); 
message.setText("TextMessage: " + i); + message.setIntProperty(MESSAGE_NUMBER, i); p.send(message); } http://git-wip-us.apache.org/repos/asf/activemq/blob/9edf907a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 0380a87..ffa7b24 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -18,6 +18,9 @@ package org.apache.activemq.transport.amqp; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -140,7 +143,7 @@ public class JMSClientTest extends JMSClientTestSupport { final int msgCount = 1; connection = createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue(getDestinationName()); sendMessages(connection, queue, msgCount); @@ -170,7 +173,7 @@ public class JMSClientTest extends JMSClientTestSupport { final int msgCount = 1; connection = createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue(getDestinationName()); sendMessages(connection, queue, msgCount); @@ -206,6 +209,50 @@ public class JMSClientTest extends JMSClientTestSupport { session.close(); } + @Test(timeout = 60000) + public void testRollbackSomeThenReceiveAndCommit() throws 
Exception { + int totalCount = 5; + int consumeBeforeRollback = 2; + + connection = createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(getDestinationName()); + sendMessages(connection, queue, totalCount); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(totalCount, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + + for(int i = 1; i <= consumeBeforeRollback; i++) { + Message message = consumer.receive(1000); + assertNotNull(message); + assertEquals("Unexpected message number", i, message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER)); + } + + session.rollback(); + + assertEquals(totalCount, proxy.getQueueSize()); + + // Consume again..check we receive all the messages. + Set<Integer> messageNumbers = new HashSet<Integer>(); + for(int i = 1; i <= totalCount; i++) { + messageNumbers.add(i); + } + + for(int i = 1; i <= totalCount; i++) { + Message message = consumer.receive(1000); + assertNotNull(message); + int msgNum = message.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER); + messageNumbers.remove(msgNum); + } + + session.commit(); + + assertTrue("Did not consume all expected messages, missing messages: " + messageNumbers, messageNumbers.isEmpty()); + assertEquals("Queue should have no messages left after commit", 0, proxy.getQueueSize()); + } + @Test(timeout=60000) public void testTXConsumerAndLargeNumberOfMessages() throws Exception { @@ -213,7 +260,7 @@ public class JMSClientTest extends JMSClientTestSupport { final int msgCount = 500; connection = createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue(getDestinationName()); sendMessages(connection, queue, msgCount); @@ -757,7 +804,7 @@ public class JMSClientTest extends JMSClientTestSupport { 
ActiveMQAdmin.enableJMSFrameTracing(); connection = createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue queue = session.createQueue(getDestinationName()); connection.start();