Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E9B918A6D for ; Tue, 1 Mar 2016 16:38:07 +0000 (UTC) Received: (qmail 65682 invoked by uid 500); 1 Mar 2016 16:37:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 65571 invoked by uid 500); 1 Mar 2016 16:37:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 64226 invoked by uid 99); 1 Mar 2016 16:37:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Mar 2016 16:37:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C55D0E00D8; Tue, 1 Mar 2016 16:37:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 01 Mar 2016 16:38:46 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/52] [abbrv] activemq-artemis git commit: Fixed some redelivery tests Fixed some redelivery tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/552be8c5 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/552be8c5 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/552be8c5 Branch: refs/heads/refactor-openwire Commit: 552be8c5ffe50eaac5030174bd438bc9b7b30ee2 Parents: 4a4b682 Author: Howard Gao Authored: Fri Feb 26 22:24:03 2016 +0800 Committer: Clebert Suconic Committed: Tue Mar 1 
11:37:32 2016 -0500 ---------------------------------------------------------------------- .../openwire/amq/AMQServerConsumer.java | 22 ++++++++++++++++++++ .../protocol/openwire/amq/AMQServerSession.java | 7 +++++++ .../core/server/impl/ServerConsumerImpl.java | 16 ++++++++------ .../activemq/test/JmsTopicSendReceiveTest.java | 2 ++ 4 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index 3f94351..34789b0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -35,6 +35,7 @@ public class AMQServerConsumer extends ServerConsumerImpl { // TODO-NOW: remove this once unified AMQConsumer amqConsumer; + boolean isClosing; public AMQConsumer getAmqConsumer() { return amqConsumer; @@ -67,6 +68,18 @@ public class AMQServerConsumer extends ServerConsumerImpl { this.browserDeliverer = newBrowserDeliverer; } + public void closing() { + isClosing = true; + } + + @Override + public HandleStatus handle(final MessageReference ref) throws Exception { + if (isClosing) { + return HandleStatus.BUSY; + } + return super.handle(ref); + } + private class AMQBrowserDeliverer extends BrowserDeliverer { private BrowserListener listener 
= null; @@ -174,4 +187,13 @@ public class AMQServerConsumer extends ServerConsumerImpl { } } + @Override + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + //activemq5 doesn't decrease the count + //when not failed. + if (failed) { + ref.decrementDeliveryCount(); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index 5403830..390b58f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl { @Override protected void doClose(final boolean failed) throws Exception { + Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values()); + for (ServerConsumer consumer : consumersClone) { + AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer; + amqConsumer.closing();//prevent redeliver + } + synchronized (this) { if (tx != null && tx.getXid() == null) { ((AMQTransactionImpl) tx).setRollbackForClose(); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index ab9dec9..08185db 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } else { refs.add(ref); - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); - } + updateDeliveryCountForCanceledRef(ref, failed); } if (isTrace) { @@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return refs; } + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } + } + @Override public void setStarted(final boolean started) { synchronized (lock) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java index 28ac25e..ddc6cd8 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java @@ -24,6 +24,7 @@ import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { session.close(); connection.close(); ArtemisBrokerHelper.stopArtemisBroker(); + TcpTransportFactory.clearService(); } /**