Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2F37E200C45 for ; Tue, 28 Mar 2017 15:56:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2DE00160B9B; Tue, 28 Mar 2017 13:56:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F17E3160B7E for ; Tue, 28 Mar 2017 15:56:26 +0200 (CEST) Received: (qmail 64016 invoked by uid 500); 28 Mar 2017 13:56:26 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 64002 invoked by uid 99); 28 Mar 2017 13:56:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Mar 2017 13:56:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B344ADFF47; Tue, 28 Mar 2017 13:56:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: <40260c653fcd4716bbad2da19939e3f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: [AMQ-6640] fix duplicate suppression sync request on responder end of duplex network connector only b/c that has the async local transport. Additional test. Ensure broker sync is conditional on the need for duplicate suppression whic Date: Tue, 28 Mar 2017 13:56:25 +0000 (UTC) archived-at: Tue, 28 Mar 2017 13:56:28 -0000 Repository: activemq Updated Branches: refs/heads/master 0196be1d2 -> 8e00c6c2b [AMQ-6640] fix duplicate suppression sync request on responder end of duplex network connector only b/c that has the async local transport. Additional test. Ensure broker sync is conditional on the need for duplicate suppression which should only be necessary in ring topologies when properly configured Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e00c6c2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e00c6c2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e00c6c2 Branch: refs/heads/master Commit: 8e00c6c2bc30e38cee585d9de97b511ed664951b Parents: 0196be1 Author: gtully Authored: Tue Mar 28 14:49:23 2017 +0100 Committer: gtully Committed: Tue Mar 28 14:49:23 2017 +0100 ---------------------------------------------------------------------- .../network/DemandForwardingBridgeSupport.java | 29 ++- .../org/apache/activemq/bugs/AMQ3274Test.java | 4 +- .../usecases/DuplexAdvisoryRaceTest.java | 247 +++++++++++++++++++ 3 files changed, 272 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8e00c6c2/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index f7dc745..459501c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -785,7 +785,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br case ConsumerInfo.DATA_STRUCTURE_TYPE: localStartedLatch.await(); if (started.get()) { - addConsumerInfo((ConsumerInfo) command); + final ConsumerInfo consumerInfo = (ConsumerInfo) command; + if (isDuplicateSuppressionOff(consumerInfo)) { + addConsumerInfo(consumerInfo); + } else { + synchronized (brokerService.getVmConnectorURI()) { + addConsumerInfo(consumerInfo); + } + } } else { // received a subscription whilst stopping LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); @@ -867,8 +874,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // in a cyclic network there can be multiple bridges per broker that can propagate // a network subscription so there is a need to synchronize on a shared entity - synchronized (brokerService.getVmConnectorURI()) { + // if duplicate suppression is required + if (isDuplicateSuppressionOff(info)) { addConsumerInfo(info); + } else { + synchronized (brokerService.getVmConnectorURI()) { + addConsumerInfo(info); + } } } else if (data.getClass() == DestinationInfo.class) { // It's a destination info - we want to pass up information about temporary destinations @@ -1027,8 +1039,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void addSubscription(DemandSubscription sub) throws IOException { if (sub != null) { - if (isDuplex()) { - // async vm transport, need to wait for completion + if (isCreatedByDuplex() && !isDuplicateSuppressionOff(sub.getRemoteInfo())) { + // async vm transport on duplex end, need to wait for completion localBroker.request(sub.getLocalInfo()); } else { localBroker.oneway(sub.getLocalInfo()); @@ -1332,8 +1344,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); boolean suppress = false; - if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() - && !configuration.isSuppressDuplicateTopicSubscriptions()) { + if (isDuplicateSuppressionOff(consumerInfo)) { return suppress; } @@ -1355,6 +1366,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return suppress; } + private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) { + return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions() + || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() + || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions(); + } + private boolean isInActiveDurableSub(Subscription sub) { return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/8e00c6c2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java index 48c5cbb..f901d3d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java @@ -496,10 +496,10 @@ public class AMQ3274Test { if (queue_f) { prefix = "queue"; - excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE); + excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE); } else { prefix = "topic"; - excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE); + excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE); } excludes = new ArrayList(); http://git-wip-us.apache.org/repos/asf/activemq/blob/8e00c6c2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java new file mode 100644 index 0000000..9919ec9 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkBridge; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.TestUtils; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.net.InetAddress; +import java.net.Socket; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertTrue; + +// https://issues.apache.org/jira/browse/AMQ-6640 +public class DuplexAdvisoryRaceTest { + private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class); + private static String hostName; + + final AtomicLong responseReceived = new AtomicLong(0); + + BrokerService brokerA,brokerB; + String networkConnectorUrlString; + + @BeforeClass + public static void initIp() throws Exception { + // attempt to bypass loopback - not vital but it helps to reproduce + hostName = InetAddress.getLocalHost().getHostAddress(); + } + + @Before + public void createBrokers() throws Exception { + networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort(); + + brokerA = newBroker("A"); + brokerB = newBroker("B"); + responseReceived.set(0); + } + + @After + public void stopBrokers() throws Exception { + brokerA.stop(); + brokerB.stop(); + } + + + // to be sure to be sure + public void repeatTestHang() throws Exception { + for (int i=0; i<10;i++) { + testHang(); + stopBrokers(); + createBrokers(); + } + } + + @Test + public void testHang() throws Exception { + + brokerA.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() { + @Override + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + Subscription subscription = super.addConsumer(context, info); + // delay return to allow dispatch to interleave + if (context.isNetworkConnection()) { + TimeUnit.MILLISECONDS.sleep(300); + } + return subscription; + }; + }}); + + // bridge + NetworkConnector networkConnector = bridgeBrokers(brokerA, brokerB); + + brokerA.start(); + brokerB.start(); + + ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + "?jms.watchTopicAdvisories=false"); + + ActiveMQConnectionFactory brokerBFactory = new ActiveMQConnectionFactory(brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + "?jms.watchTopicAdvisories=false"); + + // populate dests + final int numDests = 200; + final int numMessagesPerDest = 300; + final int numConsumersPerDest = 100; + populate(brokerAFactory, 0, numDests/2, numMessagesPerDest); + populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest); + + // demand + List connections = new LinkedList<>(); + connections.add(demand(brokerBFactory, 0, numDests/2, numConsumersPerDest)); + connections.add(demand(brokerAFactory, numDests/2, numDests, numConsumersPerDest)); + + + LOG.info("Allow duplex bridge to connect...."); + // allow bridge to start + brokerB.startTransportConnector(brokerB.addConnector(networkConnectorUrlString + "?transport.socketBufferSize=1024")); + + if (!Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("received: " + responseReceived.get()); + return responseReceived.get() >= numMessagesPerDest * numDests; + } + }, 2*60*1000)) { + + org.apache.activemq.TestSupport.dumpAllThreads("DD"); + + // when hung close will also hang! + for (NetworkBridge networkBridge : networkConnector.activeBridges()) { + if (networkBridge instanceof DemandForwardingBridge) { + DemandForwardingBridge demandForwardingBridge = (DemandForwardingBridge) networkBridge; + Socket socket = demandForwardingBridge.getRemoteBroker().narrow(Socket.class); + socket.close(); + } + } + } + + networkConnector.stop(); + for (Connection connection: connections) { + try { + connection.close(); + } catch (Exception ignored) {} + } + assertTrue("received all sent: " + responseReceived.get(), responseReceived.get() >= numMessagesPerDest * numDests); + } + + + private void populate(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numMessages) throws JMSException { + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final BytesMessage message = session.createBytesMessage(); + //message.writeBytes(new byte[50]); + MessageProducer producer = session.createProducer(null);; + for (int i=minDest; i