helix-commits mailing list archives

From kisho...@apache.org
Subject [2/2] git commit: [HELIX-18] Moving some rebalancing methods to HelixAdmin from clustersetup
Date Thu, 03 Jan 2013 06:43:02 GMT
Updated Branches:
  refs/heads/master f7cc98631 -> 80e9c4a98


[HELIX-18] Moving some rebalancing methods to HelixAdmin from clustersetup


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/80e9c4a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/80e9c4a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/80e9c4a9

Branch: refs/heads/master
Commit: 80e9c4a9866d5a72cdbcf8509d74a86b5091253a
Parents: f7cc986
Author: Kishore Gopalakrishna <g.kishore@gmail.com>
Authored: Wed Jan 2 22:42:49 2013 -0800
Committer: Kishore Gopalakrishna <g.kishore@gmail.com>
Committed: Wed Jan 2 22:42:49 2013 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/helix/HelixAdmin.java |   29 +
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  107 ++-
 .../java/org/apache/helix/tools/ClusterSetup.java  |  292 +------
 .../helix/tools/DefaultIdealStateCalculator.java   |  792 +++++++++++++++
 .../tools/IdealStateCalculatorForStorageNode.java  |  792 ---------------
 .../java/org/apache/helix/util/RebalanceUtil.java  |  146 +++
 .../TestEspressoStorageClusterIdealState.java      |   21 +-
 .../stages/TestCompatibilityCheckStage.java        |    4 +-
 .../stages/TestResourceComputationStage.java       |    6 +-
 .../helix/integration/TestAutoIsWithEmptyMap.java  |    4 +-
 .../org/apache/helix/integration/TestDriver.java   |    4 +-
 .../helix/integration/TestExpandCluster.java       |    9 +-
 .../helix/integration/TestRenamePartition.java     |    4 +-
 .../josql/TestClusterJosqlQueryProcessor.java      |    4 +-
 .../messaging/TestDefaultMessagingService.java     |    4 +-
 .../mbeans/TestClusterStatusMonitor.java           |   10 +-
 .../monitoring/mbeans/TestResourceMonitor.java     |    4 +-
 17 files changed, 1110 insertions(+), 1122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80e9c4a9/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 746e9ad..7d3ecfa 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -397,4 +397,33 @@ public interface HelixAdmin
   void addMessageConstraint(String clusterName,
                             String constraintId,
                             Map<String, String> constraints);
+
+  /**
+   * Rebalance a resource's partitions among the given instances while minimizing partition
+   * movement relative to the current ideal state.
+   * @param clusterName the cluster containing the resource
+   * @param currentIdealState the existing ideal state of the resource
+   * @param instanceNames the instances to distribute partitions across
+   */
+  void rebalance(String clusterName, 
+                       IdealState currentIdealState,
+                       List<String> instanceNames);
+  /**
+   * Rebalance a resource across the given set of instances.
+   * @param clusterName the cluster containing the resource
+   * @param resourceName the resource to rebalance
+   * @param replica the number of replicas per partition
+   * @param instances the instances to distribute partitions across
+   */
+  void rebalance(String clusterName, String resourceName, int replica,
+      List<String> instances);
+  /**
+   * Rebalance a resource across all instances in the cluster, using keyPrefix to generate partition names.
+   * @param clusterName the cluster containing the resource
+   * @param resourceName the resource to rebalance
+   * @param replica the number of replicas per partition
+   * @param keyPrefix the prefix used when generating partition names
+   */
+  void rebalance(String clusterName, String resourceName, int replica,
+      String keyPrefix);
 }
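
For context, a minimal sketch of how a client might call the new overloads, assuming a ZKHelixAdmin connected to ZooKeeper at a hypothetical localhost:2181 (the cluster, resource, and instance names below are illustrative only):

import java.util.Arrays;
import java.util.List;

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class RebalanceOverloadsSketch
{
  public static void main(String[] args)
  {
    // Hypothetical ZooKeeper address and names.
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    List<String> instances = Arrays.asList("node_1", "node_2", "node_3");

    // Rebalance "MyDB" with 2 replicas across an explicit set of instances.
    admin.rebalance("MyCluster", "MyDB", 2, instances);

    // Rebalance across all instances in the cluster, using "MyDB" as the partition key prefix.
    admin.rebalance("MyCluster", "MyDB", 2, "MyDB");
  }
}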

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80e9c4a9/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 4499d4b..ddb177c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -69,8 +69,9 @@ import org.apache.helix.model.IdealState.IdealStateModeProperty;
 import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
 import org.apache.log4j.Logger;
 
 
@@ -1100,15 +1101,30 @@ public class ZKHelixAdmin implements HelixAdmin
   @Override
   public void rebalance(String clusterName, String resourceName, int replica)
   {
-    rebalance(clusterName, resourceName, replica, resourceName);
+    List<String> instanceNames = getInstancesInCluster(clusterName);
+    rebalance(clusterName, resourceName, replica, resourceName, instanceNames);
   }
-
-  void rebalance(String clusterName, String resourceName, int replica, String keyPrefix)
+  @Override
+  public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix)
+  {
+    List<String> instanceNames = getInstancesInCluster(clusterName);
+    rebalance(clusterName, resourceName, replica, keyPrefix, instanceNames);
+  }
+  @Override
+  public void rebalance(String clusterName, String resourceName, int replica, List<String> instances)
+  {
+    rebalance(clusterName, resourceName, replica, resourceName, instances);
+  }
+  
+  
+  void rebalance(String clusterName, 
+                 String resourceName, 
+                 int replica, 
+                 String keyPrefix, 
+                 List<String> instanceNames)
   {
-    List<String> InstanceNames = getInstancesInCluster(clusterName);
-
     // ensure we get the same idealState with the same set of instances
-    Collections.sort(InstanceNames);
+    Collections.sort(instanceNames);
 
     IdealState idealState = getResourceIdealState(clusterName, resourceName);
     if (idealState == null)
@@ -1158,7 +1174,7 @@ public class ZKHelixAdmin implements HelixAdmin
         {
           throw new HelixException("Invalid or unsupported state model definition");
         }
-        replica = InstanceNames.size() - 1;
+        replica = instanceNames.size() - 1;
         masterStateValue = slaveStateValue = state;
       }
     }
@@ -1174,7 +1190,7 @@ public class ZKHelixAdmin implements HelixAdmin
     if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
     {
       ZNRecord newIdealState =
-          IdealStateCalculatorForStorageNode.calculateIdealState(InstanceNames,
+          DefaultIdealStateCalculator.calculateIdealState(instanceNames,
                                                                  partitions,
                                                                  replica,
                                                                  keyPrefix,
@@ -1291,6 +1307,77 @@ public class ZKHelixAdmin implements HelixAdmin
       }
     }, AccessOption.PERSISTENT);
   }
-
   
+  
+  /**
+   * Takes the existing ideal state as input and computes a new ideal state such that
+   * partition movement is minimized. The partitions are redistributed among the
+   * instances provided.
+   * @param clusterName the cluster containing the resource
+   * @param currentIdealState the existing ideal state of the resource
+   * @param instanceNames the instances to redistribute partitions across
+   */
+  @Override
+  public void rebalance(String clusterName,
+                              IdealState currentIdealState, 
+                              List<String> instanceNames)
+  {
+    Set<String> activeInstances = new HashSet<String>();
+    for (String partition : currentIdealState.getPartitionSet())
+    {
+      activeInstances.addAll(currentIdealState.getRecord().getListField(partition));
+    }
+    instanceNames.removeAll(activeInstances);
+    Map<String, Object> previousIdealState = RebalanceUtil.buildInternalIdealState(currentIdealState);
+
+    Map<String, Object> balancedRecord =
+        DefaultIdealStateCalculator.calculateNextIdealState(instanceNames,
+                                                                   previousIdealState);
+    StateModelDefinition stateModDef =
+        this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef());
+
+    if (stateModDef == null)
+    {
+      throw new HelixException("cannot find state model: " + currentIdealState.getStateModelDefRef());
+    }
+    String[] states = RebalanceUtil.parseStates(clusterName, stateModDef);
+
+    ZNRecord newIdealStateRecord =
+        DefaultIdealStateCalculator.convertToZNRecord(balancedRecord,
+                                                             currentIdealState.getResourceName(),
+                                                             states[0],
+                                                             states[1]);
+    Set<String> partitionSet = new HashSet<String>();
+    partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
+    partitionSet.addAll(newIdealStateRecord.getListFields().keySet());
+
+    Map<String, String> reversePartitionIndex =
+        (Map<String, String>) balancedRecord.get("reversePartitionIndex");
+    for (String partition : partitionSet)
+    {
+      if (reversePartitionIndex.containsKey(partition))
+      {
+        String originPartitionName = reversePartitionIndex.get(partition);
+        if (partition.equals(originPartitionName))
+        {
+          continue;
+        }
+        newIdealStateRecord.getMapFields()
+                           .put(originPartitionName,
+                                newIdealStateRecord.getMapField(partition));
+        newIdealStateRecord.getMapFields().remove(partition);
+
+        newIdealStateRecord.getListFields()
+                           .put(originPartitionName,
+                                newIdealStateRecord.getListField(partition));
+        newIdealStateRecord.getListFields().remove(partition);
+      }
+    }
+
+    newIdealStateRecord.getSimpleFields()
+                       .putAll(currentIdealState.getRecord().getSimpleFields());
+    IdealState newIdealState = new IdealState(newIdealStateRecord);
+    setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState);
+  }
+ 
 }
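
The IdealState-based overload is what makes cluster expansion cheap: instances that already hold partitions are filtered out, and only enough partitions move to fill the new nodes. A rough sketch of that flow, assuming the new instances have already been added to a hypothetical cluster:

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;

public class ExpandClusterSketch
{
  public static void main(String[] args)
  {
    // Hypothetical ZooKeeper address and cluster name; the new instances are assumed
    // to have been added to the cluster already (e.g. via admin.addInstance).
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    String clusterName = "MyCluster";

    for (String resource : admin.getResourcesInCluster(clusterName))
    {
      // Assumes each resource already has a complete assignment (replicas set, list fields populated).
      IdealState idealState = admin.getResourceIdealState(clusterName, resource);
      // The new overload filters out instances that already hold partitions, so passing the
      // full instance list is fine; only the new nodes receive moved partitions.
      admin.rebalance(clusterName, idealState, admin.getInstancesInCluster(clusterName));
    }
  }
}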

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80e9c4a9/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 5319519..18662a5 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -483,8 +483,7 @@ public class ClusterSetup
       _logger.warn("Resource " + resourceName + " not balanced, skip");
       return;
     }
-    IdealState newIdealState = balanceIdealState(clusterName, idealState);
-    _admin.setResourceIdealState(clusterName, resourceName, newIdealState);
+    balanceIdealState(clusterName, idealState);
   }
 
   public void expandCluster(String clusterName)
@@ -496,305 +495,30 @@ public class ClusterSetup
     }
   }
 
-  public String[] parseStates(String clusterName, String stateModelName)
-  {
-    String[] result = new String[2];
-    String masterStateValue = null, slaveStateValue = null;
-    StateModelDefinition stateModDef =
-        _admin.getStateModelDef(clusterName, stateModelName);
-
-    if (stateModDef == null)
-    {
-      throw new HelixException("cannot find state model: " + stateModelName);
-    }
-    // StateModelDefinition def = new StateModelDefinition(stateModDef);
-
-    List<String> statePriorityList = stateModDef.getStatesPriorityList();
-
-    for (String state : statePriorityList)
-    {
-      String count = stateModDef.getNumInstancesPerState(state);
-      if (count.equals("1"))
-      {
-        if (masterStateValue != null)
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        masterStateValue = state;
-      }
-      else if (count.equalsIgnoreCase("R"))
-      {
-        if (slaveStateValue != null)
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        slaveStateValue = state;
-      }
-      else if (count.equalsIgnoreCase("N"))
-      {
-        if (!(masterStateValue == null && slaveStateValue == null))
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        masterStateValue = slaveStateValue = state;
-      }
-    }
-    if (masterStateValue == null && slaveStateValue == null)
-    {
-      throw new HelixException("Invalid or unsupported state model definition");
-    }
-
-    if (masterStateValue == null)
-    {
-      masterStateValue = slaveStateValue;
-    }
-    result[0] = masterStateValue;
-    result[1] = slaveStateValue;
-    return result;
-  }
+  
 
-  public IdealState balanceIdealState(String clusterName, IdealState idealState)
+  public void balanceIdealState(String clusterName, IdealState idealState)
   {
     // The new instances are added into the cluster already. So we need to find out the
     // instances that
     // already have partitions assigned to them.
     List<String> instanceNames = _admin.getInstancesInCluster(clusterName);
-    Set<String> activeInstances = new HashSet<String>();
-    for (String partition : idealState.getPartitionSet())
-    {
-      activeInstances.addAll(idealState.getRecord().getListField(partition));
-    }
-    instanceNames.removeAll(activeInstances);
-    Map<String, Object> previousIdealState = buildInternalIdealState(idealState);
-
-    Map<String, Object> balancedRecord =
-        IdealStateCalculatorForStorageNode.calculateNextIdealState(instanceNames,
-                                                                   previousIdealState);
-
-    String[] states = parseStates(clusterName, idealState.getStateModelDefRef());
-
-    ZNRecord newIdealStateRecord =
-        IdealStateCalculatorForStorageNode.convertToZNRecord(balancedRecord,
-                                                             idealState.getResourceName(),
-                                                             states[0],
-                                                             states[1]);
-    Set<String> partitionSet = new HashSet<String>();
-    partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
-    partitionSet.addAll(newIdealStateRecord.getListFields().keySet());
-
-    Map<String, String> reversePartitionIndex =
-        (Map<String, String>) balancedRecord.get("reversePartitionIndex");
-    for (String partition : partitionSet)
-    {
-      if (reversePartitionIndex.containsKey(partition))
-      {
-        String originPartitionName = reversePartitionIndex.get(partition);
-        if (partition.equals(originPartitionName))
-        {
-          continue;
-        }
-        newIdealStateRecord.getMapFields()
-                           .put(originPartitionName,
-                                newIdealStateRecord.getMapField(partition));
-        newIdealStateRecord.getMapFields().remove(partition);
-
-        newIdealStateRecord.getListFields()
-                           .put(originPartitionName,
-                                newIdealStateRecord.getListField(partition));
-        newIdealStateRecord.getListFields().remove(partition);
-      }
-    }
-
-    newIdealStateRecord.getSimpleFields()
-                       .putAll(idealState.getRecord().getSimpleFields());
-    return new IdealState(newIdealStateRecord);
+    rebalanceResource(clusterName, idealState, instanceNames);
 
   }
 
-  public static Map<String, Object> buildInternalIdealState(IdealState state)
+  private void rebalanceResource(String clusterName,
+      IdealState idealState, List<String> instanceNames)
   {
-    // Try parse the partition number from name DB_n. If not, sort the partitions and
-    // assign id
-    Map<String, Integer> partitionIndex = new HashMap<String, Integer>();
-    Map<String, String> reversePartitionIndex = new HashMap<String, String>();
-    boolean indexInPartitionName = true;
-    for (String partitionId : state.getPartitionSet())
-    {
-      int lastPos = partitionId.lastIndexOf("_");
-      if (lastPos < 0)
-      {
-        indexInPartitionName = false;
-        break;
-      }
-      try
-      {
-        String idStr = partitionId.substring(lastPos + 1);
-        int partition = Integer.parseInt(idStr);
-        partitionIndex.put(partitionId, partition);
-        reversePartitionIndex.put(state.getResourceName() + "_" + partition, partitionId);
-      }
-      catch (Exception e)
-      {
-        indexInPartitionName = false;
-        partitionIndex.clear();
-        reversePartitionIndex.clear();
-        break;
-      }
-    }
-
-    if (indexInPartitionName == false)
-    {
-      List<String> partitions = new ArrayList<String>();
-      partitions.addAll(state.getPartitionSet());
-      Collections.sort(partitions);
-      for (int i = 0; i < partitions.size(); i++)
-      {
-        partitionIndex.put(partitions.get(i), i);
-        reversePartitionIndex.put(state.getResourceName() + "_" + i, partitions.get(i));
-      }
-    }
-
-    Map<String, List<Integer>> nodeMasterAssignmentMap =
-        new TreeMap<String, List<Integer>>();
-    Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap =
-        new TreeMap<String, Map<String, List<Integer>>>();
-    for (String partition : state.getPartitionSet())
-    {
-      List<String> instances = state.getRecord().getListField(partition);
-      String master = instances.get(0);
-      if (!nodeMasterAssignmentMap.containsKey(master))
-      {
-        nodeMasterAssignmentMap.put(master, new ArrayList<Integer>());
-      }
-      if (!combinedNodeSlaveAssignmentMap.containsKey(master))
-      {
-        combinedNodeSlaveAssignmentMap.put(master, new TreeMap<String, List<Integer>>());
-      }
-      nodeMasterAssignmentMap.get(master).add(partitionIndex.get(partition));
-      for (int i = 1; i < instances.size(); i++)
-      {
-        String instance = instances.get(i);
-        Map<String, List<Integer>> slaveMap = combinedNodeSlaveAssignmentMap.get(master);
-        if (!slaveMap.containsKey(instance))
-        {
-          slaveMap.put(instance, new ArrayList<Integer>());
-        }
-        slaveMap.get(instance).add(partitionIndex.get(partition));
-      }
-    }
-
-    Map<String, Object> result = new TreeMap<String, Object>();
-    result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
-    result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
-    result.put("replicas", Integer.parseInt(state.getReplicas()));
-    result.put("partitions", new Integer(state.getRecord().getListFields().size()));
-    result.put("reversePartitionIndex", reversePartitionIndex);
-    return result;
+     _admin.rebalance(clusterName, idealState, instanceNames);
   }
 
-  // TODO: remove this. has moved to ZkHelixAdmin
   public void rebalanceStorageCluster(String clusterName,
                                       String resourceName,
                                       int replica,
                                       String keyPrefix)
   {
-    List<String> InstanceNames = _admin.getInstancesInCluster(clusterName);
-    // ensure we get the same idealState with the same set of instances
-    Collections.sort(InstanceNames);
-
-    IdealState idealState = _admin.getResourceIdealState(clusterName, resourceName);
-    if (idealState == null)
-    {
-      throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
-    }
-
-    idealState.setReplicas(Integer.toString(replica));
-    int partitions = idealState.getNumPartitions();
-    String stateModelName = idealState.getStateModelDefRef();
-    StateModelDefinition stateModDef =
-        _admin.getStateModelDef(clusterName, stateModelName);
-
-    if (stateModDef == null)
-    {
-      throw new HelixException("cannot find state model: " + stateModelName);
-    }
-    // StateModelDefinition def = new StateModelDefinition(stateModDef);
-
-    List<String> statePriorityList = stateModDef.getStatesPriorityList();
-
-    String masterStateValue = null;
-    String slaveStateValue = null;
-    replica--;
-
-    for (String state : statePriorityList)
-    {
-      String count = stateModDef.getNumInstancesPerState(state);
-      if (count.equals("1"))
-      {
-        if (masterStateValue != null)
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        masterStateValue = state;
-      }
-      else if (count.equalsIgnoreCase("R"))
-      {
-        if (slaveStateValue != null)
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        slaveStateValue = state;
-      }
-      else if (count.equalsIgnoreCase("N"))
-      {
-        if (!(masterStateValue == null && slaveStateValue == null))
-        {
-          throw new HelixException("Invalid or unsupported state model definition");
-        }
-        replica = InstanceNames.size() - 1;
-        masterStateValue = slaveStateValue = state;
-      }
-    }
-    if (masterStateValue == null && slaveStateValue == null)
-    {
-      throw new HelixException("Invalid or unsupported state model definition");
-    }
-
-    if (masterStateValue == null)
-    {
-      masterStateValue = slaveStateValue;
-    }
-    if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
-    {
-      ZNRecord newIdealState =
-          IdealStateCalculatorForStorageNode.calculateIdealState(InstanceNames,
-                                                                 partitions,
-                                                                 replica,
-                                                                 keyPrefix,
-                                                                 masterStateValue,
-                                                                 slaveStateValue);
-
-      // for now keep mapField in AUTO mode and remove listField in CUSTOMIZED mode
-      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
-      {
-        idealState.getRecord().setListFields(newIdealState.getListFields());
-        idealState.getRecord().setMapFields(newIdealState.getMapFields());
-      }
-      if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
-      {
-        idealState.getRecord().setMapFields(newIdealState.getMapFields());
-      }
-    }
-    else
-    {
-      for (int i = 0; i < partitions; i++)
-      {
-        String partitionName = keyPrefix + "_" + i;
-        idealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
-        idealState.getRecord().setListField(partitionName, new ArrayList<String>());
-      }
-    }
-    _admin.setResourceIdealState(clusterName, resourceName, idealState);
+    _admin.rebalance(clusterName, resourceName, replica);
   }
 
   /**
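
With the assignment logic gone, ClusterSetup is now a thin wrapper over HelixAdmin. A hedged sketch of driving it programmatically (the ZooKeeper address and names are made up):

import org.apache.helix.tools.ClusterSetup;

public class ClusterSetupSketch
{
  public static void main(String[] args)
  {
    // Hypothetical ZooKeeper address, cluster and resource names.
    ClusterSetup setup = new ClusterSetup("localhost:2181");

    // Delegates to HelixAdmin.rebalance(clusterName, resourceName, replica) internally.
    setup.rebalanceStorageCluster("MyCluster", "MyDB", 3, "MyDB");

    // After new nodes have been added to the cluster, redistribute all balanced resources
    // onto them; ClusterSetup defers the per-resource work to HelixAdmin.rebalance as well.
    setup.expandCluster("MyCluster");
  }
}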

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/80e9c4a9/helix-core/src/main/java/org/apache/helix/tools/DefaultIdealStateCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/DefaultIdealStateCalculator.java b/helix-core/src/main/java/org/apache/helix/tools/DefaultIdealStateCalculator.java
new file mode 100644
index 0000000..3c63269
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/DefaultIdealStateCalculator.java
@@ -0,0 +1,792 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+/**
+ * DefaultIdealStateCalculator tries to optimally allocate master/slave partitions among
+ * espresso storage nodes.
+ *
+ * Given a batch of storage nodes, the partition count and the replication factor, the algorithm
+ * first computes an initial ideal state. When new batches of storage nodes are added, the
+ * algorithm calculates the new ideal state such that the total partition movement is minimized.
+ *
+ */
+public class DefaultIdealStateCalculator
+{
+  static final String _MasterAssignmentMap = "MasterAssignmentMap";
+  static final String _SlaveAssignmentMap = "SlaveAssignmentMap";
+  static final String _partitions = "partitions";
+  static final String _replicas = "replicas";
+
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the replication factor and
+   * number of partitions
+   *
+   * 1. Calculate the master assignment by random shuffling
+   * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
+   * 3. for each storage instance, calculate the i-th slave assignment map
+   * 4. Combine the i-th slave assignment maps together
+   *
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          The number of replicas (slave partitions) per master partition
+   * @param masterStateValue
+   *          master state value: e.g. "MASTER" or "LEADER"
+   * @param slaveStateValue
+   *          slave state value: e.g. "SLAVE" or "STANDBY"
+   * @param resourceName the name of the resource
+   * @return a ZNRecord that contains the ideal state info
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
+                                             String masterStateValue, String slaveStateValue)
+  {
+    Collections.sort(instanceNames);
+    if(instanceNames.size() < replicas + 1)
+    {
+      throw new HelixException("Number of instances must not be less than replicas + 1. "
+                                      + "instanceNr:" + instanceNames.size()
+                                      + ", replicas:" + replicas);
+    }
+    else if(partitions < instanceNames.size())
+    {
+      ZNRecord idealState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas, resourceName, 12345, masterStateValue, slaveStateValue);
+      int i = 0;
+      for(String partitionId : idealState.getMapFields().keySet())
+      {
+        Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+        List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+        String masterInstance = "";
+        for(String instanceName : partitionAssignmentMap.keySet())
+        {
+          if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
+              && masterInstance.equals(""))
+          {
+            masterInstance = instanceName;
+          }
+          else
+          {
+            partitionAssignmentPriorityList.add(instanceName);
+          }
+        }
+        Collections.shuffle(partitionAssignmentPriorityList, new Random(i++));
+        partitionAssignmentPriorityList.add(0, masterInstance);
+        idealState.setListField(partitionId, partitionAssignmentPriorityList);
+      }
+      return idealState;
+    }
+
+    Map<String, Object> result = calculateInitialIdealState(instanceNames, partitions, replicas);
+
+    return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
+  }
+
+  public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
+                                                  String masterStateValue, String slaveStateValue)
+  {
+    Map<String, Object> result = calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
+
+    for(int i = 1; i < instanceBatches.size(); i++)
+    {
+      result = calculateNextIdealState(instanceBatches.get(i), result);
+    }
+
+    return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
+  }
+
+  /**
+   * Convert the internal result (stored as a Map<String, Object>) into ZNRecord.
+   */
+  public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
+                                    String masterStateValue, String slaveStateValue)
+  {
+    Map<String, List<Integer>> nodeMasterAssignmentMap
+    = (Map<String, List<Integer>>) (result.get(_MasterAssignmentMap));
+    Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+        = (Map<String, Map<String, List<Integer>>>)(result.get(_SlaveAssignmentMap));
+
+    int partitions = (Integer)(result.get("partitions"));
+
+    ZNRecord idealState = new ZNRecord(resourceName);
+    idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+
+
+    for(String instanceName : nodeMasterAssignmentMap.keySet())
+    {
+      for(Integer partitionId : nodeMasterAssignmentMap.get(instanceName))
+      {
+        String partitionName = resourceName+"_"+partitionId;
+        if(!idealState.getMapFields().containsKey(partitionName))
+        {
+          idealState.setMapField(partitionName, new TreeMap<String, String>());
+        }
+        idealState.getMapField(partitionName).put(instanceName, masterStateValue);
+      }
+    }
+
+    for(String instanceName : nodeSlaveAssignmentMap.keySet())
+    {
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+
+      for(String slaveNode: slaveAssignmentMap.keySet())
+      {
+        List<Integer> slaveAssignment = slaveAssignmentMap.get(slaveNode);
+        for(Integer partitionId: slaveAssignment)
+        {
+          String partitionName = resourceName+"_"+partitionId;
+          idealState.getMapField(partitionName).put(slaveNode, slaveStateValue);
+        }
+      }
+    }
+    // generate the priority list of instances per partition. Master should be at front and slave follows.
+
+    for(String partitionId : idealState.getMapFields().keySet())
+    {
+      Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+      List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+      String masterInstance = "";
+      for(String instanceName : partitionAssignmentMap.keySet())
+      {
+        if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
+            && masterInstance.equals(""))
+        {
+          masterInstance = instanceName;
+        }
+        else
+        {
+          partitionAssignmentPriorityList.add(instanceName);
+        }
+      }
+      Collections.shuffle(partitionAssignmentPriorityList);
+      partitionAssignmentPriorityList.add(0, masterInstance);
+      idealState.setListField(partitionId, partitionAssignmentPriorityList);
+    }
+    assert(result.containsKey("replicas"));
+    idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas").toString());
+    return idealState;
+  }
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the replication factor and
+   * number of partitions
+   *
+   * 1. Calculate the master assignment by random shuffling
+   * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
+   * 3. for each storage instance, calculate the i-th slave assignment map
+   * 4. Combine the i-th slave assignment maps together
+   *
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          the number of replicas (slave partitions) per master partition
+   * @return a map that contains the ideal state info
+   */
+  public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
+  {
+    Random r = new Random(54321);
+    assert(replicas <= instanceNames.size() - 1);
+
+    ArrayList<Integer> masterPartitionAssignment = new ArrayList<Integer>();
+    for(int i = 0;i< partitions; i++)
+    {
+      masterPartitionAssignment.add(i);
+    }
+    // shuffle the partition id array
+    Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
+
+    // 1. Generate the random master partition assignment
+    //    instanceName -> List of master partitions on that instance
+    Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
+    for(int i = 0; i < masterPartitionAssignment.size(); i++)
+    {
+      String instanceName = instanceNames.get(i % instanceNames.size());
+      if(!nodeMasterAssignmentMap.containsKey(instanceName))
+      {
+        nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
+      }
+      nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
+    }
+
+    // instanceName -> slave assignment for its master partitions
+    // slave assignment: instanceName -> list of slave partitions on it
+    List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
+
+    Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
+    Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
+
+    if(replicas > 0)
+    {
+      // 2. For each node, calculate the evenly distributed slave as the first slave assignment
+      // We will figure out the 2nd ...replicas-th slave assignment based on the first level slave assignment
+      for(int i = 0; i < instanceNames.size(); i++)
+      {
+        List<String> slaveInstances = new ArrayList<String>();
+        ArrayList<Integer> slaveAssignment = new ArrayList<Integer>();
+        TreeMap<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
+
+        for(int j = 0;j < instanceNames.size(); j++)
+        {
+          if(j != i)
+          {
+            slaveInstances.add(instanceNames.get(j));
+            slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
+          }
+        }
+        // Get the number of master partitions on instanceName
+        List<Integer> masterAssignment =  nodeMasterAssignmentMap.get(instanceNames.get(i));
+        // do a random shuffling as in step 1, so that the first-level slaves are distributed among the remaining instances
+
+
+        for(int j = 0;j < masterAssignment.size(); j++)
+        {
+          slaveAssignment.add(j);
+        }
+        Collections.shuffle(slaveAssignment, new Random(r.nextInt()));
+
+        Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode()));
+
+        // Get the slave assignment map of node instanceName
+        for(int j = 0;j < masterAssignment.size(); j++)
+        {
+          String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
+          if(!slaveAssignmentMap.containsKey(slaveInstanceName))
+          {
+            slaveAssignmentMap.put(slaveInstanceName, new ArrayList<Integer>());
+          }
+          slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
+        }
+        firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
+      }
+      nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
+      // From the first slave assignment map, calculate the rest slave assignment maps
+      for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
+      {
+        // calculate the next slave partition assignment map
+        Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
+          = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
+        nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
+      }
+
+      // Combine the calculated 1...replicas-th slave assignment map together
+
+      for(String instanceName : nodeMasterAssignmentMap.keySet())
+      {
+        Map<String, List<Integer>> combinedSlaveAssignmentMap =  new TreeMap<String, List<Integer>>();
+
+        for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+        {
+          Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+
+          for(String slaveInstance : slaveAssignmentMap.keySet())
+          {
+            if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
+            {
+              combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
+            }
+            combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
+          }
+        }
+        migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>());
+        combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
+      }
+    }
+    /*
+    // Print the result master and slave assignment maps
+    System.out.println("Master assignment:");
+    for(String instanceName : nodeMasterAssignmentMap.keySet())
+    {
+      System.out.println(instanceName+":");
+      for(Integer x : nodeMasterAssignmentMap.get(instanceName))
+      {
+        System.out.print(x+" ");
+      }
+      System.out.println();
+      System.out.println("Slave assignment:");
+
+      int slaveOrder = 1;
+      for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+      {
+        System.out.println("Slave assignment order :" + (slaveOrder++));
+        Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+        for(String slaveName : slaveAssignmentMap.keySet())
+        {
+          System.out.print("\t" + slaveName +":\n\t" );
+          for(Integer x : slaveAssignmentMap.get(slaveName))
+          {
+            System.out.print(x + " ");
+          }
+          System.out.println("\n");
+        }
+      }
+      System.out.println("\nCombined slave assignment map");
+      Map<String, List<Integer>> slaveAssignmentMap = combinedNodeSlaveAssignmentMap.get(instanceName);
+      for(String slaveName : slaveAssignmentMap.keySet())
+      {
+        System.out.print("\t" + slaveName +":\n\t" );
+        for(Integer x : slaveAssignmentMap.get(slaveName))
+        {
+          System.out.print(x + " ");
+        }
+        System.out.println("\n");
+      }
+    }*/
+    Map<String, Object> result = new TreeMap<String, Object>();
+    result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
+    result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
+    result.put("replicas", new Integer(replicas));
+    result.put("partitions", new Integer(partitions));
+    return result;
+  }
+  /**
+   * In the case where there is more than one slave, we use the following algorithm to calculate
+   * the n-th slave assignment map based on the first-level slave assignment map.
+   *
+   * @param firstInstanceSlaveAssignmentMap  the first slave assignment map for all instances
+   * @param replicaOrder the order of the slave
+   * @return the n-th slave assignment map for all the instances
+   * */
+  static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
+  {
+    Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
+
+    for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+    {
+      Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
+      result.put(currentInstance, resultAssignmentMap);
+    }
+
+    for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+    {
+      Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
+      Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
+      int offset = replicaOrder - 1;
+      for(String instance : previousSlaveAssignmentMap.keySet())
+      {
+        List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
+        // Obtain an array of other instances
+        for(String otherInstance : previousSlaveAssignmentMap.keySet())
+        {
+          otherInstances.add(otherInstance);
+        }
+        Collections.sort(otherInstances);
+        int instanceIndex = -1;
+        for(int index = 0;index < otherInstances.size(); index++)
+        {
+          if(otherInstances.get(index).equalsIgnoreCase(instance))
+          {
+            instanceIndex = index;
+          }
+        }
+        assert(instanceIndex >= 0);
+        if(instanceIndex == otherInstances.size() - 1)
+        {
+          instanceIndex --;
+        }
+        // Since we need to evenly distribute the slaves on "instance" to other partitions, we
+        // need to remove "instance" from the array.
+        otherInstances.remove(instance);
+
+        // distribute previous slave assignment to other instances.
+        List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
+        for(int i = 0; i < previousAssignmentList.size(); i++)
+        {
+
+          // Evenly distribute the previousAssignmentList to the remaining other instances
+          int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
+          String newInstance = otherInstances.get(newInstanceIndex);
+          if(!resultAssignmentMap.containsKey(newInstance))
+          {
+            resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
+          }
+          resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Given the current idealState, and the list of new Instances needed to be added, calculate the
+   * new Ideal state.
+   *
+   * 1. Calculate how many master partitions should be moved to the new cluster of instances
+   * 2. assign the number of master partitions px to be moved to each previous node
+   * 3. for each previous node,
+   *    3.1 randomly choose px nodes, move them to temp list
+   *    3.2 for each px nodes, remove them from the slave assignment map; record the map position of
+   *        the partition;
+   *    3.3 calculate # of new nodes that should be put in the slave assignment map
+   *    3.4 even-fy the slave assignment map;
+   *    3.5 randomly place # of new nodes that should be placed in
+   *
+   * 4. from the list of master partitions collected in step 3.1,
+   *    4.1 randomly assign them to nodes in the new cluster
+   *
+   * 5. for each node in the new cluster,
+   *    5.1 assemble the slave assignment map
+   *    5.2 even-fy the slave assignment map
+   *
+   * @param newInstances
+   *          list of newly added storage node instances
+   * @param previousIdealState
+   *          the previous ideal state
+   * @return a map that contains the updated ideal state info
+   * */
+  public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
+  {
+    // Obtain the master / slave assignment info maps
+    Collections.sort(newInstances);
+    Map<String, List<Integer>> previousMasterAssignmentMap
+        = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
+    Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+        = (Map<String, Map<String, List<Integer>>>)(previousIdealState.get("SlaveAssignmentMap"));
+
+    List<String> oldInstances = new ArrayList<String>();
+    for(String oldInstance : previousMasterAssignmentMap.keySet())
+    {
+      oldInstances.add(oldInstance);
+    }
+
+    int previousInstanceNum = previousMasterAssignmentMap.size();
+    int partitions = (Integer)(previousIdealState.get("partitions"));
+
+    // TODO: take weight into account when calculate this
+
+    int totalMasterParitionsToMove
+        = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
+    int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum;
+    int remain = totalMasterParitionsToMove % previousInstanceNum;
+
+    // Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1) partitions.
+    // And we should first choose those (numMastersFromEachNode + 1) moves from the instances that have more
+    // master partitions
+    List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
+
+    // For corresponding moved slave partitions, keep track of their original location; the new node does not
+    // need to migrate all of them.
+    Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
+
+    // Make sure that the instances that holds more master partitions are put in front
+    List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
+    for(String oldInstance : previousMasterAssignmentMap.keySet())
+    {
+      List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+      if(masterAssignmentList.size() > numMastersFromEachNode)
+      {
+        bigList.add(oldInstance);
+      }
+      else
+      {
+        smallList.add(oldInstance);
+      }
+    }
+    // "sort" the list, such that the nodes that has more master partitions moves more partitions to the
+    // new added batch of instances.
+    bigList.addAll(smallList);
+    int totalSlaveMoves = 0;
+    for(String oldInstance : bigList)
+    {
+      List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+      int numToChoose = numMastersFromEachNode;
+      if(remain > 0)
+      {
+        numToChoose = numMastersFromEachNode + 1;
+        remain --;
+      }
+      // randomly remove numToChoose of master partitions to the new added nodes
+      ArrayList<Integer> masterPartionsMoved = new ArrayList<Integer>();
+      randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose);
+
+      masterPartitionListToMove.addAll(masterPartionsMoved);
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
+      removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved, slavePartitionsToMoveMap);
+
+      // Make sure that for old instances, the slave placement map is evenly distributed
+      // Trace the "local slave moves", which should together contribute to most of the slave migrations
+      int movesWithinInstance = migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
+      // System.out.println("local moves: "+ movesWithinInstance);
+      totalSlaveMoves += movesWithinInstance;
+    }
+    // System.out.println("local slave moves total: "+ totalSlaveMoves);
+    // calculate the master /slave assignment for the new added nodes
+
+    // We already have the list of master partitions that will migrate to new batch of instances,
+    // shuffle the partitions and assign them to new instances
+    Collections.shuffle(masterPartitionListToMove, new Random(12345));
+    for(int i = 0;i < newInstances.size(); i++)
+    {
+      String newInstance = newInstances.get(i);
+      List<Integer> masterPartitionList = new ArrayList<Integer>();
+      for(int j = 0;j < masterPartitionListToMove.size(); j++)
+      {
+        if(j % newInstances.size() == i)
+        {
+          masterPartitionList.add(masterPartitionListToMove.get(j));
+        }
+      }
+
+      Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
+      for(String oldInstance : oldInstances)
+      {
+        slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
+      }
+      // Build the slave assignment map for the new instance, based on the saved information
+      // about those slave partition locations in slavePartitionsToMoveMap
+      for(Integer x : masterPartitionList)
+      {
+        for(String oldInstance : slavePartitionsToMoveMap.keySet())
+        {
+          List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
+          if(slaves.contains(x))
+          {
+            slavePartitionMap.get(oldInstance).add(x);
+          }
+        }
+      }
+      // add entry for other new instances into the slavePartitionMap
+      List<String> otherNewInstances = new ArrayList<String>();
+      for(String instance : newInstances)
+      {
+        if(!instance.equalsIgnoreCase(newInstance))
+        {
+          otherNewInstances.add(instance);
+        }
+      }
+      // Make sure that slave partitions are evenly distributed
+      migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
+
+      // Update the result in the result map. We can reuse the input previousIdealState map as
+      // the result.
+      previousMasterAssignmentMap.put(newInstance, masterPartitionList);
+      nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
+
+    }
+    /*
+    // Print content of the master/ slave assignment maps
+    for(String instanceName : previousMasterAssignmentMap.keySet())
+    {
+      System.out.println(instanceName+":");
+      for(Integer x : previousMasterAssignmentMap.get(instanceName))
+      {
+        System.out.print(x+" ");
+      }
+      System.out.println("\nmaster partition moved:");
+
+      System.out.println();
+      System.out.println("Slave assignment:");
+
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+      for(String slaveName : slaveAssignmentMap.keySet())
+      {
+        System.out.print("\t" + slaveName +":\n\t" );
+        for(Integer x : slaveAssignmentMap.get(slaveName))
+        {
+          System.out.print(x + " ");
+        }
+        System.out.println("\n");
+      }
+    }
+
+    System.out.println("Master partitions migrated to new instances");
+    for(Integer x : masterPartitionListToMove)
+    {
+        System.out.print(x+" ");
+    }
+    System.out.println();
+
+    System.out.println("Slave partitions migrated to new instances");
+    for(String oldInstance : slavePartitionsToMoveMap.keySet())
+    {
+        System.out.print(oldInstance + ": ");
+        for(Integer x : slavePartitionsToMoveMap.get(oldInstance))
+        {
+          System.out.print(x+" ");
+        }
+        System.out.println();
+    }
+    */
+    return previousIdealState;
+  }
+  
+  public ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
+       String resourceName, String masterStateValue, String slaveStateValue)
+  {
+    return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
+  }
+  /**
+   * Given the list of master partition that will be migrated away from the storage instance,
+   * Remove their entries from the local instance slave assignment map.
+   *
+   * @param slaveAssignmentMap  the local instance slave assignment map
+   * @param masterPartionsMoved the list of master partition ids that will be migrated away
+   * @param removedAssignmentMap keep track of the removed slave assignment info. The info can be
+   *        used by new added storage nodes.
+   * */
+  static void removeFromSlaveAssignmentMap( Map<String, List<Integer>>slaveAssignmentMap, List<Integer> masterPartionsMoved, Map<String, List<Integer>> removedAssignmentMap)
+  {
+    for(String instanceName : slaveAssignmentMap.keySet())
+    {
+      List<Integer> slaveAssignment = slaveAssignmentMap.get(instanceName);
+      for(Integer partitionId: masterPartionsMoved)
+      {
+        if(slaveAssignment.contains(partitionId))
+        {
+          slaveAssignment.remove(partitionId);
+          if(!removedAssignmentMap.containsKey(instanceName))
+          {
+            removedAssignmentMap.put(instanceName, new ArrayList<Integer>());
+          }
+          removedAssignmentMap.get(instanceName).add(partitionId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Since new storage instances have been added, each existing storage instance should migrate some
+   * slave partitions to the newly added instances.
+   *
+   * The algorithm keeps moving one partition from the instance that hosts the most slave partitions
+   * to the instance that hosts the fewest, until max - min <= 1.
+   *
+   * In this way we can guarantee that all instances host almost the same number of slave partitions,
+   * and that slave partitions are evenly distributed.
+   *
+   * @param nodeSlaveAssignmentMap the slave assignment map of each instance
+   * @param newInstances the list of newly added instances
+   * @return the number of slave partitions moved onto the newly added instances
+   * */
+  static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
+  {
+    int moves = 0;
+    boolean done = false;
+    for(String newInstance : newInstances)
+    {
+      nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
+    }
+    while(!done)
+    {
+      List<Integer> maxAssignment = null, minAssignment = null;
+      int minCount = Integer.MAX_VALUE, maxCount = Integer.MIN_VALUE;
+      String minInstance = "";
+      for(String instanceName : nodeSlaveAssignmentMap.keySet())
+      {
+        List<Integer> slaveAssignment = nodeSlaveAssignmentMap.get(instanceName);
+        if(minCount > slaveAssignment.size())
+        {
+          minCount = slaveAssignment.size();
+          minAssignment = slaveAssignment;
+          minInstance = instanceName;
+        }
+        if(maxCount < slaveAssignment.size())
+        {
+          maxCount = slaveAssignment.size();
+          maxAssignment = slaveAssignment;
+        }
+      }
+      if(maxCount - minCount <= 1 )
+      {
+        done = true;
+      }
+      else
+      {
+        int indexToMove = -1;
+        // find a partition that is not contained in the minAssignment list
+        for(int i = 0; i < maxAssignment.size(); i++ )
+        {
+          if(!minAssignment.contains(maxAssignment.get(i)))
+          {
+            indexToMove = i;
+            break;
+          }
+        }
+
+        minAssignment.add(maxAssignment.get(indexToMove));
+        maxAssignment.remove(indexToMove);
+
+        if(newInstances.contains(minInstance))
+        {
+          moves++;
+        }
+      }
+    }
+    return moves;
+  }
+
+  /**
+   * Randomly select a number of elements from the original list and put them in the selected list.
+   * The algorithm is used to select master partitions to be migrated when new instances are added.
+   *
+   * @param originalList  the original list
+   * @param selectedList  the list that will contain the selected elements
+   * @param num the number of elements to be selected
+   * */
+  static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num)
+  {
+    assert(originalList.size() >= num);
+    int[] indexArray = new int[originalList.size()];
+    for(int i = 0;i < indexArray.length; i++)
+    {
+      indexArray[i] = i;
+    }
+    int numRemains = originalList.size();
+    Random r = new Random(numRemains);
+    for(int j = 0;j < num; j++)
+    {
+      int randIndex = r.nextInt(numRemains);
+      selectedList.add(originalList.get(randIndex));
+      originalList.remove(randIndex);
+      numRemains --;
+    }
+  }
+
+  public static void main(String args[])
+  {
+    List<String> instanceNames = new ArrayList<String>();
+    for(int i = 0;i < 10; i++)
+    {
+      instanceNames.add("localhost:123" + i);
+    }
+    int partitions = 48*3, replicas = 3;
+    Map<String, Object> resultOriginal = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
+
+  }
+}
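
For completeness, a small sketch of using the renamed calculator directly; the instance names, partition and replica counts below are made up, and only methods defined in this file are called:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.helix.ZNRecord;
import org.apache.helix.tools.DefaultIdealStateCalculator;

public class DefaultIdealStateCalculatorSketch
{
  public static void main(String[] args)
  {
    // Hypothetical instances and sizing.
    List<String> instances = new ArrayList<String>(Arrays.asList(
        "localhost:12918", "localhost:12919", "localhost:12920", "localhost:12921"));
    int partitions = 64;
    int replicas = 2;

    // Initial assignment for a new resource.
    ZNRecord idealState = DefaultIdealStateCalculator.calculateIdealState(
        instances, partitions, replicas, "MyDB", "MASTER", "SLAVE");
    System.out.println("initial partitions assigned: " + idealState.getMapFields().size());

    // Incremental expansion: keep the internal map form, add a batch of new instances,
    // and derive the next ideal state with minimal partition movement.
    Map<String, Object> internal =
        DefaultIdealStateCalculator.calculateInitialIdealState(instances, partitions, replicas);
    Map<String, Object> expanded = DefaultIdealStateCalculator.calculateNextIdealState(
        new ArrayList<String>(Arrays.asList("localhost:12922", "localhost:12923")), internal);
    ZNRecord nextIdealState =
        DefaultIdealStateCalculator.convertToZNRecord(expanded, "MyDB", "MASTER", "SLAVE");
    System.out.println("partitions after expansion: " + nextIdealState.getMapFields().size());
  }
}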

