helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: HELIX-173: Move rebalancing strategies to separate classes that implement the Rebalancer interface, adding missing files, rb=13390
Date Tue, 13 Aug 2013 17:59:12 GMT
Updated Branches:
  refs/heads/master c5a29ca41 -> 986107055


HELIX-173: Move rebalancing strategies to separate classes that implement the Rebalancer interface,
adding missing files, rb=13390


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/98610705
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/98610705
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/98610705

Branch: refs/heads/master
Commit: 986107055a3d4c33c44f939f4eb03a657c7e12f0
Parents: c5a29ca
Author: zzhang <zzhang5@uci.edu>
Authored: Tue Aug 13 10:59:11 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Tue Aug 13 10:59:11 2013 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   | 410 +++++++++++++++++++
 .../controller/rebalancer/CustomRebalancer.java | 140 +++++++
 .../rebalancer/SemiAutoRebalancer.java          |  85 ++++
 .../util/ConstraintBasedAssignment.java         | 148 +++++++
 .../controller/stages/ResourceMapping.java      |  58 +++
 5 files changed, 841 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/98610705/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
new file mode 100644
index 0000000..c0fb5bf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -0,0 +1,410 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceMapping;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
+ * state of a resource, fully adapting to the addition or removal of instances. This includes
+ * computation of a new preference list and a partition to instance and state mapping based on the
+ * computed instance preferences.
+ *
+ * The input is the current assignment of partitions to instances, as well as existing instance
+ * preferences, if any.
+ *
+ * The output is a preference list and a mapping based on that preference list, i.e. partition p
+ * has a replica on node k with state s.
+ */
+public class AutoRebalancer implements Rebalancer {
+  // These should be final, but are initialized in init rather than a constructor
+  private HelixManager _manager;
+  private AutoRebalanceStrategy _algorithm;
+
+  private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
+
+  /**
+   * Remembers the manager for later use and clears any previously built strategy; the
+   * strategy itself is created when a new ideal state is computed.
+   *
+   * @param manager the Helix manager handed to the placement scheme later on
+   */
+  @Override
+  public void init(HelixManager manager) {
+    _manager = manager;
+    _algorithm = null;
+  }
+
+  /**
+   * Computes a new ideal state for a resource by running {@link AutoRebalanceStrategy} over the
+   * live nodes, the current (and pending) placement of partitions, and all configured nodes.
+   *
+   * The returned ideal state copies the simple fields of the current ideal state, is marked as
+   * AUTO_REBALANCE, and carries the list fields of the computed assignment.
+   *
+   * @param resourceName name of the resource being rebalanced
+   * @param currentIdealState ideal state supplying partitions, state model, replicas and the
+   *          per-instance partition limit
+   * @param currentStateOutput current and pending states of the partitions
+   * @param clusterData cache supplying the state model definition, live instances and
+   *          instance configs
+   * @return a newly created ideal state holding the computed preference lists
+   * @throws NumberFormatException if the replica count of the ideal state is not an integer
+   */
+  @Override
+  public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
+    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    Map<String, LiveInstance> liveInstances = clusterData.getLiveInstances();
+    String replicas = currentIdealState.getReplicas();
+
+    // stateCount builds and returns the map itself, so no placeholder map is allocated here
+    LinkedHashMap<String, Integer> stateCountMap =
+        stateCount(stateModelDef, liveInstances.size(), Integer.parseInt(replicas));
+    List<String> liveNodes = new ArrayList<String>(liveInstances.keySet());
+    Map<String, Map<String, String>> currentMapping = currentMapping(currentStateOutput,
+        resourceName, partitions, stateCountMap);
+
+    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveNodes);
+      LOG.info("allNodes: " + allNodes);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    placementScheme.init(_manager);
+    _algorithm = new AutoRebalanceStrategy(resourceName, partitions, stateCountMap,
+        maxPartition, placementScheme);
+    ZNRecord newMapping = _algorithm.computePartitionAssignment(liveNodes, currentMapping,
+        allNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    IdealState newIdealState = new IdealState(resourceName);
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setIdealStateMode(IdealStateModeProperty.AUTO_REBALANCE.toString());
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+    return newIdealState;
+  }
+
+  /**
+   * Determines how many replicas of a partition should be in each state.
+   *
+   * A configured count of "N" maps to the number of live nodes. A numeric count is used as is,
+   * but only when it is positive. The first state (in priority order) configured with "R"
+   * receives whatever remains of totalReplicas after all positive numeric counts have been
+   * subtracted; it is always inserted last. States whose count is missing or not a number are
+   * left out of the result.
+   *
+   * @param stateModelDef state model supplying the state priority list and per-state counts
+   * @param liveNodesNb number of live nodes, used for states configured with "N"
+   * @param totalReplicas total number of replicas of a partition
+   * @return state count map: state->count
+   */
+  private LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
+      int liveNodesNb, int totalReplicas) {
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+
+    // should have at most one state using R; only the first one found is honored
+    String remainderState = null;
+    int replicas = totalReplicas;
+    for (String state : stateModelDef.getStatesPriorityList()) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("N".equals(num)) {
+        stateCountMap.put(state, liveNodesNb);
+      } else if ("R".equals(num)) {
+        // wait until we get the counts for all other states
+        if (remainderState == null) {
+          remainderState = state;
+        }
+      } else {
+        int stateCount = -1;
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (NumberFormatException e) {
+          // a missing or non-numeric count simply excludes the state; keep the trace at debug
+          // level because this method runs on every rebalance
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Invalid count for state: " + state + ", count: " + num
+                + ", state is ignored");
+          }
+        }
+
+        if (stateCount > 0) {
+          stateCountMap.put(state, stateCount);
+          replicas -= stateCount;
+        }
+      }
+    }
+
+    // get state count for R
+    if (remainderState != null) {
+      stateCountMap.put(remainderState, replicas);
+    }
+    return stateCountMap;
+  }
+
+  /**
+   * Builds, for every given partition, a node->state map out of the current states and then the
+   * pending states of that partition. Only states that are keys of stateCountMap are kept;
+   * because pending states are applied second, they replace the current state of the same node.
+   *
+   * @param currentStateOutput source of current and pending states
+   * @param resourceName resource the partitions belong to
+   * @param partitions names of the partitions to look up
+   * @param stateCountMap states that are of interest (only the keys are consulted)
+   * @return partition name -> (node -> state); every given partition has an entry
+   */
+  private Map<String, Map<String, String>> currentMapping(
+      CurrentStateOutput currentStateOutput, String resourceName, List<String> partitions,
+      Map<String, Integer> stateCountMap) {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
+
+    for (String partitionName : partitions) {
+      Map<String, String> current = currentStateOutput.getCurrentStateMap(resourceName,
+          new Partition(partitionName));
+      Map<String, String> kept = new HashMap<String, String>();
+      result.put(partitionName, kept);
+      copyCountedStates(current, kept, stateCountMap);
+
+      Map<String, String> pending = currentStateOutput.getPendingStateMap(resourceName,
+          new Partition(partitionName));
+      copyCountedStates(pending, kept, stateCountMap);
+    }
+    return result;
+  }
+
+  /**
+   * Copies every node->state entry of source into target whose state is a key of stateCountMap.
+   */
+  private static void copyCountedStates(Map<String, String> source,
+      Map<String, String> target, Map<String, Integer> stateCountMap) {
+    for (Map.Entry<String, String> nodeState : source.entrySet()) {
+      if (stateCountMap.containsKey(nodeState.getValue())) {
+        target.put(nodeState.getKey(), nodeState.getValue());
+      }
+    }
+  }
+
+  /**
+   * Recomputes the auto-balanced preference lists of the ideal state and then derives, for every
+   * partition of the resource, the best possible instance->state map from the partition's
+   * preference list, its current states and the instances disabled for it.
+   *
+   * @param cache cluster data used for the state model, live and disabled instances
+   * @param idealState ideal state of the resource; its list fields are rewritten by this call
+   * @param resource resource whose partitions are processed
+   * @param currentStateOutput current states of the partitions
+   * @return the best possible state mapping of all partitions of the resource
+   */
+  @Override
+  public ResourceMapping computeBestPossiblePartitionState(ClusterDataCache cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+    StateModelDefinition stateModelDef =
+        cache.getStateModelDef(idealState.getStateModelDefRef());
+    // rewrites the list fields of idealState in place before they are read in the loop below
+    calculateAutoBalancedIdealState(cache, idealState, stateModelDef);
+
+    ResourceMapping bestPossibleMapping = new ResourceMapping();
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStates =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(partition.toString());
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState,
+              stateModelDef);
+      bestPossibleMapping.addReplicaMap(partition,
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
+              preferenceList, currentStates, disabledInstances));
+    }
+    return bestPossibleMapping;
+  }
+
+  /**
+   * Compute best state for resource in AUTO_REBALANCE ideal state mode. The algorithm
+   * makes sure that the master (top state) partitions are evenly distributed; also, when
+   * instances are added / removed, the amount of change in master partitions is minimized.
+   *
+   * The result is written into the list fields of the given ideal state. Every partition is
+   * first reset to an empty preference list; if there is no live instance the lists stay empty.
+   * When the ideal state names an instance group tag and at least one live instance carries
+   * that tag, only the tagged instances are used; otherwise all live instances are used.
+   *
+   * @param cache cluster data supplying live instances, instance configs and current states
+   * @param idealState ideal state to update in place
+   * @param stateModelDef state model whose first state in priority order is the top state
+   */
+  private void calculateAutoBalancedIdealState(ClusterDataCache cache,
+                                               IdealState idealState,
+                                               StateModelDefinition stateModelDef) {
+    String topStateValue = stateModelDef.getStatesPriorityList().get(0);
+    Set<String> liveInstances = cache.getLiveInstances().keySet();
+    Set<String> taggedInstances = new HashSet<String>();
+
+    // If there are live instances carrying the instance group tag, use only those instances
+    String instanceGroupTag = idealState.getInstanceGroupTag();
+    if (instanceGroupTag != null) {
+      for (String instanceName : liveInstances) {
+        // a live instance without a config entry is treated as untagged instead of failing
+        if (cache.getInstanceConfigMap().get(instanceName) != null
+            && cache.getInstanceConfigMap().get(instanceName).containsTag(instanceGroupTag)) {
+          taggedInstances.add(instanceName);
+        }
+      }
+    }
+    if (!taggedInstances.isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("found the following instances with tag " + instanceGroupTag
+            + " for resource " + idealState.getResourceName() + ": " + taggedInstances);
+      }
+      liveInstances = taggedInstances;
+    }
+
+    // Obtain replica number; fall back to a single replica when it cannot be determined
+    int replicas = 1;
+    String configuredReplicas = null;
+    try {
+      configuredReplicas = idealState.getReplicas();
+      replicas = Integer.parseInt(configuredReplicas);
+    } catch (Exception e) {
+      LOG.error("Invalid replica count " + configuredReplicas + " for resource "
+          + idealState.getResourceName() + ", using " + replicas + " instead", e);
+    }
+
+    // Init for all partitions with empty list; every partition gets its own list so that a
+    // later modification of one preference list cannot leak into the others
+    Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
+    for (String partition : idealState.getPartitionSet()) {
+      defaultListFields.put(partition, new ArrayList<String>(0));
+    }
+    idealState.getRecord().setListFields(defaultListFields);
+
+    // Return if no live instance
+    if (liveInstances.isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("No live instances, return. Idealstate : "
+            + idealState.getResourceName());
+      }
+      return;
+    }
+
+    Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
+    for (String instanceName : liveInstances) {
+      masterAssignmentMap.put(instanceName, new ArrayList<String>());
+    }
+    Set<String> orphanedPartitions = new HashSet<String>(idealState.getPartitionSet());
+
+    // Go through all current states and fill the assignments
+    for (String liveInstanceName : liveInstances) {
+      CurrentState currentState =
+          cache.getCurrentState(liveInstanceName,
+                                cache.getLiveInstances()
+                                     .get(liveInstanceName)
+                                     .getSessionId()).get(idealState.getId());
+      if (currentState != null) {
+        Map<String, String> partitionStates = currentState.getPartitionStateMap();
+        for (Map.Entry<String, String> partitionState : partitionStates.entrySet()) {
+          // constant-first comparison tolerates a partition without a recorded state
+          if (topStateValue.equals(partitionState.getValue())) {
+            masterAssignmentMap.get(liveInstanceName).add(partitionState.getKey());
+            orphanedPartitions.remove(partitionState.getKey());
+          }
+        }
+      }
+    }
+
+    List<String> orphanedPartitionsList = new ArrayList<String>(orphanedPartitions);
+    int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
+    normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
+    idealState.getRecord()
+              .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
+                                                                   replicas));
+  }
+
+  /**
+   * Given the current master assignment map and the partitions not hosted, generate an
+   * evenly distributed partition assignment map. Both arguments are modified in place.
+   *
+   * Instances are processed in sorted name order. With T partitions in total and n instances,
+   * the first T % n instances are meant to host T / n + 1 partitions and the rest T / n.
+   * Instances hosting more than that hand their surplus over to the orphan list; afterwards
+   * orphans are handed out to instances hosting fewer, never filling an instance beyond
+   * maxPartitionsPerInstance. Orphans that cannot be placed stay in orphanPartitions and a
+   * warning is logged.
+   *
+   * @param masterAssignmentMap
+   *          current master assignment map (instance -> partitions), rebalanced in place
+   * @param orphanPartitions
+   *          partitions not hosted by any instance; placed partitions are removed from it
+   * @param maxPartitionsPerInstance
+   *          upper bound applied when handing orphaned partitions to an instance
+   */
+  private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
+                                      List<String> orphanPartitions,
+                                      int maxPartitionsPerInstance) {
+    // Without instances there is nothing to distribute to (and nothing to divide by)
+    if (masterAssignmentMap.isEmpty()) {
+      if (!orphanPartitions.isEmpty()) {
+        LOG.warn("orphanPartitions still contains elements");
+      }
+      return;
+    }
+
+    String[] instanceNames =
+        masterAssignmentMap.keySet().toArray(new String[masterAssignmentMap.size()]);
+    Arrays.sort(instanceNames);
+
+    // Find out total partition number
+    int totalPartitions = orphanPartitions.size();
+    for (List<String> hosted : masterAssignmentMap.values()) {
+      totalPartitions += hosted.size();
+      Collections.sort(hosted);
+    }
+
+    // Find out how many partitions an instance should host
+    int partitionNumber = totalPartitions / instanceNames.length;
+    int leave = totalPartitions % instanceNames.length;
+
+    // For hosts that have more partitions, move those partitions to "orphaned"
+    for (int i = 0; i < instanceNames.length; i++) {
+      int targetPartitionNo = i < leave ? (partitionNumber + 1) : partitionNumber;
+      List<String> hosted = masterAssignmentMap.get(instanceNames[i]);
+      while (hosted.size() > targetPartitionNo) {
+        orphanPartitions.add(hosted.remove(hosted.size() - 1));
+      }
+    }
+
+    Collections.sort(orphanPartitions);
+    // Assign "orphaned" partitions to hosts that do not have enough partitions
+    for (int i = 0; i < instanceNames.length; i++) {
+      int targetPartitionNo = i < leave ? (partitionNumber + 1) : partitionNumber;
+      targetPartitionNo = Math.min(targetPartitionNo, maxPartitionsPerInstance);
+      List<String> hosted = masterAssignmentMap.get(instanceNames[i]);
+      // stop as soon as the orphans run out instead of indexing an empty list
+      while (hosted.size() < targetPartitionNo && !orphanPartitions.isEmpty()) {
+        hosted.add(orphanPartitions.remove(orphanPartitions.size() - 1));
+      }
+    }
+    if (!orphanPartitions.isEmpty()) {
+      LOG.warn("orphanPartitions still contains elements");
+    }
+  }
+
+  /**
+   * Generate the full preference lists from the master assignment map, spreading the slave
+   * replicas of the partitions mastered on a host over the other hosts.
+   *
+   * The preference list of a partition starts with its master instance. It is followed by up to
+   * (replicas - 1) of the other instances, taken cyclically from the sorted instance names that
+   * come after the master, with the starting point shifted by the partition's position in the
+   * master's partition list.
+   *
+   * @param masterAssignmentMap
+   *          current master assignment map (instance -> partitions it masters)
+   * @param replicas
+   *          total number of replicas per partition, master included
+   * @return partition name -> preference list of instance names
+   */
+  private Map<String, List<String>> generateListFieldFromMasterAssignment(
+      Map<String, List<String>> masterAssignmentMap, int replicas) {
+    Map<String, List<String>> preferenceLists = new HashMap<String, List<String>>();
+    int slaveCount = replicas - 1;
+    String[] sortedInstances =
+        masterAssignmentMap.keySet().toArray(new String[masterAssignmentMap.size()]);
+    Arrays.sort(sortedInstances);
+    int instanceCount = sortedInstances.length;
+
+    for (int masterIdx = 0; masterIdx < instanceCount; masterIdx++) {
+      String master = sortedInstances[masterIdx];
+
+      // every instance except the master, starting with the one right after it
+      List<String> candidates = new ArrayList<String>(instanceCount - 1);
+      for (int offset = 1; offset < instanceCount; offset++) {
+        candidates.add(sortedInstances[(masterIdx + offset) % instanceCount]);
+      }
+      int assignableSlaves = Math.min(slaveCount, candidates.size());
+
+      int position = 0;
+      for (String partitionName : masterAssignmentMap.get(master)) {
+        List<String> preferenceList = new ArrayList<String>();
+        preferenceList.add(master);
+        for (int slave = 1; slave <= assignableSlaves; slave++) {
+          preferenceList.add(candidates.get((position + slave) % candidates.size()));
+        }
+        preferenceLists.put(partitionName, preferenceList);
+        position++;
+      }
+    }
+    return preferenceLists;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/98610705/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
new file mode 100644
index 0000000..1cbfde4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -0,0 +1,140 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceMapping;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping
of
+ * partitions against the set of live instances to mark assignment states as dropped or erroneous
+ * as necessary.
+ *
+ * The input is the required current assignment of partitions to instances, as well as the
required
+ * existing instance preferences.
+ *
+ * The output is a verified mapping based on that preference list, i.e. partition p has a
replica
+ * on node k with state s, where s may be a dropped or error state if necessary.
+ */
+public class CustomRebalancer implements Rebalancer {
+
+  private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
+
+  @Override
+  public void init(HelixManager manager) {
+  }
+
+  @Override
+  public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
+    return currentIdealState;
+  }
+
+  @Override
+  public ResourceMapping computeBestPossiblePartitionState(ClusterDataCache cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+    ResourceMapping partitionMapping = new ResourceMapping();
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+      Set<String> disabledInstancesForPartition =
+          cache.getDisabledInstancesForPartition(partition.toString());
+      Map<String, String> idealStateMap =
+          idealState.getInstanceStateMap(partition.getPartitionName());
+      Map<String, String> bestStateForPartition =
+          computeCustomizedBestStateForPartition(cache, stateModelDef, idealStateMap,
+              currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  /**
+   * compute best state for resource in CUSTOMIZED ideal state mode
+   *
+   * @param cache
+   * @param stateModelDef
+   * @param idealStateMap
+   * @param currentStateMap
+   * @param disabledInstancesForPartition
+   * @return
+   */
+  private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache
cache,
+      StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition)
{
+    Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (String instance : currentStateMap.keySet()) {
+        if ((idealStateMap == null || !idealStateMap.containsKey(instance))
+            && !disabledInstancesForPartition.contains(instance)) {
+          // if dropped and not disabled, transit to DROPPED
+          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+        }
+        else if ( (currentStateMap.get(instance) == null
+            || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
+            && disabledInstancesForPartition.contains(instance)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          instanceStateMap.put(instance, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (idealStateMap == null) {
+      return instanceStateMap;
+    }
+
+    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+    for (String instance : idealStateMap.keySet()) {
+      boolean notInErrorState = currentStateMap == null
+          || currentStateMap.get(instance) == null
+          || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+
+      if (liveInstancesMap.containsKey(instance) && notInErrorState
+          && !disabledInstancesForPartition.contains(instance)) {
+        instanceStateMap.put(instance, idealStateMap.get(instance));
+      }
+    }
+
+    return instanceStateMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/98610705/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
new file mode 100644
index 0000000..cb8a948
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -0,0 +1,85 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceMapping;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
+ * state of a resource based on a predefined preference list of instances willing to accept
+ * replicas.
+ *
+ * The input is the optional current assignment of partitions to instances, as well as the required
+ * existing instance preferences.
+ *
+ * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
+ * with state s.
+ */
+public class SemiAutoRebalancer implements Rebalancer {
+
+  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
+
+  /**
+   * Does nothing; this rebalancer keeps no reference to the manager.
+   */
+  @Override
+  public void init(HelixManager manager) {
+  }
+
+  /**
+   * Returns the given ideal state untouched.
+   */
+  @Override
+  public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
+      ClusterDataCache clusterData) {
+    return currentIdealState;
+  }
+
+  /**
+   * Derives, for every partition of the resource, the best possible instance->state map from
+   * the partition's preference list, its current states and the instances disabled for it.
+   */
+  @Override
+  public ResourceMapping computeBestPossiblePartitionState(ClusterDataCache cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    StateModelDefinition stateModelDef =
+        cache.getStateModelDef(idealState.getStateModelDefRef());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+
+    ResourceMapping bestPossibleMapping = new ResourceMapping();
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStates =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(partition.toString());
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState,
+              stateModelDef);
+      bestPossibleMapping.addReplicaMap(partition,
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
+              preferenceList, currentStates, disabledInstances));
+    }
+    return bestPossibleMapping;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/98610705/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
new file mode 100644
index 0000000..dcf1f04
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -0,0 +1,148 @@
+package org.apache.helix.controller.rebalancer.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Collection of functions that will compute the best possible states given the live instances
and
+ * an ideal state.
+ */
+public class ConstraintBasedAssignment {
+  private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
+
+  public static List<String> getPreferenceList(ClusterDataCache cache,
+      Partition resource,
+      IdealState idealState,
+      StateModelDefinition stateModelDef) {
+    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+
+    if (listField != null && listField.size() == 1
+        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0)))
{
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+      Collections.sort(prefList);
+      return prefList;
+    }
+    else
+    {
+      return listField;
+    }
+  }
+
+  /**
+   * compute best state for resource in AUTO ideal state mode
+   *
+   * @param cache
+   * @param stateModelDef
+   * @param instancePreferenceList
+   * @param currentStateMap
+   *          : instance->state for each partition
+   * @param disabledInstancesForPartition
+   * @return
+   */
+  public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache
cache,
+      StateModelDefinition stateModelDef, List<String> instancePreferenceList,
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition)
+  {
+    Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // we should drop all resources.
+    if (currentStateMap != null)
+    {
+      for (String instance : currentStateMap.keySet())
+      {
+        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
+            && !disabledInstancesForPartition.contains(instance))
+        {
+          // if dropped and not disabled, transit to DROPPED
+          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+        }
+        else if ( (currentStateMap.get(instance) == null
+            || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
+            && disabledInstancesForPartition.contains(instance))
+        {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          instanceStateMap.put(instance, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (instancePreferenceList == null)
+    {
+      return instanceStateMap;
+    }
+
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+    boolean assigned[] = new boolean[instancePreferenceList.size()];
+
+    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+
+    for (String state : statesPriorityList)
+    {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      int stateCount = -1;
+      if ("N".equals(num))
+      {
+        Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
+        liveAndEnabled.removeAll(disabledInstancesForPartition);
+        stateCount = liveAndEnabled.size();
+      }
+      else if ("R".equals(num))
+      {
+        stateCount = instancePreferenceList.size();
+      }
+      else
+      {
+        try
+        {
+          stateCount = Integer.parseInt(num);
+        }
+        catch (Exception e)
+        {
+          logger.error("Invalid count for state:" + state + " ,count=" + num);
+        }
+      }
+      if (stateCount > -1)
+      {
+        int count = 0;
+        for (int i = 0; i < instancePreferenceList.size(); i++)
+        {
+          String instanceName = instancePreferenceList.get(i);
+
+          boolean notInErrorState = currentStateMap == null
+              || currentStateMap.get(instanceName) == null
+              || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
+
+          if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
+              && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
+          {
+            instanceStateMap.put(instanceName, state);
+            count = count + 1;
+            assigned[i] = true;
+            if (count == stateCount)
+            {
+              break;
+            }
+          }
+        }
+      }
+    }
+    return instanceStateMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/98610705/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceMapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceMapping.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceMapping.java
new file mode 100644
index 0000000..dc96507
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceMapping.java
@@ -0,0 +1,58 @@
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.Partition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Represents the assignments of replicas for an entire resource, keyed on partitions of the
+ * resource. Each partition has its replicas assigned to a node, and each replica is in a state.
+ */
+public class ResourceMapping {
+  private final Map<Partition, Map<String, String>> _resourceMap;
+
+  public ResourceMapping() {
+    _resourceMap = new HashMap<Partition, Map<String, String>>();
+  }
+
+  /** Wraps the given map directly (no copy), so it stays shared with the caller. */
+  public ResourceMapping(Map<Partition, Map<String, String>> resourceMap) {
+    _resourceMap = resourceMap;
+  }
+
+  public Map<Partition, Map<String, String>> getResourceMap() {
+    return _resourceMap;
+  }
+
+  /** @return the replica map stored for the partition, or an empty map if it has no entry */
+  public Map<String, String> getInstanceStateMap(Partition partition) {
+    return _resourceMap.containsKey(partition) ? _resourceMap.get(partition)
+        : Collections.<String, String> emptyMap();
+  }
+
+  /** Stores the replica map for the partition, replacing any previous entry. */
+  public void addReplicaMap(Partition partition, Map<String, String> replicaMap) {
+    _resourceMap.put(partition, replicaMap);
+  }
+}


Mime
View raw message