Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B54C7200C24 for ; Wed, 8 Feb 2017 17:48:04 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B4048160B5A; Wed, 8 Feb 2017 16:48:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B2995160B49 for ; Wed, 8 Feb 2017 17:48:03 +0100 (CET) Received: (qmail 16792 invoked by uid 500); 8 Feb 2017 16:48:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 16783 invoked by uid 99); 8 Feb 2017 16:48:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 16:48:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B8BD4DFD9E; Wed, 8 Feb 2017 16:48:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 08 Feb 2017 16:48:02 -0000 Message-Id: <3fa85da2ca23458d93cd2b9b55091a1b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-949 Prevent Openwire from closing consumer twice archived-at: Wed, 08 Feb 2017 16:48:04 -0000 Repository: activemq-artemis Updated Branches: refs/heads/master aac21ce16 -> 0d2cd3b72 ARTEMIS-949 Prevent Openwire from closing consumer twice Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a3fdd09 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a3fdd09 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a3fdd09 Branch: refs/heads/master Commit: 1a3fdd0916c3fb9af1eca283b717c1e49d6fbabb Parents: aac21ce Author: Howard Gao Authored: Wed Feb 8 15:57:43 2017 +0800 Committer: Howard Gao Committed: Wed Feb 8 15:57:43 2017 +0800 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 3 + .../cluster/distribution/ClusterTestBase.java | 16 ++- .../cluster/MessageRedistributionTest.java | 114 +++++++++++++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a3fdd09/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 0bcff66..cf27145 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1401,6 +1401,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { + if (destroyed) { + return null; + } SessionId sessionId = id.getParentId(); SessionState ss = 
state.getSessionState(sessionId); if (ss == null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a3fdd09/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index c11674c..ecbbb5b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.PrintWriter; import java.io.StringWriter; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1453,6 +1454,10 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { setupLiveServer(node, fileStorage, false, netty, isLive); } + protected boolean isResolveProtocols() { + return false; + } + protected void setupLiveServer(final int node, final boolean fileStorage, final boolean sharedStorage, @@ -1472,7 +1477,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { haPolicyConfiguration = new ReplicatedPolicyConfiguration(); } - Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(false); + Configuration configuration = 
createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols()); ActiveMQServer server; @@ -1889,4 +1894,13 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { protected boolean isFileStorage() { return true; } + + protected String getServerUri(int node) throws URISyntaxException { + ActiveMQServer server = servers[node]; + if (server == null) { + throw new IllegalStateException("No server at node " + server); + } + int port = TransportConstants.DEFAULT_PORT + node; + return "tcp://localhost:" + port; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a3fdd09/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java new file mode 100644 index 0000000..3539c03 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/cluster/MessageRedistributionTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.cluster; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.ConsumerThread; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class MessageRedistributionTest extends ClusterTestBase { + + @Test + public void testRemoteConsumerClose() throws Exception { + + setupServer(0, true, true); + setupServer(1, true, true); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0); + + startServers(0, 1); + + waitForTopology(servers[0], 2); + waitForTopology(servers[1], 2); + + setupSessionFactory(0, true); + setupSessionFactory(1, true); + + createQueue(0, "queues.testaddress", 
"queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + + //alternately create consumers to the 2 nodes + //close the connection then close consumer quickly + //check server's consumer count + for (int i = 0; i < 50; i++) { + int target = i % 2; + int remote = (i + 1) % 2; + closeConsumerAndConnectionConcurrently(target, remote); + } + } + + @Override + protected boolean isResolveProtocols() { + return true; + } + + private void closeConsumerAndConnectionConcurrently(int targetNode, int remoteNode) throws Exception { + + String targetUri = getServerUri(targetNode); + System.out.println("uri is " + targetUri); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(targetUri); + Connection conn = null; + CountDownLatch active = new CountDownLatch(1); + try { + conn = factory.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = ActiveMQDestination.createDestination("queue0", ActiveMQDestination.QUEUE_TYPE); + ConsumerThread consumer = new ConsumerThread(session, dest); + consumer.setMessageCount(0); + consumer.setFinished(active); + consumer.start(); + + assertTrue("consumer takes too long to finish!", active.await(5, TimeUnit.SECONDS)); + } finally { + conn.close(); + } + + //check remote server's consumer count + ActiveMQServer remoteServer = servers[remoteNode]; + Bindings bindings = remoteServer.getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")); + Collection<Binding> bindingSet = bindings.getBindings(); + + RemoteQueueBinding remoteBinding = null; + for (Binding b : bindingSet) { + if (b instanceof RemoteQueueBinding) { + remoteBinding = (RemoteQueueBinding) b; + break; + } + } + + assertNotNull(remoteBinding); + int count = remoteBinding.consumerCount(); + assertTrue("consumer count should never be negative " + count, count >= 0); + } +}