Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2C16719A97 for ; Fri, 18 Mar 2016 01:41:55 +0000 (UTC) Received: (qmail 34532 invoked by uid 500); 18 Mar 2016 01:41:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 34420 invoked by uid 500); 18 Mar 2016 01:41:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30704 invoked by uid 99); 18 Mar 2016 01:41:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Mar 2016 01:41:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7142FE93D8; Fri, 18 Mar 2016 01:41:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Fri, 18 Mar 2016 01:42:40 -0000 Message-Id: <6f0e51edb4ce4f89a59af672aed805e9@git.apache.org> In-Reply-To: <5f9c254d77c94d93a974dbbe3330320c@git.apache.org> References: <5f9c254d77c94d93a974dbbe3330320c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [51/65] [abbrv] activemq-artemis git commit: Fixed some redelivery tests Fixed some redelivery tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c596e852 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c596e852 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c596e852 Branch: refs/heads/refactor-openwire Commit: 
c596e852d88692a8883a5156c60f0914270be4af Parents: edf415e Author: Howard Gao Authored: Fri Feb 26 22:24:03 2016 +0800 Committer: Clebert Suconic Committed: Thu Mar 17 14:10:46 2016 -0400 ---------------------------------------------------------------------- .../openwire/amq/AMQServerConsumer.java | 22 ++++++++++++++++++++ .../protocol/openwire/amq/AMQServerSession.java | 7 +++++++ .../core/server/impl/ServerConsumerImpl.java | 16 ++++++++------ .../activemq/test/JmsTopicSendReceiveTest.java | 2 ++ 4 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index f198cb7..9e93b3d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -36,6 +36,7 @@ public class AMQServerConsumer extends ServerConsumerImpl { // TODO-NOW: remove this once unified AMQConsumer amqConsumer; + boolean isClosing; public AMQConsumer getAmqConsumer() { return amqConsumer; @@ -69,6 +70,18 @@ public class AMQServerConsumer extends ServerConsumerImpl { this.browserDeliverer = newBrowserDeliverer; } + public void closing() { + isClosing = true; + } + + @Override + public HandleStatus handle(final MessageReference ref) throws Exception { + if 
(isClosing) { + return HandleStatus.BUSY; + } + return super.handle(ref); + } + private class AMQBrowserDeliverer extends BrowserDeliverer { private BrowserListener listener = null; @@ -176,4 +189,13 @@ public class AMQServerConsumer extends ServerConsumerImpl { } } + @Override + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + //activemq5 doesn't decrease the count + //when not failed. + if (failed) { + ref.decrementDeliveryCount(); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index 9a938fa..b603257 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl { @Override protected void doClose(final boolean failed) throws Exception { + Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values()); + for (ServerConsumer consumer : consumersClone) { + AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer; + amqConsumer.closing();//prevent redeliver + } + synchronized (this) { if 
(tx != null && tx.getXid() == null) { ((AMQTransactionImpl) tx).setRollbackForClose(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b5ea5d9..b2ca0df 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } else { refs.add(ref); - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); - } + updateDeliveryCountForCanceledRef(ref, failed); } if (isTrace) { @@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return refs; } + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } + } + @Override public void setStarted(final boolean started) { synchronized (lock) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java index 28ac25e..ddc6cd8 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java @@ -24,6 +24,7 @@ import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { session.close(); connection.close(); ArtemisBrokerHelper.stopArtemisBroker(); + TcpTransportFactory.clearService(); } /**