activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-949 Prevent Openwire from closing consumer twice
Date Wed, 08 Feb 2017 16:48:02 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master aac21ce16 -> 0d2cd3b72


ARTEMIS-949 Prevent Openwire from closing consumer twice


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a3fdd09
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a3fdd09
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a3fdd09

Branch: refs/heads/master
Commit: 1a3fdd0916c3fb9af1eca283b717c1e49d6fbabb
Parents: aac21ce
Author: Howard Gao <howard.gao@gmail.com>
Authored: Wed Feb 8 15:57:43 2017 +0800
Committer: Howard Gao <howard.gao@gmail.com>
Committed: Wed Feb 8 15:57:43 2017 +0800

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |   3 +
 .../cluster/distribution/ClusterTestBase.java   |  16 ++-
 .../cluster/MessageRedistributionTest.java      | 114 +++++++++++++++++++
 3 files changed, 132 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a3fdd09/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0bcff66..cf27145 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1401,6 +1401,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
       @Override
       public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
+         if (destroyed) {
+            return null;
+         }
          SessionId sessionId = id.getParentId();
          SessionState ss = state.getSessionState(sessionId);
          if (ss == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a3fdd09/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index c11674c..ecbbb5b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1453,6 +1454,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       setupLiveServer(node, fileStorage, false, netty, isLive);
    }
 
+   protected boolean isResolveProtocols() {
+      return false;
+   }
+
    protected void setupLiveServer(final int node,
                                   final boolean fileStorage,
                                   final boolean sharedStorage,
@@ -1472,7 +1477,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
             haPolicyConfiguration = new ReplicatedPolicyConfiguration();
       }
 
-      Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(false);
+      Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols());
 
       ActiveMQServer server;
 
@@ -1889,4 +1894,13 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
    protected boolean isFileStorage() {
       return true;
    }
+
+   // Returns the tcp://localhost URI for the given node index; fails fast when servers[node] is unset.
+   protected String getServerUri(int node) throws URISyntaxException {
+      if (servers[node] == null) {
+         throw new IllegalStateException("No server at node " + node);
+      }
+      // the URI port is TransportConstants.DEFAULT_PORT plus the node index
+      return "tcp://localhost:" + (TransportConstants.DEFAULT_PORT + node);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a3fdd09/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
new file mode 100644
index 0000000..3539c03
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.cluster;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ConsumerThread;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class MessageRedistributionTest extends ClusterTestBase {
+
+   @Test
+   public void testRemoteConsumerClose() throws Exception {
+
+      setupServer(0, true, true);
+      setupServer(1, true, true);
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
+
+      startServers(0, 1);
+
+      waitForTopology(servers[0], 2);
+      waitForTopology(servers[1], 2);
+
+      setupSessionFactory(0, true);
+      setupSessionFactory(1, true);
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+
+      //alternately create consumers to the 2 nodes
+      //close the connection then close consumer quickly
+      //check server's consumer count
+      for (int i = 0; i < 50; i++) {
+         int target = i % 2;
+         int remote = (i + 1) % 2;
+         closeConsumerAndConnectionConcurrently(target, remote);
+      }
+   }
+
+   @Override
+   protected boolean isResolveProtocols() {
+      return true;
+   }
+
+   // Connects to targetNode, runs a ConsumerThread on "queue0" (waiting up to 5s for it to finish), closes the
+   // connection, then asserts remoteNode's first RemoteQueueBinding for "queues.testaddress" has a count >= 0.
+   private void closeConsumerAndConnectionConcurrently(int targetNode, int remoteNode) throws Exception {
+      String targetUri = getServerUri(targetNode);
+      System.out.println("uri is " + targetUri);
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(targetUri);
+      CountDownLatch active = new CountDownLatch(1);
+      Connection conn = factory.createConnection();
+      try {
+         conn.start();
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination dest = ActiveMQDestination.createDestination("queue0", ActiveMQDestination.QUEUE_TYPE);
+         ConsumerThread consumer = new ConsumerThread(session, dest);
+         consumer.setMessageCount(0);
+         consumer.setFinished(active);
+         consumer.start();
+
+         assertTrue("consumer takes too long to finish!", active.await(5, TimeUnit.SECONDS));
+      } finally {
+         conn.close();
+      }
+
+      //check remote server's consumer count
+      ActiveMQServer remoteServer = servers[remoteNode];
+      Bindings bindings = remoteServer.getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress"));
+      Collection<Binding> bindingSet = bindings.getBindings();
+
+      RemoteQueueBinding remoteBinding = null;
+      for (Binding b : bindingSet) {
+         if (b instanceof RemoteQueueBinding) {
+            remoteBinding = (RemoteQueueBinding) b;
+            break;
+         }
+      }
+
+      assertNotNull(remoteBinding);
+      int count = remoteBinding.consumerCount();
+      assertTrue("consumer count should never be negative " + count, count >= 0);
+   }
+}


Mime
View raw message