helix-commits mailing list archives

From zzh...@apache.org
Subject git commit: HELIX-141: Autorebalance does not work reliably and fails when replica>1
Date Fri, 26 Jul 2013 21:15:11 GMT
Updated Branches:
  refs/heads/master ba1628e76 -> 6c4ba17b5


HELIX-141: Autorebalance does not work reliably and fails when replica>1


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/6c4ba17b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/6c4ba17b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/6c4ba17b

Branch: refs/heads/master
Commit: 6c4ba17b5b222ba31f3ec263f5f6c96184aa4ad8
Parents: ba1628e
Author: zzhang <zzhang5@uci.edu>
Authored: Fri Jul 26 14:15:04 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Fri Jul 26 14:15:04 2013 -0700

----------------------------------------------------------------------
 .../strategy/AutoRebalanceStrategy.java         | 608 +++++++++++++++++++
 .../strategy/TestAutoRebalanceStrategy.java     | 490 +++++++++++++++
 2 files changed, 1098 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6c4ba17b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
new file mode 100644
index 0000000..740d17d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -0,0 +1,608 @@
+package org.apache.helix.controller.strategy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.log4j.Logger;
+
+public class AutoRebalanceStrategy implements Rebalancer {
+  // These should be final, but are initialized in init() rather than a constructor
+  @SuppressWarnings("unused")
+  private HelixManager _manager;
+  private AutoRebalanceModeAlgorithm _algorithm;
+
+  private static final Logger LOG = Logger.getLogger(AutoRebalanceStrategy.class);
+
+  @Override
+  public void init(HelixManager manager) {
+    this._manager = manager;
+    this._algorithm = null;
+  }
+
+  @Override
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
+    String replicas = currentIdealState.getReplicas();
+
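+    // Determine how many replicas of each state are needed, given the live node and replica counts.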
+    LinkedHashMap<String, Integer> stateCountMap = stateCount(stateModelDef, liveInstance.size(),
+        Integer.parseInt(replicas));
+    List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
+    Map<String, Map<String, String>> currentMapping = currentMapping(currentStateOutput,
+        resourceName, partitions, stateCountMap);
+
+    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveNodes);
+      LOG.info("allNodes: " + allNodes);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+
+    _algorithm = new AutoRebalanceModeAlgorithm(resourceName, partitions, stateCountMap,
+        maxPartition);
+    ZNRecord newMapping = _algorithm.computePartitionAssignment(liveNodes, currentMapping,
+        allNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    IdealState newIdealState = new IdealState(resourceName);
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+    return newIdealState;
+  }
+
+  public static class AutoRebalanceModeAlgorithm {
+
+    private static Logger logger = Logger.getLogger(AutoRebalanceModeAlgorithm.class);
+
+    private final String _resourceName;
+    private final List<String> _partitions;
+    private final LinkedHashMap<String, Integer> _states;
+    private final int _maximumPerNode;
+
+    private Map<String, Node> _nodeMap;
+    private List<Node> _liveNodesList;
+    private Map<Replica, Node> _preferredAssignment;
+    private Map<Replica, Node> _existingNonPreferredAssignment;
+    private Set<Replica> _orphaned;
+
+    public AutoRebalanceModeAlgorithm(String resourceName, final List<String> partitions,
+        final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+      _resourceName = resourceName;
+      _partitions = partitions;
+      _states = states;
+      _maximumPerNode = maximumPerNode;
+    }
+
+    public AutoRebalanceModeAlgorithm(String resourceName, final List<String> partitions,
+        final LinkedHashMap<String, Integer> states) {
+      this(resourceName, partitions, states, Integer.MAX_VALUE);
+    }
+
+    public ZNRecord computePartitionAssignment(final List<String> liveNodes,
+        final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
+      int numReplicas = countStateReplicas();
+      ZNRecord znRecord = new ZNRecord(_resourceName);
+      if (liveNodes.size() == 0) {
+        return znRecord;
+      }
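+      // Spread numReplicas * partitions total assignments evenly across the live nodes:
+      // e.g. 30 assignments over 12 live nodes gives distFloor = 2 and distRemainder = 6,
+      // so six nodes get capacity 3 and the other six get capacity 2.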
+      int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
+      int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+      _nodeMap = new HashMap<String, Node>();
+      _liveNodesList = new ArrayList<Node>();
+
+      for (String id : allNodes) {
+        Node node = new Node(id);
+        node.capacity = 0;
+        _nodeMap.put(id, node);
+      }
+      for (int i = 0; i < liveNodes.size(); i++) {
+        int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
+        if (distRemainder > 0 && targetSize < _maximumPerNode) {
+          targetSize += 1;
+          distRemainder = distRemainder - 1;
+        }
+        Node node = _nodeMap.get(liveNodes.get(i));
+        node.isAlive = true;
+        node.capacity = targetSize;
+        _liveNodesList.add(node);
+      }
+      // compute the preferred mapping if all nodes were up
+      _preferredAssignment = computePreferredPlacement(allNodes);
+      // logger.info("preferred mapping:"+ preferredAssignment);
+      // from current mapping derive the ones in preferred location
+      // this will update the nodes with their current fill status
+      computeExistingPreferredPlacement(currentMapping);
+
+      // compute orphaned replica that are not assigned to any node
+      _orphaned = computeOrphaned(currentMapping);
+      if (logger.isInfoEnabled()) {
+        logger.info("orphan = \n" + _orphaned);
+      }
+
+      // from current mapping derive the ones not in preferred location
+      _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
+
+      moveNonPreferredReplicasToPreferred();
+
+      assignOrphans();
+
+      moveExcessReplicas();
+
+      prepareResult(znRecord);
+      return znRecord;
+    }
+
+    /**
+     * Move replicas assigned to non-preferred nodes if their current node is at capacity
+     * and its preferred node is under capacity.
+     */
+    private void moveNonPreferredReplicasToPreferred() {
+      // iterate through the non-preferred assignments and see if we can move each replica
+      // to its preferred location, provided the donor has more than it should and the
+      // stealer has enough capacity
+      Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet()
+          .iterator();
+      while (iterator.hasNext()) {
+        Entry<Replica, Node> entry = iterator.next();
+        Replica replica = entry.getKey();
+        Node donor = entry.getValue();
+        Node receiver = _preferredAssignment.get(replica);
+        if (donor.capacity < donor.currentlyAssigned
+            && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+          donor.currentlyAssigned = donor.currentlyAssigned - 1;
+          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+          donor.nonPreferred.remove(replica);
+          receiver.preferred.add(replica);
+          iterator.remove();
+        }
+      }
+    }
+
+    /**
+     * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
+     */
+    private void assignOrphans() {
+      // iterate over the remaining orphaned replicas and assign each to a live node
+      // with spare capacity
+      Iterator<Replica> it = _orphaned.iterator();
+      while (it.hasNext()) {
+        Replica replica = it.next();
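+        // Probe nodes starting at a hash-derived index, so placement is deterministic per
+        // replica yet spread across nodes.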
+        int startIndex = (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+        for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+          Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+          if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+            receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+            receiver.nonPreferred.add(replica);
+            it.remove();
+            break;
+          }
+        }
+      }
+      if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
+        logger.info("could not assign nodes to partitions: " + _orphaned);
+      }
+    }
+
+    /**
+     * Move replicas from too-full nodes to nodes that can accept the replicas
+     */
+    private void moveExcessReplicas() {
+      // iterate over nodes and move extra load
+      Iterator<Replica> it;
+      for (Node donor : _liveNodesList) {
+        if (donor.capacity < donor.currentlyAssigned) {
+          Collections.sort(donor.nonPreferred);
+          it = donor.nonPreferred.iterator();
+          while (it.hasNext()) {
+            Replica replica = it.next();
+            int startIndex = (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+
+            for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+              Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+              if (receiver.canAdd(replica)) {
+                receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+                receiver.nonPreferred.add(replica);
+
+                donor.currentlyAssigned = donor.currentlyAssigned - 1;
+                it.remove();
+                break;
+              }
+            }
+            if (donor.capacity >= donor.currentlyAssigned) {
+              break;
+            }
+          }
+          if (donor.capacity < donor.currentlyAssigned) {
+            logger.warn("Could not take partitions out of node:" + donor.id);
+          }
+        }
+      }
+    }
+
+    /**
+     * Update a ZNRecord with the results of the rebalancing.
+     * @param znRecord
+     */
+    private void prepareResult(ZNRecord znRecord) {
+      // The map fields map each partition name to a (node -> state) map, indicating which
+      // node serves each replica of the partition and in which state.
+      //
+      // The list fields are also keyed on partition and list all the nodes serving that partition.
+      // This is useful to verify that there is no node serving multiple replicas of the same
+      // partition.
+      for (String partition : _partitions) {
+        znRecord.setMapField(partition, new TreeMap<String, String>());
+        znRecord.setListField(partition, new ArrayList<String>());
+      }
+      for (Node node : _liveNodesList) {
+        for (Replica replica : node.preferred) {
+          znRecord.getMapField(replica.partition).put(node.id, replica.state);
+        }
+        for (Replica replica : node.nonPreferred) {
+          znRecord.getMapField(replica.partition).put(node.id, replica.state);
+        }
+      }
+
+      for (String state : _states.keySet()) {
+        int count = _states.get(state);
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          for (Node node : _liveNodesList) {
+            for (Replica replica : node.preferred) {
+              if (replica.state.equals(state) && replicaId == replica.replicaId) {
+                znRecord.getListField(replica.partition).add(node.id);
+              }
+            }
+            for (Replica replica : node.nonPreferred) {
+              if (replica.state.equals(state) && replicaId == replica.replicaId) {
+                znRecord.getListField(replica.partition).add(node.id);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Compute the subset of the current mapping where replicas are not mapped according to their
+     * preferred assignment.
+     * @param currentMapping Current mapping of replicas to nodes
+     * @return The current assignments that do not conform to the preferred assignment
+     */
+    private Map<Replica, Node> computeExistingNonPreferredPlacement(
+        Map<String, Map<String, String>> currentMapping) {
+      Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
+      for (String partition : currentMapping.keySet()) {
+        Map<String, String> nodeStateMap = currentMapping.get(partition);
+        for (String nodeId : nodeStateMap.keySet()) {
+          Node node = _nodeMap.get(nodeId);
+          String state = nodeStateMap.get(nodeId);
+          Integer count = _states.get(state);
+          boolean skip = false;
+          for (Replica replica : node.preferred) {
+            if (replica.partition.equals(partition)) {
+              skip = true;
+              break;
+            }
+          }
+          if (skip) {
+            continue;
+          }
+          // check if it's in one of the preferred positions
+          for (int i = 0; i < count; i++) {
+            Replica replica = new Replica(partition, state, i);
+            if (!_preferredAssignment.get(replica).id.equals(node.id)) {
+              existingNonPreferredAssignment.put(replica, node);
+              node.nonPreferred.add(replica);
+              break;
+            }
+          }
+        }
+      }
+      return existingNonPreferredAssignment;
+    }
+
+    /**
+     * Get a set of replicas not currently assigned to any node
+     * @param currentMapping Current assignment of replicas to nodes
+     * @return Unassigned replicas
+     */
+    private Set<Replica> computeOrphaned(final Map<String, Map<String, String>> currentMapping) {
+      Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
+      for (String partition : currentMapping.keySet()) {
+        Map<String, String> nodeStateMap = currentMapping.get(partition);
+        for (String node : nodeStateMap.keySet()) {
+          String state = nodeStateMap.get(node);
+          Integer count = _states.get(state);
+          // remove from orphaned if possible
+          for (int i = 0; i < count; i++) {
+            Replica replica = new Replica(partition, state, i);
+            if (orphanedPartitions.contains(replica)) {
+              orphanedPartitions.remove(replica);
+              break;
+            }
+          }
+        }
+      }
+
+      return orphanedPartitions;
+    }
+
+    /**
+     * Determine the replicas already assigned to their preferred nodes
+     * @param currentMapping Current assignment of replicas to nodes
+     * @return Assignments that conform to the preferred placement
+     */
+    private Map<Replica, Node> computeExistingPreferredPlacement(
+        final Map<String, Map<String, String>> currentMapping) {
+      Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
+      for (String partition : currentMapping.keySet()) {
+        Map<String, String> nodeStateMap = currentMapping.get(partition);
+        for (String nodeId : nodeStateMap.keySet()) {
+          Node node = _nodeMap.get(nodeId);
+          node.currentlyAssigned = node.currentlyAssigned + 1;
+          String state = nodeStateMap.get(nodeId);
+          Integer count = _states.get(state);
+          // check if it's in one of the preferred positions
+          for (int i = 0; i < count; i++) {
+            Replica replica = new Replica(partition, state, i);
+            if (_preferredAssignment.containsKey(replica)
+                && !existingPreferredAssignment.containsKey(replica)
+                && _preferredAssignment.get(replica).id.equals(node.id)) {
+              existingPreferredAssignment.put(replica, node);
+              node.preferred.add(replica);
+              break;
+            }
+          }
+        }
+      }
+
+      return existingPreferredAssignment;
+    }
+
+    /**
+     * Given a predefined set of all possible nodes, compute an assignment of replicas to
+     * nodes that evenly assigns all replicas to nodes.
+     * @param allNodes Identifiers to all nodes, live and non-live
+     * @return Preferred assignment of replicas
+     */
+    private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+      Map<Replica, Node> preferredMapping = new HashMap<Replica, Node>();
+      int partitionId = 0;
+
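+      // Round-robin: the r-th replica of the p-th partition goes to node (p + r) % allNodes.size(),
+      // so replicas of the same partition land on distinct nodes whenever enough nodes exist.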
+      for (String partition : _partitions) {
+        int replicaId = 0;
+        for (String state : _states.keySet()) {
+          for (int i = 0; i < _states.get(state); i++) {
+            Replica replica = new Replica(partition, state, i);
+            int index = (partitionId + replicaId) % allNodes.size();
+            preferredMapping.put(replica, _nodeMap.get(allNodes.get(index)));
+            replicaId = replicaId + 1;
+          }
+        }
+        partitionId = partitionId + 1;
+      }
+      return preferredMapping;
+    }
+
+    /**
+     * Count the total number of replicas implied by the state count map (_states)
+     * @return total number of replicas across all states
+     */
+    private int countStateReplicas() {
+      int total = 0;
+      for (Integer count : _states.values()) {
+        total += count;
+      }
+      return total;
+    }
+
+    /**
+     * A Node is an entity that can serve replicas. It has a capacity and knowledge
+     * of replicas assigned to it, so it can decide if it can receive additional replicas.
+     */
+    class Node {
+
+      public int currentlyAssigned;
+      public int capacity;
+      private String id;
+      boolean isAlive;
+      private List<Replica> preferred;
+      private List<Replica> nonPreferred;
+
+      public Node(String id) {
+        preferred = new ArrayList<Replica>();
+        nonPreferred = new ArrayList<Replica>();
+        currentlyAssigned = 0;
+        isAlive = false;
+        this.id = id;
+      }
+
+      /**
+       * Check if this replica can be legally added to this node
+       * @param replica The replica to test
+       * @return true if the assignment can be made, false otherwise
+       */
+      public boolean canAdd(Replica replica) {
+        if (!isAlive) {
+          return false;
+        }
+        if (currentlyAssigned >= capacity) {
+          return false;
+        }
+        for (Replica r : preferred) {
+          if (r.partition.equals(replica.partition)) {
+            return false;
+          }
+        }
+        for (Replica r : nonPreferred) {
+          if (r.partition.equals(replica.partition)) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
+            .append("\nnonpreferred:").append(nonPreferred.size());
+        return sb.toString();
+      }
+    }
+
+    /**
+     * A Replica is a combination of a partition of the resource, the state the replica is in,
+     * and an identifier signifying a specific replica of a given partition and state.
+     */
+    class Replica implements Comparable<Replica> {
+
+      private String partition;
+      private String state;
+      private int replicaId;
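+      // Canonical "partition|state|replicaId" key; backs equals, hashCode, and compareTo.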
+      private String format;
+
+      public Replica(String partition, String state, int replicaId) {
+        this.partition = partition;
+        this.state = state;
+        this.replicaId = replicaId;
+        this.format = partition + "|" + state + "|" + replicaId;
+      }
+
+      @Override
+      public String toString() {
+        return format;
+      }
+
+      @Override
+      public boolean equals(Object that) {
+        if (that instanceof Replica) {
+          return this.format.equals(((Replica) that).format);
+        }
+        return false;
+      }
+
+      @Override
+      public int hashCode() {
+        return this.format.hashCode();
+      }
+
+      @Override
+      public int compareTo(Replica that) {
+        if (that instanceof Replica) {
+          return this.format.compareTo(((Replica) that).format);
+        }
+        return -1;
+      }
+    }
+  }
+
+  /**
+   * Compute the number of replicas that should be in each state, per the state model
+   * @return state count map: state -> count
+   */
+  LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef, int liveNodesNb,
+      int totalReplicas) {
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+
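+    // Per the state model: a numeric count means exactly that many replicas, "N" means one
+    // replica per live node, and "R" means all replicas left over after the other states.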
+    int replicas = totalReplicas;
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("N".equals(num)) {
+        stateCountMap.put(state, liveNodesNb);
+      } else if ("R".equals(num)) {
+        // wait until we get the counts for all other states
+        continue;
+      } else {
+        int stateCount = -1;
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (Exception e) {
+          // LOG.error("Invalid count for state: " + state + ", count: " + num +
+          // ", use -1 instead");
+        }
+
+        if (stateCount > 0) {
+          stateCountMap.put(state, stateCount);
+          replicas -= stateCount;
+        }
+      }
+    }
+
+    // get state count for R
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("R".equals(num)) {
+        stateCountMap.put(state, replicas);
+        // should have at most one state using R
+        break;
+      }
+    }
+    return stateCountMap;
+  }
+
+  Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
+      String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
+
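+    // Merge current and pending states per partition, keeping only states the model counts.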
+    Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
+
+    for (String partition : partitions) {
+      Map<String, String> curStateMap = currentStateOutput.getCurrentStateMap(resourceName,
+          new Partition(partition));
+      map.put(partition, new HashMap<String, String>());
+      for (String node : curStateMap.keySet()) {
+        String state = curStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+
+      Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName,
+          new Partition(partition));
+      for (String node : pendingStateMap.keySet()) {
+        String state = pendingStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+    }
+    return map;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6c4ba17b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
new file mode 100644
index 0000000..d2b44dc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -0,0 +1,490 @@
+package org.apache.helix.controller.strategy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.AutoRebalanceModeAlgorithm;
+import org.apache.log4j.Logger;
+import org.testng.annotations.Test;
+
+public class TestAutoRebalanceStrategy {
+  private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
+
+  @Test(groups = { "unitTest" })
+  public void simpleTest() {
+    final int NUM_ITERATIONS = 10;
+
+    final int NUM_PARTITIONS = 10;
+    final int NUM_LIVE_NODES = 12;
+    final int NUM_TOTAL_NODES = 20;
+    final int MAX_PER_NODE = 5;
+
+    final String[] STATE_NAMES = new String[] { "MASTER", "SLAVE" };
+    final int[] STATE_COUNTS = new int[] { 1, 2 };
+
+    List<String> partitions = new ArrayList<String>();
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      partitions.add("p_" + i);
+    }
+
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    int numStates = Math.min(STATE_NAMES.length, STATE_COUNTS.length);
+    for (int i = 0; i < numStates; i++) {
+      states.put(STATE_NAMES[i], STATE_COUNTS[i]);
+    }
+
+    List<String> liveNodes = new ArrayList<String>();
+    List<String> allNodes = new ArrayList<String>();
+    for (int i = 0; i < NUM_TOTAL_NODES; i++) {
+      allNodes.add("n_" + i);
+      if (i < NUM_LIVE_NODES) {
+        liveNodes.add("n_" + i);
+      }
+    }
+
+    Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
+
+    new AutoRebalanceTester(partitions, states, liveNodes, currentMapping,
+        allNodes, MAX_PER_NODE).runRepeatedly(NUM_ITERATIONS);
+  }
+
+  class AutoRebalanceTester {
+    private static final double P_KILL = 0.45;
+    private static final double P_ADD = 0.1;
+    private static final double P_RESURRECT = 0.45;
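+    // These probabilities sum to 1.0, so runOnceRandomly always selects exactly one action.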
+    private static final String RESOURCE_NAME = "resource";
+
+    private List<String> _partitions;
+    private LinkedHashMap<String, Integer> _states;
+    private List<String> _liveNodes;
+    private Set<String> _liveSet;
+    private Set<String> _removedSet;
+    private Set<String> _nonLiveSet;
+    private Map<String, Map<String, String>> _currentMapping;
+    private List<String> _allNodes;
+    private int _maxPerNode;
+    private Random _random;
+
+    public AutoRebalanceTester(List<String> partitions,
+        LinkedHashMap<String, Integer> states, List<String> liveNodes,
+        Map<String, Map<String, String>> currentMapping, List<String> allNodes,
+        int maxPerNode) {
+      _partitions = partitions;
+      _states = states;
+      _liveNodes = liveNodes;
+      _liveSet = new TreeSet<String>();
+      for (String node : _liveNodes) {
+        _liveSet.add(node);
+      }
+      _removedSet = new TreeSet<String>();
+      _nonLiveSet = new TreeSet<String>();
+      _currentMapping = currentMapping;
+      _allNodes = allNodes;
+      for (String node : allNodes) {
+        if (!_liveSet.contains(node)) {
+          _nonLiveSet.add(node);
+        }
+      }
+      _maxPerNode = maxPerNode;
+      _random = new Random();
+    }
+
+    /**
+     * Repeatedly randomly select a task to run and report the result
+     *
+     * @param numIterations
+     *          Number of random tasks to run in sequence
+     */
+    public void runRepeatedly(int numIterations) {
+      logger.info("~~~~ Initial State ~~~~~");
+      ZNRecord initialResult = new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
+          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      _currentMapping = initialResult.getMapFields();
+      logger.info(_currentMapping);
+      getRunResult(_currentMapping, initialResult.getListFields());
+      for (int i = 0; i < numIterations; i++) {
+        logger.info("~~~~ Iteration " + i + " ~~~~~");
+        ZNRecord znRecord = runOnceRandomly();
+        if (znRecord != null) {
+          final Map<String, Map<String, String>> mapResult = znRecord
+              .getMapFields();
+          final Map<String, List<String>> listResult = znRecord.getListFields();
+          logger.info(mapResult);
+          getRunResult(mapResult, listResult);
+          _currentMapping = mapResult;
+        }
+      }
+    }
+
+    /**
+     * Output various statistics and correctness check results
+     *
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void getRunResult(final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
+      logger.info("***** Statistics *****");
+      dumpStatistics(mapFields);
+      verifyCorrectness(mapFields, listFields);
+    }
+
+    /**
+     * Output statistics about the assignment
+     * 
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     */
+    public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
+      Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      int nodeCount = _liveNodes.size();
+      logger.info("Total number of nodes: " + nodeCount);
+      logger.info("Nodes: " + _liveNodes);
+      int sumPartitions = getSum(partitionsPerNode.values());
+      logger.info("Total number of partitions: " + sumPartitions);
+      double averagePartitions = getAverage(partitionsPerNode.values());
+      logger.info("Average number of partitions per node: "
+          + averagePartitions);
+      double stdevPartitions = getStdev(partitionsPerNode.values(),
+          averagePartitions);
+      logger.info("Standard deviation of partitions: " + stdevPartitions);
+
+      // Statistics about each state
+      Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
+      for (String state : _states.keySet()) {
+        Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
+        for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode
+            .entrySet()) {
+          Map<String, Integer> stateCounts = nodeStates.getValue();
+          if (stateCounts.containsKey(state)) {
+            nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
+          } else {
+            nodeStateCounts.put(nodeStates.getKey(), 0);
+          }
+        }
+        int sumStates = getSum(nodeStateCounts.values());
+        logger.info("Total number of state " + state + ": " + sumStates);
+        double averageStates = getAverage(nodeStateCounts.values());
+        logger.info("Average number of state " + state + " per node: "
+            + averageStates);
+        double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
+        logger.info("Standard deviation of state " + state
+            + " per node: " + stdevStates);
+      }
+    }
+
+    /**
+     * Run a set of correctness tests, reporting success or failure
+     *
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void verifyCorrectness(
+        final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
+      final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
+      assert maxConstraintMet : "Max per node constraint: FAIL";
+      logger.info("Max per node constraint: PASS");
+
+      boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
+      assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
+      logger.info("Only live nodes have partitions constraint: PASS");
+
+      boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
+      assert stateAssignmentPossible : "State replica constraint: FAIL";
+      logger.info("State replica constraint: PASS");
+
+      boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
+      assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
+      logger.info("Node uniqueness per partition constraint: PASS");
+    }
+
+    private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
+      for (String node : partitionsPerNode.keySet()) {
+        Integer value = partitionsPerNode.get(node);
+        if (value > _maxPerNode) {
+          logger.error("ERROR: Node " + node + " has " + value
+              + " partitions despite a maximum of " + _maxPerNode);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean onlyLiveAssigned(
+        final Map<String, Integer> partitionsPerNode) {
+      for (final Entry<String, Integer> nodeState : partitionsPerNode
+          .entrySet()) {
+        boolean isLive = _liveSet.contains(nodeState.getKey());
+        boolean isEmpty = nodeState.getValue() == 0;
+        if (!isLive && !isEmpty) {
+          logger.error("ERROR: Node " + nodeState.getKey()
+              + " is not live, but has " + nodeState.getValue() + " replicas!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean correctStateAssignmentCount(
+        final Map<String, Map<String, String>> assignment) {
+      for (final Entry<String, Map<String, String>> partitionEntry : assignment
+          .entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
+        for (String state : nodeMap.values()) {
+          if (!stateCounts.containsKey(state)) {
+            stateCounts.put(state, 1);
+          } else {
+            stateCounts.put(state, stateCounts.get(state) + 1);
+          }
+        }
+        for (String state : stateCounts.keySet()) {
+          int count = stateCounts.get(state);
+          int maximumCount = _states.get(state);
+          if (count > maximumCount) {
+            logger.error("ERROR: State " + state + " for partition "
+                + partitionEntry.getKey() + " has " + count + " replicas when "
+                + maximumCount + " is allowed!");
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    private boolean atMostOnePartitionReplicaPerNode(
+        final Map<String, List<String>> listFields) {
+      for (final Entry<String, List<String>> partitionEntry : listFields
+          .entrySet()) {
+        Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
+        int numUniques = nodeSet.size();
+        int total = partitionEntry.getValue().size();
+        if (numUniques < total) {
+          logger.error("ERROR: Partition " + partitionEntry.getKey()
+              + " is assigned to " + total + " nodes, but only " + numUniques
+              + " are unique!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private double getAverage(final Collection<Integer> values) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      if (values.size() != 0) {
+        return sum / values.size();
+      } else {
+        return -1.0;
+      }
+    }
+
+    private int getSum(final Collection<Integer> values) {
+      int sum = 0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      return sum;
+    }
+
+    private double getStdev(final Collection<Integer> values, double mean) {
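+      // Population standard deviation; returns -1.0 as a sentinel for an empty collection.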
+      double sum = 0.0;
+      for (Integer value : values) {
+        double deviation = mean - value;
+        sum += Math.pow(deviation, 2.0);
+      }
+      if (values.size() != 0) {
+        sum /= values.size();
+        return Math.pow(sum, 0.5);
+      } else {
+        return -1.0;
+      }
+    }
+
+    private Map<String, Integer> getPartitionBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
+      for (String node : _liveNodes) {
+        partitionsPerNode.put(node, 0);
+      }
+      for (Entry<String, Map<String, String>> partitionEntry : assignment
+          .entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        for (String node : nodeMap.keySet()) {
+          // add 1 for every occurrence of a node
+          if (!partitionsPerNode.containsKey(node)) {
+            partitionsPerNode.put(node, 1);
+          } else {
+            partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
+          }
+        }
+      }
+      return partitionsPerNode;
+    }
+
+    private Map<String, Map<String, Integer>> getStateBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
+      for (String n : _liveNodes) {
+        result.put(n, new TreeMap<String, Integer>());
+      }
+      for (Map<String, String> nodeStateMap : assignment.values()) {
+        for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
+          if (!result.containsKey(nodeState.getKey())) {
+            result.put(nodeState.getKey(), new TreeMap<String, Integer>());
+          }
+          Map<String, Integer> stateMap = result.get(nodeState.getKey());
+          if (!stateMap.containsKey(nodeState.getValue())) {
+            stateMap.put(nodeState.getValue(), 1);
+          } else {
+            stateMap.put(nodeState.getValue(),
+                stateMap.get(nodeState.getValue()) + 1);
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Randomly choose between killing, adding, or resurrecting a single node
+     *
+     * @return (Partition -> (Node -> State)) ZNRecord
+     */
+    public ZNRecord runOnceRandomly() {
+      double choose = _random.nextDouble();
+      ZNRecord result = null;
+      if (choose < P_KILL) {
+        result = removeSingleNode(null);
+      } else if (choose < P_KILL + P_ADD) {
+        result = addSingleNode(null);
+      } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
+        result = resurrectSingleNode(null);
+      }
+      return result;
+    }
+
+    /**
+     * Run rebalancer trying to add a never-live node
+     *
+     * @param node
+     *          Optional String to add
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord addSingleNode(String node) {
+      logger.info("=================== add node =================");
+      if (_nonLiveSet.size() == 0) {
+        logger.warn("Cannot add node because there are no nodes left to add.");
+        return null;
+      }
+
+      // Get a random never-live node
+      if (node == null || !_nonLiveSet.contains(node)) {
+        node = getRandomSetElement(_nonLiveSet);
+      }
+      logger.info("Adding " + node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+      _nonLiveSet.remove(node);
+
+      return new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
+          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+    }
+
+    /**
+     * Run rebalancer trying to remove a live node
+     *
+     * @param node
+     *          Optional String to remove
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord removeSingleNode(String node) {
+      logger.info("=================== remove node =================");
+      if (_liveSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to remove.");
+        return null;
+      }
+
+      // Get a random live node
+      if (node == null || !_liveSet.contains(node)) {
+        node = getRandomSetElement(_liveSet);
+      }
+      logger.info("Removing " + node);
+      _removedSet.add(node);
+      _liveNodes.remove(node);
+      _liveSet.remove(node);
+
+      // the rebalancer expects that the current mapping doesn't contain deleted
+      // nodes
+      for (Map<String, String> nodeMap : _currentMapping.values()) {
+        if (nodeMap.containsKey(node)) {
+          nodeMap.remove(node);
+        }
+      }
+
+      return new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
+          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+    }
+
+    /**
+     * Run rebalancer trying to add back a removed node
+     *
+     * @param node
+     *          Optional String to resurrect
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord resurrectSingleNode(String node) {
+      logger.info("=================== resurrect node =================");
+      if (_removedSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to resurrect.");
+        return null;
+      }
+
+      // Get a random removed node
+      if (node == null || !_removedSet.contains(node)) {
+        node = getRandomSetElement(_removedSet);
+      }
+      logger.info("Resurrecting " + node);
+      _removedSet.remove(node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+
+      return new AutoRebalanceStrategy.AutoRebalanceModeAlgorithm(
+          RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+    }
+
+    private <T> T getRandomSetElement(Set<T> source) {
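+      // Pick a uniformly random element by linear scan, since sets lack positional access.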
+      int element = _random.nextInt(source.size());
+      int i = 0;
+      for (T node : source) {
+        if (i == element) {
+          return node;
+        }
+        i++;
+      }
+      return null;
+    }
+  }
+}

