activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1814 Try original connection when every other node failed
Date Wed, 18 Apr 2018 13:49:18 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 54ada0e7a -> 7fa8c55f4


ARTEMIS-1814 Try original connection when every other node failed


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1e9f76f4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1e9f76f4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1e9f76f4

Branch: refs/heads/master
Commit: 1e9f76f45a5ded70f3c55d40071bfa5304e59044
Parents: 54ada0e
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Apr 16 17:44:21 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Apr 17 17:00:05 2018 -0400

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 62 ++++++++++++-----
 .../cluster/failover/FailoverTest.java          | 41 +++++++++++
 .../failover/LiveToLiveFailoverTest.java        | 71 +++++++-------------
 3 files changed, 108 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e9f76f4/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index e8ac8f8..5c972e3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -64,8 +64,8 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.jboss.logging.Logger;
 
@@ -77,7 +77,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
    private final ClientProtocolManager clientProtocolManager;
 
-   private TransportConfiguration connectorConfig;
+   private final TransportConfiguration connectorConfig;
+
+   private TransportConfiguration currentConnectorConfig;
 
    private TransportConfiguration backupConfig;
 
@@ -175,6 +177,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
       this.connectorConfig = connectorConfig;
 
+      this.currentConnectorConfig = connectorConfig;
+
       connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
 
       checkTransportKeys(connectorFactory, connectorConfig);
@@ -238,7 +242,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
       getConnectionWithRetry(initialConnectAttempts);
 
       if (connection == null) {
-         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration
").append(connectorConfig);
+         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration
").append(currentConnectorConfig);
          if (backupConfig != null) {
             msg.append(" and backup configuration ").append(backupConfig);
          }
@@ -249,7 +253,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
    @Override
    public TransportConfiguration getConnectorConfiguration() {
-      return connectorConfig;
+      return currentConnectorConfig;
    }
 
    @Override
@@ -260,7 +264,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
       // to create a connector just to validate if the parameters are ok.
       // so this will create the instance to be used on the isEquivalent check
       if (localConnector == null) {
-         localConnector = connectorFactory.createConnector(connectorConfig.getParams(), new
DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
+         localConnector = connectorFactory.createConnector(currentConnectorConfig.getParams(),
new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
       }
 
       if (localConnector.isEquivalent(live.getParams()) && backUp != null &&
!localConnector.isEquivalent(backUp.getParams())) {
@@ -274,7 +278,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
                             " / " +
                             backUp +
                             " but it didn't belong to " +
-                            connectorConfig);
+                            currentConnectorConfig);
          }
       }
    }
@@ -1068,14 +1072,15 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
       try {
          if (logger.isDebugEnabled()) {
             logger.debug("Trying to connect with connectorFactory = " + connectorFactory
+
-                           ", connectorConfig=" + connectorConfig);
+                           ", connectorConfig=" + currentConnectorConfig);
          }
 
-         Connector liveConnector = createConnector(connectorFactory, connectorConfig);
+         Connector liveConnector = createConnector(connectorFactory, currentConnectorConfig);
 
          if ((transportConnection = openTransportConnection(liveConnector)) != null) {
             // if we can't connect the connect method will return null, hence we have to
try the backup
             connector = liveConnector;
+            return transportConnection;
          } else if (backupConfig != null) {
             if (logger.isDebugEnabled()) {
                logger.debug("Trying backup config = " + backupConfig);
@@ -1096,15 +1101,39 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
                // Switching backup as live
                connector = backupConnector;
-               connectorConfig = backupConfig;
+               currentConnectorConfig = backupConfig;
                backupConfig = null;
                connectorFactory = backupConnectorFactory;
-            } else {
-               if (logger.isDebugEnabled()) {
-                  logger.debug("Backup is not active.");
-               }
+               return transportConnection;
             }
+         }
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("Backup is not active, trying original connection configuration
now.");
+         }
+
+
+         if (currentConnectorConfig.equals(connectorConfig)) {
 
+            // There was no changes on current and original connectors, just return null
here and let the retry happen at the first portion of this method on the next retry
+            return null;
+         }
+
+         ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
+
+         Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig);
+
+         transportConnection = openTransportConnection(originalConnector);
+
+         if (transportConnection != null) {
+            logger.debug("Returning into original connector");
+            connector = originalConnector;
+            backupConfig = null;
+            currentConnectorConfig = connectorConfig;
+            return transportConnection;
+         } else {
+            logger.debug("no connection been made, returning null");
+            return null;
          }
       } catch (Exception cause) {
          // Sanity catch for badly behaved remoting plugins
@@ -1124,13 +1153,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
             } catch (Throwable t) {
             }
          }
-
-         transportConnection = null;
-
          connector = null;
+         return null;
       }
 
-      return transportConnection;
    }
 
    private class DelegatingBufferHandler implements BufferHandler {
@@ -1330,7 +1356,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
          try {
             // if it is our connector then set the live id used for failover
-            if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(),
connectorConfig)) {
+            if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(),
currentConnectorConfig)) {
                liveNodeID = nodeID;
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e9f76f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index c6ec6dd..20f5fda 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -610,6 +610,47 @@ public class FailoverTest extends FailoverTestBase {
       Assert.assertEquals(0, sf.numConnections());
    }
 
+   @Test(timeout = 60000)
+   public void testFailBothRestartLive() throws Exception {
+      ServerLocator locator = getServerLocator();
+
+      locator.setReconnectAttempts(-1).setRetryInterval(10);
+
+      sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      ClientSession session = createSession(sf, true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS,
null, true);
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      sendMessagesSomeDurable(session, producer);
+
+      crash(session);
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      session.start();
+
+      receiveDurableMessages(consumer);
+
+      backupServer.getServer().fail(true);
+
+      liveServer.start();
+
+      consumer.close();
+
+      producer.close();
+
+      producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      sendMessagesSomeDurable(session, producer);
+
+      sf.close();
+      Assert.assertEquals(0, sf.numSessions());
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
    /**
     * Basic fail-back test.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e9f76f4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
index 4e6a70a..e65602f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
 import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class LiveToLiveFailoverTest extends FailoverTest {
@@ -268,125 +269,99 @@ public class LiveToLiveFailoverTest extends FailoverTest {
       session = sendAndConsume(sf, false);
    }
 
-
    @Override
    public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception
{
    }
 
+   @Override
+   @Ignore
+   public void testFailBothRestartLive() throws Exception {
+   }
 
-      //invalid tests for Live to Live failover
+   //invalid tests for Live to Live failover
    //all the timeout ones aren't as we don't migrate timeouts, any failback or server restart
    //or replicating tests aren't either
    @Override
+   @Ignore
    public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testLiveAndBackupLiveComesBackNewFactory() {
    }
 
    @Override
+   @Ignore
    public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testFailoverMultipleSessionsWithConsumers() throws Exception {
       //
    }
 
    @Override
+   @Ignore
    public void testTimeoutOnFailover() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testTimeoutOnFailoverTransactionRollback() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testTimeoutOnFailoverConsume() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testTimeoutOnFailoverTransactionCommit() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testFailBack() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testFailBackLiveRestartsBackupIsGone() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testLiveAndBackupLiveComesBack() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testSimpleFailover() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testWithoutUsingTheBackup() throws Exception {
    }
 
    //todo check to see which failing tests are valid,
    @Override
+   @Ignore
    public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception {
    }
 
    @Override
+   @Ignore
    public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception {
    }
+}
 
-   /*@Override
-   public void testCommitDidNotOccurUnblockedAndResend() throws Exception
-   {
-   }
-
-
-
-   @Override
-   public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
-   {
-   }
-
-   @Override
-   public void testXAMessagesSentSoRollbackOnEnd() throws Exception
-   {
-   }
-
-   @Override
-   public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
-   {
-   }
-
-   @Override
-   public void testXAMessagesSentSoRollbackOnEnd2() throws Exception
-   {
-   }
-
-   @Override
-   public void testXAMessagesSentSoRollbackOnCommit() throws Exception
-   {
-   }
-
-   @Override
-   public void testTransactedMessagesSentSoRollback() throws Exception
-   {
-   }
-
-   @Override
-   public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
-   {
-   }
 
-   @Override
-   public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
-   {
-   }*/
-}


Mime
View raw message