activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1654 - fix bridge reconnect logic
Date Fri, 02 Feb 2018 17:18:10 GMT
ARTEMIS-1654 - fix bridge reconnect logic

Make sure that, if a bridge disconnects and there is no record of it in the topology, it uses
the original bridge connector to reconnect.

Originally the live broker that disconnected was left in the Topology; this broke quorum voting,
because when the vote happened all brokers, when asked, thought the target broker was still alive.
The fix for this was to remove the target live broker from the Topology. Since the bridge
reconnect logic relied on this in a non-HA environment to reconnect, this stopped working.
The fix now uses the original target connector (or backup) to reconnect in the case where
the broker was actually removed from the cluster.

https://issues.apache.org/jira/browse/ARTEMIS-1654


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/032210a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/032210a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/032210a7

Branch: refs/heads/master
Commit: 032210a7c692d26baa13a80f30a3cf62c5df594e
Parents: 2a72923
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Thu Feb 1 13:22:59 2018 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Feb 2 12:17:41 2018 -0500

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    |  11 +-
 .../cluster/impl/ClusterConnectionBridge.java   |  14 +-
 .../bridge/ClusteredBridgeReconnectTest.java    | 143 +++++++++++++++++++
 3 files changed, 154 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032210a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 4790fda..2c4db3e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -506,10 +506,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
    }
 
-   protected boolean isPlainCoreBridge() {
-      return true;
-   }
-
    /* Hook for processing message before forwarding */
    protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
       if (useDuplicateDetection) {
@@ -824,7 +820,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       return csf;
    }
 
-   private ClientSessionFactoryInternal reconnectOnOriginalNode() throws Exception {
+   protected ClientSessionFactoryInternal reconnectOnOriginalNode() throws Exception {
       String targetNodeIdUse = targetNodeID;
       TopologyMember nodeUse = targetNode;
       if (targetNodeIdUse != null && nodeUse != null) {
@@ -916,10 +912,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
             ActiveMQServerLogger.LOGGER.bridgeConnected(this);
 
-            // We only do this on plain core bridges
-            if (isPlainCoreBridge()) {
-               serverLocator.addClusterTopologyListener(new TopologyListener());
-            }
+            serverLocator.addClusterTopologyListener(new TopologyListener());
 
             keepConnecting = false;
             return;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032210a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 7ebc273..cf17bbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -79,6 +80,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
    private final ServerLocatorInternal discoveryLocator;
 
    private final String storeAndForwardPrefix;
+   private TopologyMemberImpl member;
 
    public ClusterConnectionBridge(final ClusterConnection clusterConnection,
                                   final ClusterManager clusterManager,
@@ -139,6 +141,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
       serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
       ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
+      //if it is null then its possible the broker was removed after a disconnect so lets try the original connectors
+      if (factory == null) {
+         factory = reconnectOnOriginalNode();
+         if (factory == null) {
+            return null;
+         }
+      }
       setSessionFactory(factory);
 
       if (factory == null) {
@@ -372,9 +381,4 @@ public class ClusterConnectionBridge extends BridgeImpl {
          clusterConnection.disconnectRecord(targetNodeID);
       }
    }
-
-   @Override
-   protected boolean isPlainCoreBridge() {
-      return false;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/032210a7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
new file mode 100644
index 0000000..a280b85
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.bridge;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This will simulate a failure of a failure.
+ * The bridge could eventually during a race or multiple failures not be able to reconnect because it failed again.
+ * this should make the bridge to always reconnect itself.
+ */
+
+public class ClusteredBridgeReconnectTest extends ClusterTestBase {
+
+   @Test
+   public void testReconnectBridge() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+      ClientSession session0 = sfs[0].createSession();
+      ClientSession session1 = sfs[0].createSession();
+
+      session0.start();
+      session1.start();
+
+      ClientProducer producer = session0.createProducer("queues.testaddress");
+
+      int NUMBER_OF_MESSAGES = 100;
+
+      Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size());
+
+      ClusterConnectionImpl connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
+      Assert.assertEquals(1, connection.getRecords().size());
+
+      MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
+      ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         ClientMessage msg = session0.createMessage(true);
+         producer.send(msg);
+         session0.commit();
+
+         if (i == 17) {
+            bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!"));
+         }
+      }
+
+      int cons0Count = 0, cons1Count = 0;
+
+      while (true) {
+         ClientMessage msg = consumers[0].getConsumer().receive(1000);
+         if (msg == null) {
+            break;
+         }
+         cons0Count++;
+         msg.acknowledge();
+         session0.commit();
+      }
+
+      while (true) {
+         ClientMessage msg = consumers[1].getConsumer().receive(1000);
+         if (msg == null) {
+            break;
+         }
+         cons1Count++;
+         msg.acknowledge();
+         session1.commit();
+      }
+
+      Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count);
+
+      session0.commit();
+      session1.commit();
+
+      connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
+      Assert.assertEquals(1, connection.getRecords().size());
+      Assert.assertNotNull(bridge.getSessionFactory());
+
+      stopServers(0, 1);
+
+   }
+
+
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      closeAllConsumers();
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
+      super.tearDown();
+   }
+
+   public boolean isNetty() {
+      return true;
+   }
+}


Mime
View raw message