Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 35180200B43 for ; Tue, 19 Jul 2016 17:42:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 33BC0160A76; Tue, 19 Jul 2016 15:42:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 54EE7160A5C for ; Tue, 19 Jul 2016 17:42:42 +0200 (CEST) Received: (qmail 67319 invoked by uid 500); 19 Jul 2016 15:36:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 67310 invoked by uid 99); 19 Jul 2016 15:36:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jul 2016 15:36:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B6867E03A6; Tue, 19 Jul 2016 15:36:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6366 Date: Tue, 19 Jul 2016 15:36:01 +0000 (UTC) archived-at: Tue, 19 Jul 2016 15:42:43 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.13.x b547e4613 -> a82bd3cf7 https://issues.apache.org/jira/browse/AMQ-6366 Fixing the duplex bridge case for restarting durable subscriptions when dynamicOnly is false (cherry picked from commit 39184e2fb052fc73c69934890a16b333f1ea31d5) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a82bd3cf Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a82bd3cf Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a82bd3cf Branch: refs/heads/activemq-5.13.x Commit: a82bd3cf721e32272b2cf3dee3aa1afcc726c3cb Parents: b547e46 Author: Christopher L. Shannon (cshannon) Authored: Tue Jul 19 11:30:32 2016 -0400 Committer: Christopher L. Shannon (cshannon) Committed: Tue Jul 19 11:34:38 2016 -0400 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 14 +- .../apache/activemq/usecases/AMQ6366Test.java | 141 +++++++++++++++++++ 2 files changed, 151 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a82bd3cf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 350f529..b16383a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -115,9 +116,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { protected final Map brokerConnectionStates; // The broker and wireformat info that was exchanged. protected BrokerInfo brokerInfo; - protected final List dispatchQueue = new LinkedList(); + protected final List dispatchQueue = new LinkedList<>(); protected TaskRunner taskRunner; - protected final AtomicReference transportException = new AtomicReference(); + protected final AtomicReference transportException = new AtomicReference<>(); protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); private final Transport transport; private MessageAuthorizationPolicy messageAuthorizationPolicy; @@ -139,8 +140,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final AtomicBoolean stopping = new AtomicBoolean(false); private final CountDownLatch stopped = new CountDownLatch(1); private final AtomicBoolean asyncException = new AtomicBoolean(false); - private final Map producerExchanges = new HashMap(); - private final Map consumerExchanges = new HashMap(); + private final Map producerExchanges = new HashMap<>(); + private final Map consumerExchanges = new HashMap<>(); private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); private ConnectionContext context; private boolean networkConnection; @@ -1394,6 +1395,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { listener.setCreatedByDuplex(true); duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); duplexBridge.setBrokerService(broker.getBrokerService()); + Set durableDestinations = broker.getDurableDestinations(); + //Need to set durableDestinations to properly restart subs when dynamicOnly=false + if (durableDestinations != null) { + duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0])); + } // now turn duplex off this side info.setDuplexConnection(false); duplexBridge.setCreatedByDuplex(true); http://git-wip-us.apache.org/repos/asf/activemq/blob/a82bd3cf/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java new file mode 100644 index 0000000..ec75232 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.io.File; +import java.net.URI; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.jms.MessageConsumer; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Show that both directions of a duplex bridge will properly restart the + * network durable consumers if dynamicOnly is false. + */ +public class AMQ6366Test extends JmsMultipleBrokersTestSupport { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ6366Test.class); + final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO"); + + + /** + * This test works even before AMQ6366 + * @throws Exception + */ + public void testDuplexDurableSubRestarted() throws Exception { + testNonDurableReceiveThrougRestart("BrokerA", "BrokerB"); + } + + /** + * This test failed before AMQ6366 because the NC durable consumer was + * never properly activated. + * + * @throws Exception + */ + public void testDuplexDurableSubRestartedReverse() throws Exception { + testNonDurableReceiveThrougRestart("BrokerB", "BrokerA"); + } + + protected void testNonDurableReceiveThrougRestart(String pubBroker, String conBroker) throws Exception { + NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", "BrokerB"); + + startAllBrokers(); + waitForBridgeFormation(); + + MessageConsumer client = createDurableSubscriber(conBroker, dest, "sub1"); + client.close(); + + Thread.sleep(1000); + networkConnector.stop(); + Thread.sleep(1000); + + Set durableDests = new HashSet<>(); + durableDests.add(dest); + //Normally set on broker start from the persistence layer but + //simulate here since we just stopped and started the network connector + //without a restart + networkConnector.setDurableDestinations(durableDests); + networkConnector.start(); + waitForBridgeFormation(); + + // Send messages + sendMessages(pubBroker, dest, 1); + Thread.sleep(1000); + + Topic destination = (Topic) brokers.get(conBroker).broker.getDestination(dest); + DurableTopicSubscription sub = destination.getDurableTopicSubs(). + values().toArray(new DurableTopicSubscription[0])[0]; + + //Assert that the message made it to the other broker + assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount()); + } + + @Override + protected void configureBroker(BrokerService broker) { + broker.getManagementContext().setCreateConnector(false); + broker.setAdvisorySupport(true); + } + + protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception { + BrokerService localBroker = brokers.get(localBrokerName).broker; + BrokerService remoteBroker = brokers.get(remoteBrokerName).broker; + + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI; + if (!transportConnectors.isEmpty()) { + remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)); + connector.setDynamicOnly(false); // so matching durable subs are loaded on start + connector.setStaticBridge(false); + connector.setDuplex(true); + connector.addDynamicallyIncludedDestination(dest); + localBroker.addNetworkConnector(connector); + return connector; + } else { + throw new Exception("Remote broker has no registered connectors."); + } + } + + @Override + public void setUp() throws Exception { + File dataDir = new File(IOHelper.getDefaultDataDirectory()); + LOG.info("Delete dataDir.." + dataDir.getCanonicalPath()); + org.apache.activemq.TestSupport.recursiveDelete(dataDir); + super.setAutoFail(true); + super.setUp(); + createBroker(new URI( + "broker:(tcp://0.0.0.0:0)/BrokerA")); + createBroker(new URI( + "broker:(tcp://0.0.0.0:0)/BrokerB")); + + } +}