Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BFB3C1138F for ; Thu, 17 Jul 2014 16:08:28 +0000 (UTC) Received: (qmail 11036 invoked by uid 500); 17 Jul 2014 16:08:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 10997 invoked by uid 500); 17 Jul 2014 16:08:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 10988 invoked by uid 99); 17 Jul 2014 16:08:28 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jul 2014 16:08:28 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4AE5C8C09C2; Thu, 17 Jul 2014 16:08:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: <05baac177d4e4e69a0aee05b3242c291@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending else where Date: Thu, 17 Jul 2014 16:08:28 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 4da588d4f -> c34851fd5 https://issues.apache.org/jira/browse/AMQ-5279 - ensure poison on failover redelivery only when delivery is not pending else where Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c34851fd Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c34851fd Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c34851fd Branch: refs/heads/trunk Commit: c34851fd57bc7ef3fc2c847d71ae4cd8f32670e3 Parents: 4da588d Author: gtully Authored: Thu Jul 17 17:07:19 2014 +0100 Committer: gtully Committed: Thu Jul 17 17:08:13 2014 +0100 ---------------------------------------------------------------------- .../org/apache/activemq/ActiveMQConnection.java | 6 +- .../activemq/ActiveMQMessageConsumer.java | 67 ++++++++++++-------- .../failover/FailoverTransactionTest.java | 29 ++++----- 3 files changed, 60 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c34851fd/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 326310c..2df8607 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -2215,7 +2215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon @Override @Deprecated public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { - return createInputStream(dest, messageSelector, noLocal, -1); + return createInputStream(dest, messageSelector, noLocal, -1); } @Override @@ -2571,6 +2571,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon return this.executor; } + protected CopyOnWriteArrayList getSessions() { + return sessions; + } + /** * @return the checkForDuplicates */ http://git-wip-us.apache.org/repos/asf/activemq/blob/c34851fd/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index e17a1bb..a60a7ac 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -1413,36 +1413,24 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } else { - if (!session.isTransacted()) { - LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md); - posionAck(md, "Duplicate non transacted delivery to " + getConsumerId()); - } else { + // deal with duplicate delivery + ConsumerId consumerWithPendingTransaction; + if (redeliveryExpectedInCurrentTransaction(md, true)) { if (LOG.isDebugEnabled()) { - LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage()); - } - boolean needsPoisonAck = false; - synchronized (deliveredMessages) { - if (previouslyDeliveredMessages != null) { - previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); - } else { - // delivery while pending redelivery to another consumer on the same connection - // not waiting for redelivery will help here - needsPoisonAck = true; - } + LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage()); } - if (needsPoisonAck) { - LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" - + " consumer on this connection, failoverRedeliveryWaitPeriod=" - + failoverRedeliveryWaitPeriod + ". Message: " + md); - posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: " - + session.getConnection().getConnectionInfo().getConnectionId()); + if (transactedIndividualAck) { + immediateIndividualTransactedAck(md); } else { - if (transactedIndividualAck) { - immediateIndividualTransactedAck(md); - } else { - session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1)); - } + session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1)); } + } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) { + LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction); + session.getConnection().rollbackDuplicate(this, md.getMessage()); + dispatch(md); + } else { + LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md); + posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); } } } @@ -1456,6 +1444,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) { + if (session.isTransacted()) { + synchronized (deliveredMessages) { + if (previouslyDeliveredMessages != null) { + if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) { + if (markReceipt) { + previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); + } + return true; + } + } + } + } + return false; + } + + private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) { + for (ActiveMQSession activeMQSession: session.connection.getSessions()) { + for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) { + if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) { + return activeMQMessageConsumer.getConsumerId(); + } + } + } + return null; + } + // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again private void clearDeliveredList() { if (clearDeliveredList) { http://git-wip-us.apache.org/repos/asf/activemq/blob/c34851fd/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index c365d37..9b41713 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -831,7 +831,7 @@ public class FailoverTransactionTest extends TestSupport { setDefaultPersistenceAdapter(broker); broker.start(); - assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); LOG.info("received message count: " + receivedMessages.size()); @@ -841,7 +841,7 @@ public class FailoverTransactionTest extends TestSupport { if (gotTransactionRolledBackException.get()) { assertNotNull("should be available again after commit rollback ex", msg); } else { - assertNull("should be nothing left for consumer as recieve should have committed", msg); + assertNull("should be nothing left for consumer as receive should have committed", msg); } consumerSession1.commit(); @@ -1103,8 +1103,8 @@ public class FailoverTransactionTest extends TestSupport { connection.close(); } - public void testPoisonOnDeliveryWhilePending() throws Exception { - LOG.info("testPoisonOnDeliveryWhilePending()"); + public void testReDeliveryWhilePending() throws Exception { + LOG.info("testReDeliveryWhilePending()"); broker = createBroker(true); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000"); @@ -1134,8 +1134,7 @@ public class FailoverTransactionTest extends TestSupport { final Vector exceptions = new Vector(); - // commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection - // but with no transaction and it pending on another consumer it will be poison + // commit may fail if other consumer gets the message on restart Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit..."); @@ -1149,24 +1148,24 @@ public class FailoverTransactionTest extends TestSupport { } }); - assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000)); assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS)); - // either message consumed or sent to dlq via poison on redelivery to wrong consumer - // message should not be available again in any event + // either message redelivered in existing tx or consumed by consumer2 + // should not be available again in any event assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000)); // consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases if (exceptions.isEmpty()) { - // commit succeeded, message was redelivered to the correct consumer after restart so commit was fine + LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine"); + assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000)); } else { - // message should be in dlq - MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ")); - TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000); - assertNotNull("found message in dlq", dlqMessage); - assertEquals("text matches", "Test message", dlqMessage.getText()); + LOG.info("commit failed, consumer2 should get it", exceptions.get(0)); + assertNotNull("consumer2 got message", consumer2.receive(2000)); consumerSession.commit(); + // no message should be in dlq + MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ")); + assertNull("nothing in the dlq", dlqConsumer.receive(5000)); } connection.close(); }