activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6366
Date Tue, 19 Jul 2016 15:32:01 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 4b018b420 -> 39184e2fb


https://issues.apache.org/jira/browse/AMQ-6366

Fixing the duplex bridge case for restarting durable subscriptions when
dynamicOnly is false


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/39184e2f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/39184e2f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/39184e2f

Branch: refs/heads/master
Commit: 39184e2fb052fc73c69934890a16b333f1ea31d5
Parents: 4b018b4
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Jul 19 11:30:32 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Jul 19 11:31:44 2016 -0400

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  14 +-
 .../apache/activemq/usecases/AMQ6366Test.java   | 141 +++++++++++++++++++
 2 files changed, 151 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/39184e2f/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index fde46c4..8019c12 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -116,9 +117,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
     // The broker and wireformat info that was exchanged.
     protected BrokerInfo brokerInfo;
-    protected final List<Command> dispatchQueue = new LinkedList<Command>();
+    protected final List<Command> dispatchQueue = new LinkedList<>();
     protected TaskRunner taskRunner;
-    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
+    protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
     private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -140,8 +141,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     private final CountDownLatch stopped = new CountDownLatch(1);
     private final AtomicBoolean asyncException = new AtomicBoolean(false);
-    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
-    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
+    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
+    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
     private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     private ConnectionContext context;
     private boolean networkConnection;
@@ -1419,6 +1420,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 listener.setCreatedByDuplex(true);
                 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                 duplexBridge.setBrokerService(brokerService);
+                Set<ActiveMQDestination> durableDestinations = broker.getDurableDestinations();
+                //Need to set durableDestinations to properly restart subs when dynamicOnly=false
+                if (durableDestinations != null) {
+                    duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0]));
+                }
                 // now turn duplex off this side
                 info.setDuplexConnection(false);
                 duplexBridge.setCreatedByDuplex(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/39184e2f/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
new file mode 100644
index 0000000..ec75232
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.jms.MessageConsumer;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Show that both directions of a duplex bridge will properly restart the
+ * network durable consumers if dynamicOnly is false.
+ */
+public class AMQ6366Test extends JmsMultipleBrokersTestSupport {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ6366Test.class);
+    final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO");
+
+
+    /**
+     * This test works even before AMQ6366
+     * @throws Exception
+     */
+    public void testDuplexDurableSubRestarted() throws Exception {
+        testNonDurableReceiveThrougRestart("BrokerA", "BrokerB");
+    }
+
+    /**
+     * This test failed before AMQ6366 because the NC durable consumer was
+     * never properly activated.
+     *
+     * @throws Exception
+     */
+    public void testDuplexDurableSubRestartedReverse() throws Exception {
+        testNonDurableReceiveThrougRestart("BrokerB", "BrokerA");
+    }
+
+    protected void testNonDurableReceiveThrougRestart(String pubBroker, String conBroker) throws Exception {
+        NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", "BrokerB");
+
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        MessageConsumer client = createDurableSubscriber(conBroker, dest, "sub1");
+        client.close();
+
+        Thread.sleep(1000);
+        networkConnector.stop();
+        Thread.sleep(1000);
+
+        Set<ActiveMQDestination> durableDests = new HashSet<>();
+        durableDests.add(dest);
+        //Normally set on broker start from the persistence layer but
+        //simulate here since we just stopped and started the network connector
+        //without a restart
+        networkConnector.setDurableDestinations(durableDests);
+        networkConnector.start();
+        waitForBridgeFormation();
+
+        // Send messages
+        sendMessages(pubBroker, dest, 1);
+        Thread.sleep(1000);
+
+        Topic destination = (Topic) brokers.get(conBroker).broker.getDestination(dest);
+        DurableTopicSubscription sub = destination.getDurableTopicSubs().
+                values().toArray(new DurableTopicSubscription[0])[0];
+
+        //Assert that the message made it to the other broker
+        assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount());
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setAdvisorySupport(true);
+    }
+
+    protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception {
+        BrokerService localBroker = brokers.get(localBrokerName).broker;
+        BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
+
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        URI remoteURI;
+        if (!transportConnectors.isEmpty()) {
+            remoteURI = transportConnectors.get(0).getConnectUri();
+            String uri = "static:(" + remoteURI + ")";
+            NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+            connector.setDynamicOnly(false); // so matching durable subs are loaded on start
+            connector.setStaticBridge(false);
+            connector.setDuplex(true);
+            connector.addDynamicallyIncludedDestination(dest);
+            localBroker.addNetworkConnector(connector);
+            return connector;
+        } else {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        File dataDir = new File(IOHelper.getDefaultDataDirectory());
+        LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
+        org.apache.activemq.TestSupport.recursiveDelete(dataDir);
+        super.setAutoFail(true);
+        super.setUp();
+        createBroker(new URI(
+                "broker:(tcp://0.0.0.0:0)/BrokerA"));
+        createBroker(new URI(
+                "broker:(tcp://0.0.0.0:0)/BrokerB"));
+
+    }
+}


Mime
View raw message