Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B4F5F18A59 for ; Wed, 9 Mar 2016 19:42:30 +0000 (UTC) Received: (qmail 6303 invoked by uid 500); 9 Mar 2016 19:42:27 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 6246 invoked by uid 500); 9 Mar 2016 19:42:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 3924 invoked by uid 99); 9 Mar 2016 19:42:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Mar 2016 19:42:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1A195DFC3C; Wed, 9 Mar 2016 19:42:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 09 Mar 2016 19:43:13 -0000 Message-Id: In-Reply-To: <3b2eeb12253e4e0db5244b3974968001@git.apache.org> References: <3b2eeb12253e4e0db5244b3974968001@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/58] [abbrv] activemq-artemis git commit: Fixed some redelivery tests Fixed some redelivery tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8e9bb162 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8e9bb162 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8e9bb162 Branch: refs/heads/refactor-openwire Commit: 8e9bb1629a42417f9a3a0916ea728976e65bf5b5 Parents: 478b550 Author: 
Howard Gao Authored: Fri Feb 26 22:24:03 2016 +0800 Committer: Clebert Suconic Committed: Wed Mar 9 14:41:41 2016 -0500 ---------------------------------------------------------------------- .../openwire/amq/AMQServerConsumer.java | 22 ++++++++++++++++++++ .../protocol/openwire/amq/AMQServerSession.java | 7 +++++++ .../core/server/impl/ServerConsumerImpl.java | 16 ++++++++------ .../activemq/test/JmsTopicSendReceiveTest.java | 2 ++ 4 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9bb162/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index 3f94351..34789b0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -35,6 +35,7 @@ public class AMQServerConsumer extends ServerConsumerImpl { // TODO-NOW: remove this once unified AMQConsumer amqConsumer; + boolean isClosing; public AMQConsumer getAmqConsumer() { return amqConsumer; @@ -67,6 +68,18 @@ public class AMQServerConsumer extends ServerConsumerImpl { this.browserDeliverer = newBrowserDeliverer; } + public void closing() { + isClosing = true; + } + + @Override + public HandleStatus handle(final MessageReference ref) throws Exception { + if (isClosing) { + return HandleStatus.BUSY; + } + return super.handle(ref); 
+ } + private class AMQBrowserDeliverer extends BrowserDeliverer { private BrowserListener listener = null; @@ -174,4 +187,13 @@ public class AMQServerConsumer extends ServerConsumerImpl { } } + @Override + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + //activemq5 doesn't decrease the count + //when not failed. + if (failed) { + ref.decrementDeliveryCount(); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9bb162/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index 5403830..390b58f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl { @Override protected void doClose(final boolean failed) throws Exception { + Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values()); + for (ServerConsumer consumer : consumersClone) { + AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer; + amqConsumer.closing();//prevent redeliver + } + synchronized (this) { if (tx != null && tx.getXid() == null) { ((AMQTransactionImpl) 
tx).setRollbackForClose(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9bb162/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b116849..0ab38c9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } else { refs.add(ref); - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); - } + updateDeliveryCountForCanceledRef(ref, failed); } if (isTrace) { @@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return refs; } + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } + } + @Override public void setStarted(final boolean started) { synchronized (lock) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9bb162/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java index 28ac25e..ddc6cd8 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java @@ -24,6 +24,7 @@ import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { session.close(); connection.close(); ArtemisBrokerHelper.stopArtemisBroker(); + TcpTransportFactory.clearService(); } /**