helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] helix git commit: Add integration test cases to test crush rebalance strategy for non-rackaware clusters.
Date Tue, 26 Sep 2017 20:55:39 GMT
Repository: helix
Updated Branches:
  refs/heads/master 1c855ae85 -> 79ebc0469


Add integration test cases to test crush rebalance strategy for non-rackaware clusters.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/de1a27f6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/de1a27f6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/de1a27f6

Branch: refs/heads/master
Commit: de1a27f6cf13c190275f2544c6c9d4afe78d5a82
Parents: 1c855ae
Author: Lei Xia <lxia@linkedin.com>
Authored: Mon Sep 25 17:08:04 2017 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Mon Sep 25 17:08:04 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixAdmin.java  |   4 +-
 .../org/apache/helix/model/InstanceConfig.java  |  43 ++++
 .../org/apache/helix/tools/ClusterSetup.java    |  42 +---
 .../TestCrushAutoRebalanceNonRack.java          | 216 +++++++++++++++++++
 4 files changed, 263 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 75cbfcf..8f47ea5 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -445,14 +445,14 @@ public interface HelixAdmin {
 
   /**
    * @param clusterName
-   * @param instanceNames
+   * @param instanceName
    * @param tag
    */
   void addInstanceTag(String clusterName, String instanceName, String tag);
 
   /**
    * @param clusterName
-   * @param instanceNames
+   * @param instanceName
    * @param tag
    */
   void removeInstanceTag(String clusterName, String instanceName, String tag);

http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 6002591..1a80e70 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -452,4 +452,47 @@ public class InstanceConfig extends HelixProperty {
     // HELIX-65: remove check for hostname/port existence
     return true;
   }
+
+  /**
+   * Create InstanceConfig with given instanceId, instanceId should be in format of host:port
+   * @param instanceId
+   * @return
+   */
+  public static InstanceConfig toInstanceConfig(String instanceId) {
+    String host = null;
+    int port = -1;
+    // to maintain backward compatibility we parse string of format host:port
+    // and host_port, where host port must be of type string and int
+    char[] delims = new char[] {
+        ':', '_'
+    };
+    for (char delim : delims) {
+      String regex = String.format("(.*)[%c]([\\d]+)", delim);
+      if (instanceId.matches(regex)) {
+        int lastIndexOf = instanceId.lastIndexOf(delim);
+        try {
+          port = Integer.parseInt(instanceId.substring(lastIndexOf + 1));
+          host = instanceId.substring(0, lastIndexOf);
+        } catch (Exception e) {
+          _logger.warn("Unable to extract host and port from instanceId:" + instanceId);
+        }
+        break;
+      }
+    }
+    if (host != null && port > 0) {
+      instanceId = host + "_" + port;
+    }
+    InstanceConfig config = new InstanceConfig(instanceId);
+    if (host != null && port > 0) {
+      config.setHostName(host);
+      config.setPort(String.valueOf(port));
+
+    }
+
+    config.setInstanceEnabled(true);
+    if (config.getHostName() == null) {
+      config.setHostName(instanceId);
+    }
+    return config;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index e79410d..03070b2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -178,46 +178,8 @@ public class ClusterSetup {
     }
   }
 
-  private InstanceConfig toInstanceConfig(String instanceId) {
-    String host = null;
-    int port = -1;
-    // to maintain backward compatibility we parse string of format host:port
-    // and host_port, where host port must be of type string and int
-    char[] delims = new char[] {
-        ':', '_'
-    };
-    for (char delim : delims) {
-      String regex = String.format("(.*)[%c]([\\d]+)", delim);
-      if (instanceId.matches(regex)) {
-        int lastIndexOf = instanceId.lastIndexOf(delim);
-        try {
-          port = Integer.parseInt(instanceId.substring(lastIndexOf + 1));
-          host = instanceId.substring(0, lastIndexOf);
-        } catch (Exception e) {
-          _logger.warn("Unable to extract host and port from instanceId:" + instanceId);
-        }
-        break;
-      }
-    }
-    if (host != null && port > 0) {
-      instanceId = host + "_" + port;
-    }
-    InstanceConfig config = new InstanceConfig(instanceId);
-    if (host != null && port > 0) {
-      config.setHostName(host);
-      config.setPort(String.valueOf(port));
-
-    }
-
-    config.setInstanceEnabled(true);
-    if (config.getHostName() == null) {
-      config.setHostName(instanceId);
-    }
-    return config;
-  }
-
   public void addInstanceToCluster(String clusterName, String instanceId) {
-    InstanceConfig config = toInstanceConfig(instanceId);
+    InstanceConfig config = InstanceConfig.toInstanceConfig(instanceId);
     _admin.addInstance(clusterName, config);
   }
 
@@ -238,7 +200,7 @@ public class ClusterSetup {
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    InstanceConfig instanceConfig = toInstanceConfig(instanceId);
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId);
     instanceId = instanceConfig.getInstanceName();
 
     // ensure node is stopped

http://git-wip-us.apache.org/repos/asf/helix/blob/de1a27f6/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
new file mode 100644
index 0000000..9672483
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalanceNonRack.java
@@ -0,0 +1,216 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCrushAutoRebalanceNonRack extends ZkIntegrationTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+  List<String> _nodes = new ArrayList<String>();
+  List<String> _allDBs = new ArrayList<String>();
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+      HelixConfigScope clusterScope =
+          new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+              .forCluster(CLUSTER_NAME).build();
+
+    Map<String, String> configs = new HashMap<String, String>();
+    configs.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/instance");
+    configs.put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "instance");
+    configAccessor.set(clusterScope, configs);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _nodes.add(storageNodeName);
+      String tag = "tag-" + i % 2;
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName,
tag);
+      _nodeToTagMap.put(storageNodeName, tag);
+      HelixConfigScope instanceScope =
+          new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+              .forCluster(CLUSTER_NAME).forParticipant(storageNodeName).build();
+      configAccessor
+          .set(instanceScope, InstanceConfig.InstanceConfigProperty.DOMAIN.name(), "instance="
+ storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @DataProvider(name = "rebalanceStrategies")
+  public static String [][] rebalanceStrategies() {
+    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled=true)
+  public void test(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    System.out.println("Test " + rebalanceStrategyName);
+    List<String> testDBs = new ArrayList<String>();
+    String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+        BuiltInStateModelDefinitions.MasterSlave.name(),
+        BuiltInStateModelDefinitions.LeaderStandby.name()
+    };
+    int i = 0;
+    for (String stateModel : testModels) {
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(is, ev);
+    }
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled=true)
+  public void testWithInstanceTag(
+      String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception {
+    List<String> testDBs = new ArrayList<String>();
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    int i = 0;
+    for (String tag : tags) {
+      String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+          BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+          rebalanceStrategyClass);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
db);
+      is.setInstanceGroupTag(tag);
+      _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(is, ev);
+    }
+  }
+
+  /**
+   * Validate each partition is different instances and with necessary tagged instances.
+   */
+  private void validateIsolation(IdealState is, ExternalView ev) {
+    int replica = Integer.valueOf(is.getReplicas());
+    String tag = is.getInstanceGroupTag();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      Assert.assertEquals(instancesInEV.size(), replica);
+      for (String instance : instancesInEV) {
+        if (tag != null) {
+          InstanceConfig config =
+              _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}


Mime
View raw message