helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: HELIX-154: Auto rebalance algorithm should not depend on state
Date Thu, 01 Aug 2013 20:32:32 GMT
Updated Branches:
  refs/heads/master edd4db8b7 -> 955b99fa4


HELIX-154: Auto rebalance algorithm should not depend on state


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/955b99fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/955b99fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/955b99fa

Branch: refs/heads/master
Commit: 955b99fa4533e861227cf0dbc3aaa7bdd8b3ce9f
Parents: edd4db8
Author: zzhang <zzhang5@uci.edu>
Authored: Thu Aug 1 13:32:29 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Thu Aug 1 13:32:29 2013 -0700

----------------------------------------------------------------------
 .../strategy/AutoRebalanceStrategy.java         | 129 ++++++++-----------
 1 file changed, 55 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/955b99fa/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 23b451f..9d0e2de 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -111,8 +111,10 @@ public class AutoRebalanceStrategy implements Rebalancer {
 
     private Map<String, Node> _nodeMap;
     private List<Node> _liveNodesList;
-    private Map<String, Map<Integer, Integer>> _replicaIdMap;
+    private Map<Integer, String> _stateMap;
+
     private Map<Replica, Node> _preferredAssignment;
+    private Map<Replica, Node> _existingPreferredAssignment;
     private Map<Replica, Node> _existingNonPreferredAssignment;
     private Set<Replica> _orphaned;
 
@@ -164,25 +166,26 @@ public class AutoRebalanceStrategy implements Rebalancer {
         _liveNodesList.add(node);
       }
 
-      // compute global ids for all replicas
-      _replicaIdMap = generateReplicaMap();
+      // compute states for all replica ids
+      _stateMap = generateStateMap();
 
       // compute the preferred mapping if all nodes were up
       _preferredAssignment = computePreferredPlacement(allNodes);
+
       // logger.info("preferred mapping:"+ preferredAssignment);
       // from current mapping derive the ones in preferred location
       // this will update the nodes with their current fill status
-      computeExistingPreferredPlacement(currentMapping);
-
-      // compute orphaned replica that are not assigned to any node
-      _orphaned = computeOrphaned(currentMapping);
-      if (logger.isInfoEnabled()) {
-        logger.info("orphan = \n" + _orphaned);
-      }
+      _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
 
       // from current mapping derive the ones not in preferred location
       _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
 
+      // compute orphaned replicas that are not assigned to any node
+      _orphaned = computeOrphaned();
+      if (logger.isInfoEnabled()) {
+        logger.info("orphan = " + _orphaned);
+      }
+
       moveNonPreferredReplicasToPreferred();
 
       assignOrphans();
@@ -297,27 +300,24 @@ public class AutoRebalanceStrategy implements Rebalancer {
       }
       for (Node node : _liveNodesList) {
         for (Replica replica : node.preferred) {
-          znRecord.getMapField(replica.partition).put(node.id, replica.state);
+          znRecord.getMapField(replica.partition).put(node.id, _stateMap.get(replica.replicaId));
         }
         for (Replica replica : node.nonPreferred) {
-          znRecord.getMapField(replica.partition).put(node.id, replica.state);
+          znRecord.getMapField(replica.partition).put(node.id, _stateMap.get(replica.replicaId));
         }
       }
 
-      for (String state : _states.keySet()) {
-        int count = _states.get(state);
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          int globalReplicaId = _replicaIdMap.get(state).get(replicaId);
-          for (Node node : _liveNodesList) {
-            for (Replica replica : node.preferred) {
-              if (replica.state.equals(state) && globalReplicaId == replica.replicaId)
{
-                znRecord.getListField(replica.partition).add(node.id);
-              }
+      int count = countStateReplicas();
+      for (int replicaId = 0; replicaId < count; replicaId++) {
+        for (Node node : _liveNodesList) {
+          for (Replica replica : node.preferred) {
+            if (replicaId == replica.replicaId) {
+              znRecord.getListField(replica.partition).add(node.id);
             }
-            for (Replica replica : node.nonPreferred) {
-              if (replica.state.equals(state) && globalReplicaId == replica.replicaId)
{
-                znRecord.getListField(replica.partition).add(node.id);
-              }
+          }
+          for (Replica replica : node.nonPreferred) {
+            if (replicaId == replica.replicaId) {
+              znRecord.getListField(replica.partition).add(node.id);
             }
           }
         }
@@ -333,12 +333,11 @@ public class AutoRebalanceStrategy implements Rebalancer {
     private Map<Replica, Node> computeExistingNonPreferredPlacement(
         Map<String, Map<String, String>> currentMapping) {
       Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
+      int count = countStateReplicas();
       for (String partition : currentMapping.keySet()) {
         Map<String, String> nodeStateMap = currentMapping.get(partition);
         for (String nodeId : nodeStateMap.keySet()) {
           Node node = _nodeMap.get(nodeId);
-          String state = nodeStateMap.get(nodeId);
-          Integer count = _states.get(state);
           boolean skip = false;
           for(Replica replica: node.preferred){
             if(replica.partition.equals(partition)){
@@ -350,10 +349,10 @@ public class AutoRebalanceStrategy implements Rebalancer {
             continue;
           }
           // check if its in one of the preferred position
-          for (int i = 0; i < count; i++) {
-            int replicaId = _replicaIdMap.get(state).get(i);
-            Replica replica = new Replica(partition, state, replicaId);
+          for (int replicaId = 0; replicaId < count; replicaId++) {
+            Replica replica = new Replica(partition, replicaId);
             if (_preferredAssignment.get(replica).id != node.id
+                && !_existingPreferredAssignment.containsKey(replica)
                 && !existingNonPreferredAssignment.containsKey(replica)) {
               existingNonPreferredAssignment.put(replica, node);
               node.nonPreferred.add(replica);
@@ -367,25 +366,18 @@ public class AutoRebalanceStrategy implements Rebalancer {
 
     /**
      * Get a set of replicas not currently assigned to any node
-     * @param currentMapping Current assignment of replicas to nodes
      * @return Unassigned replicas
      */
-    private Set<Replica> computeOrphaned(final Map<String, Map<String, String>>
currentMapping) {
+    private Set<Replica> computeOrphaned() {
       Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
-      for (String partition : currentMapping.keySet()) {
-        Map<String, String> nodeStateMap = currentMapping.get(partition);
-        for (String node : nodeStateMap.keySet()) {
-          String state = nodeStateMap.get(node);
-          Integer count = _states.get(state);
-          // remove from orphaned if possible
-          for (int i = 0; i < count; i++) {
-            int replicaId = _replicaIdMap.get(state).get(i);
-            Replica replica = new Replica(partition, state, replicaId);
-            if (orphanedPartitions.contains(replica)) {
-              orphanedPartitions.remove(replica);
-              break;
-            }
-          }
+      for (Replica r : _existingPreferredAssignment.keySet()) {
+        if (orphanedPartitions.contains(r)) {
+          orphanedPartitions.remove(r);
+        }
+      }
+      for (Replica r : _existingNonPreferredAssignment.keySet()) {
+        if (orphanedPartitions.contains(r)) {
+          orphanedPartitions.remove(r);
         }
       }
 
@@ -400,17 +392,15 @@ public class AutoRebalanceStrategy implements Rebalancer {
     private Map<Replica, Node> computeExistingPreferredPlacement(
         final Map<String, Map<String, String>> currentMapping) {
       Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
+      int count = countStateReplicas();
       for (String partition : currentMapping.keySet()) {
         Map<String, String> nodeStateMap = currentMapping.get(partition);
         for (String nodeId : nodeStateMap.keySet()) {
           Node node = _nodeMap.get(nodeId);
           node.currentlyAssigned = node.currentlyAssigned + 1;
-          String state = nodeStateMap.get(nodeId);
-          Integer count = _states.get(state);
           // check if its in one of the preferred position
-          for (int i = 0; i < count; i++) {
-            int replicaId = _replicaIdMap.get(state).get(i);
-            Replica replica = new Replica(partition, state, replicaId);
+          for (int replicaId = 0; replicaId < count; replicaId++) {
+            Replica replica = new Replica(partition, replicaId);
             if (_preferredAssignment.containsKey(replica)
                 && !existingPreferredAssignment.containsKey(replica)
                 && _preferredAssignment.get(replica).id == node.id) {
@@ -436,16 +426,13 @@ public class AutoRebalanceStrategy implements Rebalancer {
       preferredMapping = new HashMap<Replica, Node>();
       int partitionId = 0;
       int numReplicas = countStateReplicas();
+      int count = countStateReplicas();
       for (String partition : _partitions) {
-        int replicaId = 0;
-        for (String state : _states.keySet()) {
-          for (int i = 0; i < _states.get(state); i++) {
-            Replica replica = new Replica(partition, state, replicaId);
-            String nodeName = _placementScheme.getLocation(partitionId, replicaId,
-                _partitions.size(), numReplicas, allNodes);
-            preferredMapping.put(replica, _nodeMap.get(nodeName));
-            replicaId = replicaId + 1;
-          }
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          String nodeName = _placementScheme.getLocation(partitionId, replicaId,
+              _partitions.size(), numReplicas, allNodes);
+          preferredMapping.put(replica, _nodeMap.get(nodeName));
         }
         partitionId = partitionId + 1;
       }
@@ -467,24 +454,20 @@ public class AutoRebalanceStrategy implements Rebalancer {
     }
 
     /**
-     * Compute a map of local replica ids to global replica ids. Local replica ids are relative
to
-     * a state of a partition, while global ids are relative to the partition as a whole
-     * @return A map of (state, local id) to global id
+     * Compute a map of replica ids to state names
+     * @return Map: replica id -> state name
      */
-    private Map<String, Map<Integer, Integer>> generateReplicaMap() {
+    private Map<Integer, String> generateStateMap() {
       int replicaId = 0;
-      Map<String, Map<Integer, Integer>> stateToIdMap =
-          new HashMap<String, Map<Integer, Integer>>();
+      Map<Integer, String> stateMap = new HashMap<Integer, String>();
       for (String state : _states.keySet()) {
         Integer count = _states.get(state);
-        Map<Integer, Integer> localToGlobalMap = new HashMap<Integer, Integer>();
         for (int i = 0; i < count; i++) {
-          localToGlobalMap.put(i, replicaId);
+          stateMap.put(replicaId, state);
           replicaId++;
         }
-        stateToIdMap.put(state, localToGlobalMap);
       }
-      return stateToIdMap;
+      return stateMap;
     }
 
     /**
@@ -549,20 +532,18 @@ public class AutoRebalanceStrategy implements Rebalancer {
     class Replica implements Comparable<Replica> {
 
       private String partition;
-      private String state;
       private int replicaId; // this is a partition-relative id
       private String format;
 
-      public Replica(String partition, String state, int replicaId) {
+      public Replica(String partition, int replicaId) {
         this.partition = partition;
-        this.state = state;
         this.replicaId = replicaId;
         this.format = partition + "|" + replicaId;
       }
 
       @Override
       public String toString() {
-        return format + "|" + state;
+        return format;
       }
 
       @Override


Mime
View raw message