activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/3] activemq-artemis git commit: ARTEMIS-1779 ClusterConnectionBridge may connect to other nodes than its target
Date Tue, 03 Apr 2018 20:22:59 GMT
ARTEMIS-1779 ClusterConnectionBridge may connect to other nodes than its target

The cluster connection bridge has a TopologyListener and connects to a new node
each time it receives a nodeUp() event. It needs to put a check here to make
sure that the cluster bridge only connects to its target node and it's backups.

This issue shows up when you run LiveToLiveFailoverTest.testConsumerTransacted
test.

Also in this commit improvement of BackupSyncJournalTest so that it runs more
stable.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/262990fa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/262990fa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/262990fa

Branch: refs/heads/master
Commit: 262990fa6716f5bf3c72bc8a44616cef4df17c11
Parents: 650c79e
Author: Howard Gao <howard.gao@gmail.com>
Authored: Mon Apr 2 19:16:30 2018 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Apr 3 16:22:13 2018 -0400

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    |  7 ++++
 .../cluster/impl/ClusterConnectionBridge.java   |  4 ---
 .../cluster/failover/BackupSyncJournalTest.java | 35 ++++++++++++++++++--
 3 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/262990fa/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 01596fd..16724bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -1159,6 +1159,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener,
SendAcknowled
       // ClusterListener
       @Override
       public void nodeUP(TopologyMember member, boolean last) {
+         if (BridgeImpl.this.queue.isInternalQueue() && member != null &&
BridgeImpl.this.targetNodeID != null && !BridgeImpl.this.targetNodeID.equals(member.getNodeId()))
{
+            //A ClusterConnectionBridge (identified by holding an internal queue)
+            //never re-connects to another node here. It only connects to its original
+            //target node (from the ClusterConnection) or its backups. That's why
+            //we put a return here.
+            return;
+         }
          ClientSessionInternal sessionToUse = session;
          RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection()
: null;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/262990fa/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index cf17bbe..88005aa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -58,7 +58,6 @@ import org.jboss.logging.Logger;
  * Such as such adding extra properties and setting up notifications between the nodes.
  */
 public class ClusterConnectionBridge extends BridgeImpl {
-
    private static final Logger logger = Logger.getLogger(ClusterConnectionBridge.class);
 
    private final ClusterConnection clusterConnection;
@@ -127,9 +126,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
       this.managementNotificationAddress = managementNotificationAddress;
       this.flowRecord = flowRecord;
 
-      // we need to disable DLQ check on the clustered bridges
-      queue.setInternalQueue(true);
-
       if (logger.isTraceEnabled()) {
          logger.trace("Setting up bridge between " + clusterConnection.getConnector() + "
and " + targetLocator, new Exception("trace"));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/262990fa/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index c654472..8342c62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -32,6 +33,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -73,16 +76,18 @@ public class BackupSyncJournalTest extends FailoverTestBase {
       return n_msgs;
    }
 
+   protected final FailoverWaiter failoverWaiter = new FailoverWaiter();
+
    @Override
    @Before
    public void setUp() throws Exception {
       startBackupServer = false;
       super.setUp();
       setNumberOfMessages(defaultNMsgs);
-      locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15);
+      locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15).setRetryInterval(200);
       sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+      sessionFactory.addFailoverListener(failoverWaiter);
       syncDelay = new BackupSyncDelay(backupServer, liveServer);
-
    }
 
    @Test
@@ -326,8 +331,13 @@ public class BackupSyncJournalTest extends FailoverTestBase {
       liveServer.removeInterceptor(syncDelay);
       backupServer.start();
       waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
+      failoverWaiter.reset();
       crash(session);
       backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
+      //for some system the retryAttempts and retryInterval may be too small
+      //so that during failover all attempts have failed before the backup
+      //server is fully activated.
+      assertTrue("Session didn't failover, the maxRetryAttempts and retryInterval may be
too small", failoverWaiter.waitFailoverComplete());
    }
 
    protected void createProducerSendSomeMessages() throws ActiveMQException {
@@ -384,4 +394,25 @@ public class BackupSyncJournalTest extends FailoverTestBase {
    protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
       return TransportConfigurationUtils.getInVMConnector(live);
    }
+
+   private class FailoverWaiter implements FailoverEventListener {
+
+      private CountDownLatch latch;
+
+      public void reset() {
+         latch = new CountDownLatch(1);
+      }
+
+      @Override
+      public void failoverEvent(FailoverEventType eventType) {
+         if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
+            latch.countDown();
+         }
+      }
+
+      public boolean waitFailoverComplete() throws InterruptedException {
+         return latch.await(10, TimeUnit.SECONDS);
+      }
+   }
+
 }


Mime
View raw message