activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-385 On a possible race the Topology final notification may get lost when using many connection factories
Date Thu, 04 Feb 2016 15:46:34 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 0a866471a -> d499e4d8c


ARTEMIS-385 On a possible race the Topology final notification may get lost when using many connection factories

https://issues.apache.org/jira/browse/ARTEMIS-385

This fix makes sure that the createFactory method only waits for the topology
notification that arrives on the connection it has just created.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/26945a47
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/26945a47
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/26945a47

Branch: refs/heads/master
Commit: 26945a471693159382348179f456c189f04ab72b
Parents: 0a86647
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Feb 3 21:14:26 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Feb 4 09:35:08 2016 -0500

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 26 ++++++-
 .../impl/ClientSessionFactoryInternal.java      |  3 +
 .../core/client/impl/ServerLocatorImpl.java     | 76 ++++++--------------
 .../artemis/core/client/impl/Topology.java      |  2 +-
 .../jms/cluster/MultipleThreadsOpeningTest.java | 26 +++++--
 .../integration/server/ConnectionLimitTest.java |  2 +-
 6 files changed, 73 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26945a47/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 5a6241f..384bced 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -119,6 +120,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
    private final double retryIntervalMultiplier; // For exponential backoff
 
+   private final CountDownLatch latchFinalTopology = new CountDownLatch(1);
+
    private final long maxRetryInterval;
 
    private int reconnectAttempts;
@@ -474,6 +477,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
    }
 
    @Override
+   public boolean waitForTopology(long timeout, TimeUnit unit) {
+      try {
+         return latchFinalTopology.await(timeout, unit);
+      }
+      catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         ActiveMQClientLogger.LOGGER.warn(e.getMessage(), e);
+         return false;
+      }
+   }
+
+   @Override
    public boolean isClosed() {
       return closed || serverLocator.isClosed();
    }
@@ -881,7 +896,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
             return connection;
          }
          else {
-            connection = establishNewConnection();
+            RemotingConnection connection = establishNewConnection();
+
+            this.connection = connection;
 
             //we check if we can actually connect.
             // we do it here as to receive the reply connection has to be not null
@@ -1083,7 +1100,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
             transportConnection = openTransportConnection(backupConnector);
 
-            if ((transportConnection = openTransportConnection(backupConnector)) != null) {
+            if (transportConnection != null) {
             /*looks like the backup is now live, let's use that*/
 
                if (ClientSessionFactoryImpl.isDebug) {
@@ -1319,6 +1336,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
                                String scaleDownGroupName,
                                Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                boolean isLast) {
+
+         if (isLast) {
+            latchFinalTopology.countDown();
+         }
+
          // if it is our connector then set the live id used for failover
          if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), connectorConfig)) {
             liveNodeID = nodeID;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26945a47/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
index ba2dab7..be98a8e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -32,6 +33,8 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory
{
 
    boolean removeFailureListener(SessionFailureListener listener);
 
+   boolean waitForTopology(long timeout, TimeUnit unit);
+
    void disableFinalizeCheck();
 
    String getLiveNodeId();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26945a47/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 4893f6c..22a2e0e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -461,11 +461,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
    @Override
    public void resetToInitialConnectors() {
-      synchronized (topologyArrayGuard) {
-         receivedTopology = false;
-         topologyArray = null;
-         topology.clear();
-      }
+      receivedTopology = false;
+      topologyArray = null;
+      topology.clear();
    }
 
    /*
@@ -807,32 +805,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
          } while (retry);
       }
 
-      synchronized (topologyArrayGuard) {
-         // We always wait for the topology, as the server
-         // will send a single element if not cluster
-         // so clients can know the id of the server they are connected to
-         final long timeout = System.currentTimeMillis() + callTimeout;
-         while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis()) {
-            // Now wait for the topology
-            try {
-               topologyArrayGuard.wait(1000);
-            }
-            catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
-            }
-         }
-
-         // We are waiting for the topology here,
-         // however to avoid a race where the connection is closed (and receivedtopology set to true)
-         // between the wait and this timeout here, we redo the check for timeout.
-         // if this becomes false there's no big deal and we will just ignore the issue
-         // notice that we can't add more locks here otherwise there wouldn't be able to avoid a deadlock
-         final boolean hasTimedOut = timeout > System.currentTimeMillis();
-         if (!hasTimedOut && !receivedTopology) {
-            if (factory != null)
-               factory.cleanup();
-            throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
-         }
+      // ATM topology is never != null. Checking here just to be consistent with
+      // how the sendSubscription happens.
+      // in case this ever changes.
+      if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) {
+         throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
       }
 
       addFactory(factory);
@@ -1457,19 +1434,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
          updateArraysAndPairs();
       }
       else {
-         synchronized (topologyArrayGuard) {
-            if (topology.isEmpty()) {
+         if (topology.isEmpty()) {
+            // Resetting the topology to its original condition as it was brand new
+            receivedTopology = false;
+            topologyArray = null;
+         }
+         else {
+            updateArraysAndPairs();
+
+            if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) {
                // Resetting the topology to its original condition as it was brand new
                receivedTopology = false;
-               topologyArray = null;
-            }
-            else {
-               updateArraysAndPairs();
-
-               if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) {
-                  // Resetting the topology to its original condition as it was brand new
-                  receivedTopology = false;
-               }
             }
          }
       }
@@ -1507,11 +1482,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       updateArraysAndPairs();
 
       if (last) {
-         synchronized (topologyArrayGuard) {
-            receivedTopology = true;
-            // Notify if waiting on getting topology
-            topologyArrayGuard.notifyAll();
-         }
+         receivedTopology = true;
       }
    }
 
@@ -1600,12 +1571,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       }
 
       if (!clusterConnection && isEmpty) {
-         // Go back to using the broadcast or static list
-         synchronized (topologyArrayGuard) {
-            receivedTopology = false;
-
-            topologyArray = null;
-         }
+         receivedTopology = false;
+         topologyArray = null;
       }
    }
 
@@ -1632,6 +1599,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
    /**
     * for tests only and not part of the public interface. Do not use it.
+    *
     * @return
     */
    public TransportConfiguration[] getInitialConnectors() {
@@ -1892,7 +1860,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       return buffer.toString();
    }
 
-   private void feedInterceptors(final List<Interceptor> interceptors,  final String interceptorList) {
+   private void feedInterceptors(final List<Interceptor> interceptors, final String interceptorList) {
       interceptors.clear();
 
       if (interceptorList == null || interceptorList.trim().equals("")) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26945a47/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
index 526d9f0..0573b2d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
@@ -84,7 +84,7 @@ public final class Topology {
    /**
     * It will remove all elements as if it haven't received anyone from the server.
     */
-   public void clear() {
+   public synchronized void clear() {
       topology.clear();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26945a47/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
index b98bf07..6ff3748 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
@@ -29,12 +29,30 @@ import org.junit.Test;
 
 public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
 
+   /** created for https://issues.apache.org/jira/browse/ARTEMIS-385 */
+   @Test
+   public void testRepetitions() throws Exception {
+      // This test was eventually failing with way over more iterations.
+      // you might increase it for debugging
+      final int ITERATIONS = 50;
+
+
+      for (int i = 0; i < ITERATIONS; i++) {
+         System.out.println("#test " + i);
+         internalMultipleOpen(200, 1);
+         tearDown();
+         setUp();
+      }
+   }
+
    @Test
    public void testMultipleOpen() throws Exception {
-      cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
+      internalMultipleOpen(20, 500);
+   }
 
-      final int numberOfOpens = 500;
-      int numberOfThreads = 20;
+   protected void internalMultipleOpen(final int numberOfThreads, final int numberOfOpens) throws Exception {
+
+      cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
       // I want all the threads aligned, just ready to start creating connections like in a car race
       final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads);
       final CountDownLatch flagStartRace = new CountDownLatch(1);
@@ -55,7 +73,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
                flagStartRace.await();
 
                for (int i = 0; i < numberOfOpens; i++) {
-                  if (i % 100 == 0)
+                  if (i > 0 && i % 100 == 0)
                      System.out.println("connections created on Thread " + Thread.currentThread() + " " + i);
                   Connection conn = cf1.createConnection();
                   Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26945a47/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
index 67ea3ba..67c478c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java
@@ -73,9 +73,9 @@ public class ConnectionLimitTest extends ActiveMQTestBase {
       ServerLocator locator = createNonHALocator(true).setCallTimeout(3000);
       ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
       ClientSession clientSession = addClientSession(clientSessionFactory.createSession());
-      ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
 
       try {
+         ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
          ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession());
          fail("creating a session here should fail");
       }


Mime
View raw message