activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-293 rebalance inflow on topology change
Date Tue, 03 Nov 2015 23:02:50 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 63d5f3fc1 -> 8f1848108


ARTEMIS-293 rebalance inflow on topology change


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3f608989
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3f608989
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3f608989

Branch: refs/heads/master
Commit: 3f6089891dcc5dbb9f960b24c4ad3d9aecd72dce
Parents: 63d5f3f
Author: jbertram <jbertram@apache.org>
Authored: Fri Oct 30 11:20:06 2015 -0500
Committer: jbertram <jbertram@apache.org>
Committed: Tue Nov 3 16:42:15 2015 -0600

----------------------------------------------------------------------
 .../activemq/artemis/ra/ActiveMQRALogger.java   |  4 +
 .../artemis/ra/inflow/ActiveMQActivation.java   | 99 ++++++++++++++++----
 .../ra/inflow/ActiveMQActivationSpec.java       | 12 +++
 .../artemis/tests/util/ActiveMQTestBase.java    |  4 +
 .../integration/ra/ActiveMQClusteredTest.java   | 72 ++++++++++++++
 .../ra/ActiveMQRAClusteredTestBase.java         | 27 ++++--
 6 files changed, 196 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3f608989/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
index 093e3ee..6ec3a5e 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
@@ -73,6 +73,10 @@ public interface ActiveMQRALogger extends BasicLogger {
    @Message(id = 151005, value = "awaiting server availability", format = Message.Format.MESSAGE_FORMAT)
    void awaitingJMSServerCreation();
 
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.",
format = Message.Format.MESSAGE_FORMAT)
+   void rebalancingConnections();
+
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT)
    void problemResettingXASession();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3f608989/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 8e55061..cb439f9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -30,9 +30,11 @@ import javax.resource.spi.work.WorkManager;
 import javax.transaction.xa.XAResource;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -42,6 +44,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
@@ -111,8 +115,14 @@ public class ActiveMQActivation {
 
    private ActiveMQConnectionFactory factory;
 
+   private List<String> nodes = Collections.synchronizedList(new ArrayList<String>());
+
+   private Map<String, Long> removedNodes = new ConcurrentHashMap<String, Long>();
+
+   private boolean lastReceived = false;
+
    // Whether we are in the failure recovery loop
-   private final AtomicBoolean inFailure = new AtomicBoolean(false);
+   private final AtomicBoolean inReconnect = new AtomicBoolean(false);
    private XARecoveryConfig resourceRecovery;
 
    static {
@@ -338,6 +348,9 @@ public class ActiveMQActivation {
       Map<String, String> recoveryConfProps = new HashMap<String, String>();
       recoveryConfProps.put(XARecoveryConfig.JNDI_NAME_PROPERTY_KEY, ra.getJndiName());
       resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword(),
recoveryConfProps);
+      if (spec.isRebalanceConnections()) {
+         factory.getServerLocator().addClusterTopologyListener(new RebalancingListener());
+      }
 
       ActiveMQRALogger.LOGGER.debug("Setup complete " + this);
    }
@@ -431,6 +444,9 @@ public class ActiveMQActivation {
          factory = null;
       }
 
+      nodes.clear();
+      lastReceived = false;
+
       ActiveMQRALogger.LOGGER.debug("Tearing down complete " + this);
    }
 
@@ -610,27 +626,34 @@ public class ActiveMQActivation {
       return buffer.toString();
    }
 
+   public void rebalance() {
+      ActiveMQRALogger.LOGGER.rebalancingConnections();
+      reconnect(null);
+   }
+
    /**
-    * Handles any failure by trying to reconnect
+    * Drops all existing connection-related resources and reconnects
     *
-    * @param failure the reason for the failure
+    * @param failure if reconnecting in the event of a failure
     */
-   public void handleFailure(Throwable failure) {
-      if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType()
== ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
-         ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
-      }
-      else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType()
== ActiveMQExceptionType.NOT_CONNECTED) {
-         ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
-      }
-      else {
-         ActiveMQRALogger.LOGGER.failureInActivation(failure, spec);
+   public void reconnect(Throwable failure) {
+      if (failure != null) {
+         if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType()
== ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
+            ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
+         }
+         else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType()
== ActiveMQExceptionType.NOT_CONNECTED) {
+            ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
+         }
+         else {
+            ActiveMQRALogger.LOGGER.failureInActivation(failure, spec);
+         }
       }
       int reconnectCount = 0;
       int setupAttempts = spec.getSetupAttempts();
       long setupInterval = spec.getSetupInterval();
 
-      // Only enter the failure loop once
-      if (inFailure.getAndSet(true))
+      // Only enter the reconnect loop once
+      if (inReconnect.getAndSet(true))
          return;
       try {
          Throwable lastException = failure;
@@ -675,7 +698,7 @@ public class ActiveMQActivation {
       }
       finally {
          // Leaving failure recovery loop
-         inFailure.set(false);
+         inReconnect.set(false);
       }
    }
 
@@ -693,11 +716,55 @@ public class ActiveMQActivation {
             setup();
          }
          catch (Throwable t) {
-            handleFailure(t);
+            reconnect(t);
          }
       }
 
       public void release() {
       }
    }
+
+   private class RebalancingListener implements ClusterTopologyListener {
+      @Override
+      public void nodeUP(TopologyMember member, boolean last) {
+         boolean newNode = false;
+
+         String id = member.getNodeId();
+         if (!nodes.contains(id)) {
+            if (removedNodes.get(id) == null || (removedNodes.get(id) != null &&
removedNodes.get(id) < member.getUniqueEventID())) {
+               nodes.add(id);
+               newNode = true;
+            }
+         }
+
+         if (lastReceived && newNode) {
+            Runnable runnable = new Runnable() {
+               @Override
+               public void run() {
+                  rebalance();
+               }
+            };
+            Thread t = new Thread(runnable, "NodeUP Connection Rebalancer");
+            t.start();
+         }
+         else if (last) {
+            lastReceived = true;
+         }
+      }
+
+      @Override
+      public void nodeDown(long eventUID, String nodeID) {
+         if (nodes.remove(nodeID)) {
+            removedNodes.put(nodeID, eventUID);
+            Runnable runnable = new Runnable() {
+               @Override
+               public void run() {
+                  rebalance();
+               }
+            };
+            Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer");
+            t.start();
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3f608989/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java
index f80342b..32d253e 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java
@@ -135,6 +135,8 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties
implemen
    // undefined by default, default is specified at the RA level in ActiveMQRAProperties
    private Long setupInterval;
 
+   private Boolean rebalanceConnections = false;
+
    /**
     * Constructor
     */
@@ -626,6 +628,14 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties
implemen
       this.localTx = localTx;
    }
 
+   public boolean isRebalanceConnections() {
+      return rebalanceConnections;
+   }
+
+   public void setRebalanceConnections(boolean rebalanceConnections) {
+      this.rebalanceConnections = rebalanceConnections;
+   }
+
    public int getSetupAttempts() {
       if (ActiveMQActivationSpec.trace) {
          ActiveMQRALogger.LOGGER.trace("getSetupAttempts()");
@@ -846,6 +856,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties
implemen
       if (parsedJndiParams != null ? !parsedJndiParams.equals(that.parsedJndiParams) : that.parsedJndiParams
!= null)
          return false;
       if (localTx != null ? !localTx.equals(that.localTx) : that.localTx != null) return
false;
+      if (rebalanceConnections != null ? !rebalanceConnections.equals(that.rebalanceConnections)
: that.rebalanceConnections != null) return false;
       if (setupAttempts != null ? !setupAttempts.equals(that.setupAttempts) : that.setupAttempts
!= null) return false;
       return !(setupInterval != null ? !setupInterval.equals(that.setupInterval) : that.setupInterval
!= null);
 
@@ -873,6 +884,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties
implemen
       result = 31 * result + (jndiParams != null ? jndiParams.hashCode() : 0);
       result = 31 * result + (parsedJndiParams != null ? parsedJndiParams.hashCode() : 0);
       result = 31 * result + (localTx != null ? localTx.hashCode() : 0);
+      result = 31 * result + (rebalanceConnections != null ? rebalanceConnections.hashCode()
: 0);
       result = 31 * result + (setupAttempts != null ? setupAttempts.hashCode() : 0);
       result = 31 * result + (setupInterval != null ? setupInterval.hashCode() : 0);
       return result;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3f608989/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 048ab08..76ac5ab 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -810,6 +810,10 @@ public abstract class ActiveMQTestBase extends Assert {
       deleteDirectory(file);
       file.mkdirs();
 
+      recreateDataDirectories(testDir1, index, backup);
+   }
+
+   protected void recreateDataDirectories(String testDir1, int index, boolean backup) {
       recreateDirectory(getJournalDir(testDir1, index, backup));
       recreateDirectory(getBindingsDir(testDir1, index, backup));
       recreateDirectory(getPageDir(testDir1, index, backup));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3f608989/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
index fbab7f0..3ffac2e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
@@ -170,4 +170,76 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase
{
          }
       }
    }
+
+   @Test
+   public void testRebalance() throws Exception {
+      final int CONSUMER_COUNT = 10;
+      secondaryJmsServer.createQueue(true, MDBQUEUE, null, true, "/jms/" + MDBQUEUE);
+
+      ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+      MyBootstrapContext ctx = new MyBootstrapContext();
+      qResourceAdapter.start(ctx);
+      ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      spec.setRebalanceConnections(true);
+      spec.setMaxSession(CONSUMER_COUNT);
+      spec.setSetupAttempts(5);
+      spec.setSetupInterval(200);
+      spec.setHA(true); // if this isn't true then the toplogy listener won't get nodeDown
notifications
+      spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown
to occur on the MDB connection
+      qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint,
false);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+      Queue primaryQueue = server.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
+      Queue secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
+
+      assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
+      assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
+      assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
+
+      ClientSession session = addClientSession(locator.createSessionFactory().createSession());
+      ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeString("test");
+      clientProducer.send(message);
+
+      latch.await(5, TimeUnit.SECONDS);
+
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");
+
+      for (int i = 0; i < 10; i++) {
+         secondaryServer.stop();
+
+         long mark = System.currentTimeMillis();
+         long timeout = 5000;
+         while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis()
- mark) < timeout) {
+            Thread.sleep(100);
+         }
+
+         assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);
+
+         secondaryServer.start();
+         waitForServerToStart(secondaryServer);
+         secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
+
+         mark = System.currentTimeMillis();
+         while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) <
(CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis()
- mark) <= timeout) {
+            Thread.sleep(100);
+         }
+
+         assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
+         assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
+         assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() ==
CONSUMER_COUNT);
+      }
+
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+      qResourceAdapter.stop();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3f608989/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
index a2e69e1..c49d38a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
@@ -45,34 +45,49 @@ public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
       params.put(TransportConstants.SERVER_ID_PROP_NAME, "1");
       secondaryConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
 
-      secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true,
true), mbeanServer, usePersistence()));
+      secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true),
mbeanServer, usePersistence()));
       addServer(secondaryServer);
       secondaryJmsServer = new JMSServerManagerImpl(secondaryServer);
       secondaryJmsServer.start();
       waitForTopology(secondaryServer, 2);
+
    }
 
    protected Configuration createDefaultConfig(boolean netty) throws Exception {
-      return createSecondaryDefaultConfig(netty, false);
+      return createSecondaryDefaultConfig(false);
    }
 
-   protected Configuration createSecondaryDefaultConfig(boolean netty, boolean secondary)
throws Exception {
+   protected Configuration createSecondaryDefaultConfig(boolean secondary) throws Exception
{
       HashMap invmMap = new HashMap();
       HashMap nettyMap = new HashMap();
       String primaryConnectorName = "invm2";
       String secondaryConnectorName = "invm";
-      String directoryPrefix = "first";
+      int index = 0;
 
       if (secondary) {
          invmMap.put(TransportConstants.SERVER_ID_PROP_NAME, "1");
          nettyMap.put("port", "5545");
          primaryConnectorName = "invm";
          secondaryConnectorName = "invm2";
-         directoryPrefix = "second";
+         index = 1;
       }
 
-      ConfigurationImpl configuration = createBasicConfig().setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY,
nettyMap)).setJournalDirectory(getTestDir() + "/" + directoryPrefix + "Journal/").setBindingsDirectory(getTestDir()
+ "/" + directoryPrefix + "Bind/").setLargeMessagesDirectory(getTestDir() + "/" + directoryPrefix
+ "Large/").setPagingDirectory(getTestDir() + "/" + directoryPrefix + "Page/").addConnectorConfiguration(secondaryConnectorName,
secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName,
primaryConnectorName));
+      ConfigurationImpl configuration = createBasicConfig(index)
+         .setJMXManagementEnabled(false)
+         .clearAcceptorConfigurations()
+         .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap))
+         .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap))
+         .addConnectorConfiguration(secondaryConnectorName, secondaryConnector)
+         .addConnectorConfiguration(primaryConnectorName, primaryConnector)
+         .addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName,
primaryConnectorName).setReconnectAttempts(0));
+
+      recreateDataDirectories(getTestDir(), index, false);
 
       return configuration;
    }
+
+   @Override
+   protected boolean usePersistence() {
+      return true;
+   }
 }


Mime
View raw message