helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [1/2] git commit: [HELIX-400] Remove all references to the old full auto rebalancing code
Date Wed, 12 Mar 2014 17:35:50 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release b6b7807dd -> c6cb2c2c8


[HELIX-400] Remove all references to the old full auto rebalancing code


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8d99778a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8d99778a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8d99778a

Branch: refs/heads/helix-0.6.2-release
Commit: 8d99778a30d10f529ee0757286efa84ea581b5bf
Parents: b6b7807
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Mar 11 18:27:30 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Mar 11 18:27:30 2014 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   | 194 +------------------
 .../helix/integration/TestAutoRebalance.java    |  27 ++-
 .../helix/integration/TestDisablePartition.java | 120 ++++++++++++
 3 files changed, 139 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8d99778a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 5a832ce..745a9c9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.rebalancer;
  */
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,7 +27,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -40,7 +38,6 @@ import org.apache.helix.controller.stages.ResourceAssignment;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
@@ -208,18 +205,14 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
       map.put(partition, new HashMap<String, String>());
       for (String node : curStateMap.keySet()) {
         String state = curStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
+        map.get(partition).put(node, state);
       }
 
       Map<String, String> pendingStateMap =
           currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
       for (String node : pendingStateMap.keySet()) {
         String state = pendingStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
+        map.get(partition).put(node, state);
       }
     }
     return map;
@@ -233,7 +226,6 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     }
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
-    calculateAutoBalancedIdealState(cache, idealState, stateModelDef);
     ResourceAssignment partitionMapping = new ResourceAssignment();
     for (Partition partition : resource.getPartitions()) {
       Map<String, String> currentStateMap =
@@ -249,186 +241,4 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     }
     return partitionMapping;
   }
-
-  /**
-   * Compute best state for resource in AUTO_REBALANCE ideal state mode. the algorithm
-   * will make sure that the master partition are evenly distributed; Also when instances
-   * are added / removed, the amount of diff in master partitions are minimized
-   * @param cache
-   * @param idealState
-   * @param instancePreferenceList
-   * @param stateModelDef
-   * @param currentStateOutput
-   * @return
-   */
-  private void calculateAutoBalancedIdealState(ClusterDataCache cache, IdealState idealState,
-      StateModelDefinition stateModelDef) {
-    String topStateValue = stateModelDef.getStatesPriorityList().get(0);
-    Set<String> liveInstances = cache.getLiveInstances().keySet();
-    Set<String> taggedInstances = new HashSet<String>();
-
-    // If there are instances tagged with resource name, use only those instances
-    if (idealState.getInstanceGroupTag() != null) {
-      for (String instanceName : liveInstances) {
-        if (cache.getInstanceConfigMap().get(instanceName)
-            .containsTag(idealState.getInstanceGroupTag())) {
-          taggedInstances.add(instanceName);
-        }
-      }
-    }
-    if (taggedInstances.size() > 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + idealState.getResourceName() + " "
-            + taggedInstances);
-      }
-      liveInstances = taggedInstances;
-    }
-    // Obtain replica number
-    int replicas = 1;
-    try {
-      replicas = Integer.parseInt(idealState.getReplicas());
-    } catch (Exception e) {
-      LOG.error("", e);
-    }
-    // Init for all partitions with empty list
-    Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
-    List<String> emptyList = new ArrayList<String>(0);
-    for (String partition : idealState.getPartitionSet()) {
-      defaultListFields.put(partition, emptyList);
-    }
-    idealState.getRecord().setListFields(defaultListFields);
-    // Return if no live instance
-    if (liveInstances.size() == 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("No live instances, return. Idealstate : " + idealState.getResourceName());
-      }
-      return;
-    }
-    Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
-    for (String instanceName : liveInstances) {
-      masterAssignmentMap.put(instanceName, new ArrayList<String>());
-    }
-    Set<String> orphanedPartitions = new HashSet<String>();
-    orphanedPartitions.addAll(idealState.getPartitionSet());
-    // Go through all current states and fill the assignments
-    for (String liveInstanceName : liveInstances) {
-      CurrentState currentState =
-          cache.getCurrentState(liveInstanceName,
-              cache.getLiveInstances().get(liveInstanceName).getSessionId())
-              .get(idealState.getId());
-      if (currentState != null) {
-        Map<String, String> partitionStates = currentState.getPartitionStateMap();
-        for (String partitionName : partitionStates.keySet()) {
-          String state = partitionStates.get(partitionName);
-          if (state.equals(topStateValue)) {
-            masterAssignmentMap.get(liveInstanceName).add(partitionName);
-            orphanedPartitions.remove(partitionName);
-          }
-        }
-      }
-    }
-    List<String> orphanedPartitionsList = new ArrayList<String>();
-    orphanedPartitionsList.addAll(orphanedPartitions);
-    int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
-    normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
-    idealState.getRecord().setListFields(
-        generateListFieldFromMasterAssignment(masterAssignmentMap, replicas));
-  }
-
-  /**
-   * Given the current master assignment map and the partitions not hosted, generate an
-   * evenly distributed partition assignment map
-   * @param masterAssignmentMap
-   *          current master assignment map
-   * @param orphanPartitions
-   *          partitions not hosted by any instance
-   * @return
-   */
-  private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
-      List<String> orphanPartitions, int maxPartitionsPerInstance) {
-    int totalPartitions = 0;
-    String[] instanceNames = new String[masterAssignmentMap.size()];
-    masterAssignmentMap.keySet().toArray(instanceNames);
-    Arrays.sort(instanceNames);
-    // Find out total partition number
-    for (String key : masterAssignmentMap.keySet()) {
-      totalPartitions += masterAssignmentMap.get(key).size();
-      Collections.sort(masterAssignmentMap.get(key));
-    }
-    totalPartitions += orphanPartitions.size();
-
-    // Find out how many partitions an instance should host
-    int partitionNumber = totalPartitions / masterAssignmentMap.size();
-    int leave = totalPartitions % masterAssignmentMap.size();
-
-    for (int i = 0; i < instanceNames.length; i++) {
-      int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
-      leave--;
-      // For hosts that has more partitions, move those partitions to "orphaned"
-      while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo) {
-        int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
-        orphanPartitions.add(masterAssignmentMap.get(instanceNames[i]).get(lastElementIndex));
-        masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
-      }
-    }
-    leave = totalPartitions % masterAssignmentMap.size();
-    Collections.sort(orphanPartitions);
-    // Assign "orphaned" partitions to hosts that do not have enough partitions
-    for (int i = 0; i < instanceNames.length; i++) {
-      int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
-      leave--;
-      if (targetPartitionNo > maxPartitionsPerInstance) {
-        targetPartitionNo = maxPartitionsPerInstance;
-      }
-      while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo) {
-        int lastElementIndex = orphanPartitions.size() - 1;
-        masterAssignmentMap.get(instanceNames[i]).add(orphanPartitions.get(lastElementIndex));
-        orphanPartitions.remove(lastElementIndex);
-      }
-    }
-    if (orphanPartitions.size() > 0) {
-      LOG.warn("orphanPartitions still contains elements");
-    }
-  }
-
-  /**
-   * Generate full preference list from the master assignment map evenly distribute the
-   * slave partitions mastered on a host to other hosts
-   * @param masterAssignmentMap
-   *          current master assignment map
-   * @param orphanPartitions
-   *          partitions not hosted by any instance
-   * @return
-   */
-  private Map<String, List<String>> generateListFieldFromMasterAssignment(
-      Map<String, List<String>> masterAssignmentMap, int replicas) {
-    Map<String, List<String>> listFields = new HashMap<String, List<String>>();
-    int slaves = replicas - 1;
-    String[] instanceNames = new String[masterAssignmentMap.size()];
-    masterAssignmentMap.keySet().toArray(instanceNames);
-    Arrays.sort(instanceNames);
-
-    for (int i = 0; i < instanceNames.length; i++) {
-      String instanceName = instanceNames[i];
-      List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
-      for (int x = 0; x < instanceNames.length - 1; x++) {
-        int index = (x + i + 1) % instanceNames.length;
-        otherInstances.add(instanceNames[index]);
-      }
-
-      List<String> partitionList = masterAssignmentMap.get(instanceName);
-      for (int j = 0; j < partitionList.size(); j++) {
-        String partitionName = partitionList.get(j);
-        listFields.put(partitionName, new ArrayList<String>());
-        listFields.get(partitionName).add(instanceName);
-
-        int slavesCanAssign = Math.min(slaves, otherInstances.size());
-        for (int k = 0; k < slavesCanAssign; k++) {
-          int index = (j + k + 1) % otherInstances.size();
-          listFields.get(partitionName).add(otherInstances.get(index));
-        }
-      }
-    }
-    return listFields;
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8d99778a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index b4f9223..746b463 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -26,9 +26,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -36,17 +36,16 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
   String db2 = TEST_DB + "2";
   String _tag = "SSDSSD";
 
@@ -96,8 +95,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller =
-        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     boolean result =
@@ -245,14 +243,23 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
       HelixDataAccessor accessor =
           new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
       Builder keyBuilder = accessor.keyBuilder();
-      int numberOfPartitions =
-          accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
-              .size();
+      int numberOfPartitions;
+      try {
+        numberOfPartitions =
+            accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
+                .size();
+      } catch (Exception e) {
+        return false;
+      }
       ClusterDataCache cache = new ClusterDataCache();
       cache.refresh(accessor);
+
+      IdealState idealState = cache.getIdealState(_resourceName);
+      if (idealState == null) {
+        return false;
+      }
       String masterValue =
-          cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
-              .getStatesPriorityList().get(0);
+          cache.getStateModelDef(idealState.getStateModelDefRef()).getStatesPriorityList().get(0);
       int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
       String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
       int instances = 0;

http://git-wip-us.apache.org/repos/asf/helix/blob/8d99778a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index ba7e8e4..fcfc744 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -20,19 +20,32 @@ package org.apache.helix.integration;
  */
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
   private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
 
@@ -76,4 +89,111 @@ public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServ
 
   }
 
+  @Test
+  public void testDisableFullAuto() throws Exception {
+    final int NUM_PARTITIONS = 8;
+    final int NUM_PARTICIPANTS = 2;
+    final int NUM_REPLICAS = 1;
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    ClusterSetup clusterSetup = new ClusterSetup(ZK_ADDR);
+    clusterSetup.addCluster(clusterName, true);
+
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = "localhost_" + (11420 + i);
+      clusterSetup.addInstanceToCluster(clusterName, instanceName);
+    }
+
+    // Create a known problematic scenario
+    HelixAdmin admin = clusterSetup.getClusterManagementTool();
+    String resourceName = "MailboxDB";
+    IdealState idealState = new IdealState(resourceName + "DR");
+    idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    idealState.setStateModelDefRef("LeaderStandby");
+    idealState.setReplicas(String.valueOf(NUM_REPLICAS));
+    idealState.setNumPartitions(NUM_PARTITIONS);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      String partitionName = resourceName + '_' + i;
+      List<String> assignmentList = Lists.newArrayList();
+      if (i < NUM_PARTITIONS / 2) {
+        assignmentList.add("localhost_11420");
+      } else {
+        assignmentList.add("localhost_11421");
+      }
+      Map<String, String> emptyMap = Maps.newHashMap();
+      idealState.getRecord().setListField(partitionName, assignmentList);
+      idealState.getRecord().setMapField(partitionName, emptyMap);
+    }
+    admin.addResource(clusterName, idealState.getResourceName(), idealState);
+
+    // Start everything
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String instanceName = "localhost_" + (11420 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_1");
+    controller.syncStart();
+
+    Thread.sleep(1000);
+
+    // Switch to full auto
+    idealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      List<String> emptyList = Collections.emptyList();
+      idealState.getRecord().setListField(resourceName + '_' + i, emptyList);
+    }
+    admin.setResourceIdealState(clusterName, idealState.getResourceName(), idealState);
+
+    Thread.sleep(1000);
+
+    // Get the external view
+    HelixDataAccessor accessor = controller.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ExternalView externalView =
+        accessor.getProperty(keyBuilder.externalView(idealState.getResourceName()));
+
+    // Disable the partitions in an order known to cause problems
+    int[] pid = {
+        0, 7
+    };
+    for (int i = 0; i < pid.length; i++) {
+      String partitionName = resourceName + '_' + pid[i];
+      Map<String, String> stateMap = externalView.getStateMap(partitionName);
+      String leader = null;
+      for (String participantName : stateMap.keySet()) {
+        String state = stateMap.get(participantName);
+        if (state.equals("LEADER")) {
+          leader = participantName;
+        }
+      }
+      List<String> partitionNames = Lists.newArrayList(partitionName);
+      admin.enablePartition(false, clusterName, leader, idealState.getResourceName(),
+          partitionNames);
+
+      Thread.sleep(1000);
+    }
+
+    // Ensure that nothing was reassigned and the disabled are offline
+    externalView = accessor.getProperty(keyBuilder.externalView(idealState.getResourceName()));
+    Map<String, String> p0StateMap = externalView.getStateMap(resourceName + "_0");
+    Assert.assertEquals(p0StateMap.size(), 1);
+    String p0Participant = p0StateMap.keySet().iterator().next();
+    Assert.assertEquals(p0StateMap.get(p0Participant), "OFFLINE");
+    Map<String, String> p7StateMap = externalView.getStateMap(resourceName + "_7");
+    Assert.assertEquals(p7StateMap.size(), 1);
+    String p7Participant = p7StateMap.keySet().iterator().next();
+    Assert.assertEquals(p7StateMap.get(p7Participant), "OFFLINE");
+
+    // Cleanup
+    controller.syncStop();
+    for (MockParticipantManager participant : participants) {
+      participant.syncStop();
+    }
+  }
+
 }


Mime
View raw message