activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [51/65] [abbrv] activemq-artemis git commit: Fixed some redelivery tests
Date Fri, 18 Mar 2016 01:42:40 GMT
Fixed some redelivery tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c596e852
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c596e852
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c596e852

Branch: refs/heads/refactor-openwire
Commit: c596e852d88692a8883a5156c60f0914270be4af
Parents: edf415e
Author: Howard Gao <howard.gao@gmail.com>
Authored: Fri Feb 26 22:24:03 2016 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Mar 17 14:10:46 2016 -0400

----------------------------------------------------------------------
 .../openwire/amq/AMQServerConsumer.java         | 22 ++++++++++++++++++++
 .../protocol/openwire/amq/AMQServerSession.java |  7 +++++++
 .../core/server/impl/ServerConsumerImpl.java    | 16 ++++++++------
 .../activemq/test/JmsTopicSendReceiveTest.java  |  2 ++
 4 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index f198cb7..9e93b3d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -36,6 +36,7 @@ public class AMQServerConsumer extends ServerConsumerImpl {
 
    // TODO-NOW: remove this once unified
    AMQConsumer amqConsumer;
+   boolean isClosing;
 
    public AMQConsumer getAmqConsumer() {
       return amqConsumer;
@@ -69,6 +70,18 @@ public class AMQServerConsumer extends ServerConsumerImpl {
       this.browserDeliverer = newBrowserDeliverer;
    }
 
+   public void closing() {
+      isClosing = true;
+   }
+
+   @Override
+   public HandleStatus handle(final MessageReference ref) throws Exception {
+      if (isClosing) {
+         return HandleStatus.BUSY;
+      }
+      return super.handle(ref);
+   }
+
    private class AMQBrowserDeliverer extends BrowserDeliverer {
 
       private BrowserListener listener = null;
@@ -176,4 +189,13 @@ public class AMQServerConsumer extends ServerConsumerImpl {
       }
    }
 
+   @Override
+   protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
+      //activemq5 doesn't decrease the count
+      //when not failed.
+      if (failed) {
+         ref.decrementDeliveryCount();
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index 9a938fa..b603257 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl {
 
    @Override
    protected void doClose(final boolean failed) throws Exception {
+      Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
+      for (ServerConsumer consumer : consumersClone) {
+         AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
+         amqConsumer.closing();//prevent redeliver
+      }
+
       synchronized (this) {
          if (tx != null && tx.getXid() == null) {
             ((AMQTransactionImpl) tx).setRollbackForClose();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index b5ea5d9..b2ca0df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
                else {
                   refs.add(ref);
-                  if (!failed) {
-                     // We don't decrement delivery count if the client failed, since there's a possibility that refs
-                     // were actually delivered but we just didn't get any acks for them
-                     // before failure
-                     ref.decrementDeliveryCount();
-                  }
+                  updateDeliveryCountForCanceledRef(ref, failed);
                }
 
                if (isTrace) {
@@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       return refs;
    }
 
+   protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
+      if (!failed) {
+         // We don't decrement delivery count if the client failed, since there's a possibility that refs
+         // were actually delivered but we just didn't get any acks for them
+         // before failure
+         ref.decrementDeliveryCount();
+      }
+   }
+
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c596e852/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
index 28ac25e..ddc6cd8 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
@@ -24,6 +24,7 @@ import javax.jms.Session;
 import javax.jms.Topic;
 
 import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
       session.close();
       connection.close();
       ArtemisBrokerHelper.stopArtemisBroker();
+      TcpTransportFactory.clearService();
    }
 
    /**


Mime
View raw message