activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-6 git commit: Bug 1174886 - HornetQ TTL / check-period not being respected on the replication channel
Date Wed, 31 Dec 2014 17:06:20 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 1b791ef9e -> 16d74b2be


Bug 1174886 - HornetQ TTL / check-period not being respected
              on the replication channel

The connection-ttl and client-failure-check-period are not passed
to the server locator used to create the replication connection. The
fix sets these two parameters in SharedNothingBackupActivation.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/1d159e6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/1d159e6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/1d159e6d

Branch: refs/heads/master
Commit: 1d159e6da04b21fb65dee64486d816051ce5a13b
Parents: 2e14352
Author: Howard Gao <hgao@redhat.com>
Authored: Tue Dec 23 20:38:35 2014 +0800
Committer: Howard Gao <hgao@redhat.com>
Committed: Tue Dec 23 20:38:35 2014 +0800

----------------------------------------------------------------------
 .../core/server/cluster/ClusterController.java  | 26 ++++--
 .../core/server/cluster/ClusterManager.java     |  4 +-
 .../cluster/failover/FailoverTestBase.java      |  2 +-
 .../replication/ReplicationTest.java            | 84 ++++++++++++++++++--
 .../tests/util/ReplicatedBackupUtils.java       |  8 +-
 5 files changed, 106 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/1d159e6d/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java
index 47c9ae8..abbf9cb 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterController.java
@@ -36,6 +36,7 @@ import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.core.client.impl.Topology;
+import org.apache.activemq.core.config.ClusterConnectionConfiguration;
 import org.apache.activemq.core.protocol.core.Channel;
 import org.apache.activemq.core.protocol.core.ChannelHandler;
 import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
@@ -162,16 +163,12 @@ public class ClusterController implements ActiveMQComponent
     *
     * @param name the cluster connection name
     * @param dg the discovery group to use
+    * @param config the cluster connection config
     */
-   public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg)
+   public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg, ClusterConnectionConfiguration config)
    {
       ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg);
-      //if the cluster isn't available we want to hang around until it is
-      serverLocator.setReconnectAttempts(-1);
-      serverLocator.setInitialConnectAttempts(-1);
-      //this is used for replication so need to use the server packet decoder
-      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance());
-      locators.put(name, serverLocator);
+      configAndAdd(name, serverLocator, config);
    }
 
    /**
@@ -180,9 +177,16 @@ public class ClusterController implements ActiveMQComponent
     * @param name the cluster connection name
     * @param tcConfigs the transport configurations to use
     */
-   public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs)
+   public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs, ClusterConnectionConfiguration config)
    {
       ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
+      configAndAdd(name, serverLocator, config);
+   }
+
+   private void configAndAdd(SimpleString name, ServerLocatorInternal serverLocator, ClusterConnectionConfiguration config)
+   {
+      serverLocator.setConnectionTTL(config.getConnectionTTL());
+      serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
       //if the cluster isn't available we want to hang around until it is
       serverLocator.setReconnectAttempts(-1);
       serverLocator.setInitialConnectAttempts(-1);
@@ -455,4 +459,10 @@ public class ClusterController implements ActiveMQComponent
          }
       }
    }
+
+   public ServerLocator getReplicationLocator()
+   {
+      return this.replicationLocator;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/1d159e6d/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
index 9cfacfd..7d09fdd 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java
@@ -751,7 +751,7 @@ public final class ClusterManager implements ActiveMQComponent
                                                        config.getClusterNotificationInterval(),
                                                        config.getClusterNotificationAttempts());
 
-         clusterController.addClusterConnection(clusterConnection.getName(), dg);
+         clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
       }
       else
       {
@@ -794,7 +794,7 @@ public final class ClusterManager implements ActiveMQComponent
                                                        config.getClusterNotificationAttempts());
 
 
-         clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs);
+         clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
       }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/1d159e6d/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java
index 77920e9..a83661c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/failover/FailoverTestBase.java
@@ -210,7 +210,7 @@ public abstract class FailoverTestBase extends ServiceTestBase
       backupConfig = createDefaultConfig();
       liveConfig = createDefaultConfig();
 
-      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector);
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
 
       final String suffix = "_backup";
       backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + suffix)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/1d159e6d/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java
index df77659..5b2829c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/replication/ReplicationTest.java
@@ -45,6 +45,7 @@ import org.apache.activemq.api.core.client.ClientProducer;
 import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.client.ClientSessionFactory;
 import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.config.ClusterConnectionConfiguration;
 import org.apache.activemq.core.config.Configuration;
 import org.apache.activemq.core.config.ha.SharedStoreSlavePolicyConfiguration;
 import org.apache.activemq.core.journal.EncodingSupport;
@@ -75,6 +76,8 @@ import org.apache.activemq.core.replication.ReplicationManager;
 import org.apache.activemq.core.server.ActiveMQComponent;
 import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.ServerMessage;
+import org.apache.activemq.core.server.cluster.ClusterController;
+import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.core.settings.HierarchicalRepository;
 import org.apache.activemq.core.settings.impl.AddressSettings;
@@ -117,10 +120,30 @@ public final class ReplicationTest extends ServiceTestBase
 
    private void setupServer(boolean backup, String... interceptors) throws Exception
    {
+      this.setupServer(false, backup, null, interceptors);
+   }
 
-      final TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true);
-      final TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false);
-      final TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
+   private void setupServer(boolean useNetty, boolean backup,
+                            ExtraConfigurer extraConfig,
+                            String... incomingInterceptors) throws Exception
+   {
+      TransportConfiguration liveConnector = null;
+      TransportConfiguration liveAcceptor = null;
+      TransportConfiguration backupConnector = null;
+      TransportConfiguration backupAcceptor = null;
+      if (useNetty)
+      {
+         liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+         liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+         backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+         backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+      }
+      else
+      {
+         liveConnector = TransportConfigurationUtils.getInVMConnector(true);
+         backupConnector = TransportConfigurationUtils.getInVMConnector(false);
+         backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
+      }
 
       final String suffix = "_backup";
       Configuration liveConfig = createDefaultConfig();
@@ -131,9 +154,14 @@ public final class ReplicationTest extends ServiceTestBase
          .setJournalDirectory(ActiveMQDefaultConfiguration.getDefaultJournalDir() + suffix)
          .setPagingDirectory(ActiveMQDefaultConfiguration.getDefaultPagingDir() + suffix)
          .setLargeMessagesDirectory(ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir() + suffix)
-         .setIncomingInterceptorClassNames(interceptors.length > 0 ? Arrays.asList(interceptors) : new ArrayList<String>());
+         .setIncomingInterceptorClassNames(incomingInterceptors.length > 0 ? Arrays.asList(incomingInterceptors) : new ArrayList<String>());
 
-      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector);
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+      if (extraConfig != null)
+      {
+         extraConfig.config(liveConfig, backupConfig);
+      }
 
       if (backup)
       {
@@ -143,7 +171,15 @@ public final class ReplicationTest extends ServiceTestBase
       }
 
       backupServer = createServer(backupConfig);
-      locator = createInVMNonHALocator();
+      if (useNetty)
+      {
+         locator = createNettyNonHALocator();
+      }
+      else
+      {
+         locator = createInVMNonHALocator();
+      }
+
       backupServer.start();
       if (backup)
       {
@@ -413,6 +449,37 @@ public final class ReplicationTest extends ServiceTestBase
 
    }
 
+   @Test
+   public void testClusterConnectionConfigs() throws Exception
+   {
+      final long ttlOverride = 123456789;
+      final long checkPeriodOverride = 987654321;
+
+      ExtraConfigurer configurer = new ExtraConfigurer() {
+
+         @Override
+         public void config(Configuration liveConfig, Configuration backupConfig)
+         {
+            List<ClusterConnectionConfiguration> ccList = backupConfig.getClusterConfigurations();
+            assertTrue(ccList.size() > 0);
+            ClusterConnectionConfiguration cc = ccList.get(0);
+            cc.setConnectionTTL(ttlOverride);
+            cc.setClientFailureCheckPeriod(checkPeriodOverride);
+         }
+      };
+      this.setupServer(true, true, configurer);
+      assertTrue(backupServer instanceof ActiveMQServerImpl);
+
+      ClusterController controller = backupServer.getClusterManager().getClusterController();
+
+      ServerLocator replicationLocator = controller.getReplicationLocator();
+
+      assertNotNull(replicationLocator);
+
+      assertEquals(ttlOverride, replicationLocator.getConnectionTTL());
+      assertEquals(checkPeriodOverride, replicationLocator.getClientFailureCheckPeriod());
+   }
+
    /**
     * @return
     * @throws Exception
@@ -901,4 +968,9 @@ public final class ReplicationTest extends ServiceTestBase
          // no-op
       }
    }
+
+   private interface ExtraConfigurer
+   {
+      void config(Configuration liveConfig, Configuration backupConfig);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/1d159e6d/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java
index 36b2ed7..4d61320 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/util/ReplicatedBackupUtils.java
@@ -34,13 +34,19 @@ public final class ReplicatedBackupUtils
                                                TransportConfiguration backupConnector,
                                                TransportConfiguration backupAcceptor,
                                                Configuration liveConfig,
-                                               TransportConfiguration liveConnector)
+                                               TransportConfiguration liveConnector,
+                                               TransportConfiguration liveAcceptor)
    {
       if (backupAcceptor != null)
       {
          backupConfig.clearAcceptorConfigurations().addAcceptorConfiguration(backupAcceptor);
       }
 
+      if (liveAcceptor != null)
+      {
+         liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor);
+      }
+
       backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector)
          .addConnectorConfiguration(LIVE_NODE_NAME, liveConnector)
          .addClusterConfiguration(UnitTestCase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME))


Mime
View raw message