helix-commits mailing list archives

From zzh...@apache.org
Subject [3/3] git commit: [HELIX-174] Clean up ideal state calculators, move them to the controller rebalancer package, rb=13696
Date Mon, 26 Aug 2013 21:56:34 GMT
[HELIX-174] Clean up ideal state calculators, move them to the controller rebalancer package, rb=13696


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/21c4fcb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/21c4fcb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/21c4fcb5

Branch: refs/heads/master
Commit: 21c4fcb5157366acbf626c7c46e996e01c70ff89
Parents: f73b3a7
Author: zzhang <zzhang5@uci.edu>
Authored: Mon Aug 26 14:56:24 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Mon Aug 26 14:56:24 2013 -0700

----------------------------------------------------------------------
 .../ConsistentHashingMasterSlaveStrategy.java   | 535 ++++++++++++++
 .../strategy/DefaultTwoStateStrategy.java       | 727 +++++++++++++++++++
 .../strategy/EspressoRelayStrategy.java         | 108 +++
 .../strategy/RUSHMasterSlaveStrategy.java       | 284 ++++++++
 .../helix/controller/strategy/RUSHrHash.java    | 313 ++++++++
 .../strategy/ShufflingTwoStateStrategy.java     | 120 +++
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  10 +-
 .../tools/DefaultIdealStateCalculator.java      | 722 ------------------
 .../IdealCalculatorByConsistentHashing.java     | 535 --------------
 .../helix/tools/IdealStateCalculatorByRush.java | 284 --------
 .../tools/IdealStateCalculatorByShuffling.java  | 108 ---
 .../IdealStateCalculatorForEspressoRelay.java   | 108 ---
 .../java/org/apache/helix/tools/RUSHrHash.java  | 313 --------
 .../org/apache/helix/tools/YAISCalculator.java  | 174 -----
 .../org/apache/helix/util/RebalanceUtil.java    |   4 +-
 .../TestEspressoStorageClusterIdealState.java   | 301 --------
 .../helix/TestRelayIdealStateCalculator.java    |  83 ---
 .../apache/helix/TestShuffledIdealState.java    | 253 -------
 .../stages/TestCompatibilityCheckStage.java     |  10 +-
 .../stages/TestResourceComputationStage.java    |  16 +-
 .../strategy/TestEspressoRelayStrategy.java     |  81 +++
 .../TestEspressoStorageClusterIdealState.java   | 298 ++++++++
 .../strategy/TestShufflingTwoStateStrategy.java | 242 ++++++
 .../integration/TestAutoIsWithEmptyMap.java     |   4 +-
 .../apache/helix/integration/TestDriver.java    |  14 +-
 .../helix/integration/TestExpandCluster.java    |   2 +-
 .../helix/integration/TestRenamePartition.java  |   8 +-
 .../josql/TestClusterJosqlQueryProcessor.java   |   8 +-
 .../messaging/TestDefaultMessagingService.java  |   4 +-
 .../mbeans/TestClusterStatusMonitor.java        |  13 +-
 .../monitoring/mbeans/TestResourceMonitor.java  |   6 +-
 31 files changed, 2750 insertions(+), 2938 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/21c4fcb5/helix-core/src/main/java/org/apache/helix/controller/strategy/ConsistentHashingMasterSlaveStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/ConsistentHashingMasterSlaveStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/ConsistentHashingMasterSlaveStrategy.java
new file mode 100644
index 0000000..017d3d8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/ConsistentHashingMasterSlaveStrategy.java
@@ -0,0 +1,535 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+public class ConsistentHashingMasterSlaveStrategy {
+  /**
+   * Interface to calculate the hash function value of a string
+   */
+  public interface HashFunction {
+    public int getHashValue(String key);
+  }
+
+  /**
+   * The default string hash function. Same as the default function used by
+   * Voldemort.
+   */
+  public static class FnvHash implements HashFunction {
+    private static final long FNV_BASIS = 0x811c9dc5;
+    private static final long FNV_PRIME = (1 << 24) + 0x193;
+    public static final long FNV_BASIS_64 = 0xCBF29CE484222325L;
+    public static final long FNV_PRIME_64 = 1099511628211L;
+
+    public int hash(byte[] key) {
+      long hash = FNV_BASIS;
+      for (int i = 0; i < key.length; i++) {
+        hash ^= 0xFF & key[i];
+        hash *= FNV_PRIME;
+      }
+      return (int) hash;
+    }
+
+    public long hash64(long val) {
+      long hashval = FNV_BASIS_64;
+      for (int i = 0; i < 8; i++) {
+        long octet = val & 0x00ff;
+        val = val >> 8;
+        hashval = hashval ^ octet;
+        hashval = hashval * FNV_PRIME_64;
+      }
+      return Math.abs(hashval);
+    }
+
+    @Override
+    public int getHashValue(String key) {
+      return hash(key.getBytes());
+    }
+
+  }
+
+  /**
+   * Calculate the ideal state for a list of instances using consistent
+   * hashing.
+   * @param instanceNames
+   *          List of instance names.
+   * @param partitions
+   *          the number of partitions of the database
+   * @param replicas
+   *          the replication degree
+   * @param resourceName
+   *          the name of the database
+   * @param hashFunc
+   *          the hash function used to compute hash values
+   * @return The ZNRecord that contains the ideal state
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions,
+      int replicas, String resourceName, HashFunction hashFunc) {
+    return calculateIdealState(instanceNames, partitions, replicas, resourceName, hashFunc, 65536);
+  }
+
+  /**
+   * Calculate the ideal state for a list of instances using consistent
+   * hashing.
+   * @param instanceNames
+   *          List of instance names.
+   * @param partitions
+   *          the number of partitions of the database
+   * @param replicas
+   *          the replication degree
+   * @param resourceName
+   *          the name of the database
+   * @param hashFunc
+   *          the hash function used to compute hash values
+   * @param hashRingSize
+   *          the size of the hash ring used by consistent hashing
+   * @return The ZNRecord that contains the ideal state
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions,
+      int replicas, String resourceName, HashFunction hashFunc, int hashRingSize) {
+    ZNRecord result = new ZNRecord(resourceName);
+
+    int[] hashRing = generateEvenHashRing(instanceNames, hashRingSize);
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+    Random rand = new Random(0xc0ffee);
+    for (int i = 0; i < partitions; i++) {
+      String partitionName = resourceName + ".partition-" + i;
+      int hashPos = rand.nextInt() % hashRingSize;
+      // (int)(hashFunc.getHashValue(partitionName) % hashRingSize);
+      hashPos = hashPos < 0 ? (hashPos + hashRingSize) : hashPos;
+      // System.out.print(hashPos+ " ");
+      // if(i % 120 == 0) System.out.println();
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+      // the first in the list is the node that contains the master
+      int masterPos = hashRing[hashPos];
+      partitionAssignment.put(instanceNames.get(masterPos), "MASTER");
+
+      // partitionAssignment.put("hash", "" + hashPos + " " + masterPos);
+
+      // Put slaves at the next hash ring positions. We need to make sure that
+      // no node is assigned more than one replica of the same partition.
+      for (int j = 1; j <= replicas; j++) {
+        String next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
+        while (partitionAssignment.containsKey(next)) {
+          hashPos++;
+          next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
+        }
+        partitionAssignment.put(next, "SLAVE");
+      }
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    return result;
+  }
+
+  /**
+   * Generate the hash ring for consistent hashing.
+   * @param instanceNames
+   *          List of instance names.
+   * @param hashRingSize
+   *          the size of the hash ring used by consistent hashing
+   * @return The int array representing the hash ring; it contains values ranging
+   *         from 0 to instanceNames.size() - 1
+   */
+  public static int[] generateHashRing(List<String> instanceNames, int hashRingSize) {
+    int[] result = new int[hashRingSize];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = 0;
+    }
+    int instances = instanceNames.size();
+    // The following code generates the random distribution
+    for (int i = 1; i < instances; i++) {
+      putNodeOnHashring(result, i, hashRingSize / (i + 1), i);
+    }
+    return result;
+  }
+
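+  /**
+   * Generate a hash ring where each instance ends up with an (approximately) even
+   * share of positions, by iteratively replacing an even fraction of the existing
+   * positions with each newly added instance.
+   */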
+  public static int[] generateEvenHashRing(List<String> instanceNames, int hashRingSize) {
+    int[] result = new int[hashRingSize];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = 0;
+    }
+    int instances = instanceNames.size();
+    // The following code generates the random distribution
+    for (int i = 1; i < instances; i++) {
+      putNodeEvenOnHashRing(result, i, i + 1);
+    }
+    return result;
+  }
+
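+  /**
+   * Replace roughly 1/totalValues of the hash ring positions with nodeVal, drawing
+   * proportionally from each existing node's positions so the ring stays balanced.
+   */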
+  private static void putNodeEvenOnHashRing(int[] hashRing, int nodeVal, int totalValues) {
+    int newValNum = hashRing.length / totalValues;
+    assert (newValNum > 0);
+    Map<Integer, List<Integer>> valueIndex = buildValueIndex(hashRing);
+    int nSources = valueIndex.size();
+    int remainder = newValNum % nSources;
+
+    List<List<Integer>> positionLists = new ArrayList<List<Integer>>();
+    for (List<Integer> list : valueIndex.values()) {
+      positionLists.add(list);
+    }
+    class ListComparator implements Comparator<List<Integer>> {
+      @Override
+      public int compare(List<Integer> o1, List<Integer> o2) {
+        return (o1.size() > o2.size() ? -1 : (o1.size() == o2.size() ? 0 : 1));
+      }
+    }
+    Collections.sort(positionLists, new ListComparator());
+
+    for (List<Integer> oldValPositions : positionLists) {
+      // List<Integer> oldValPositions = valueIndex.get(oldVal);
+      int nValsToReplace = newValNum / nSources;
+      assert (nValsToReplace > 0);
+      if (remainder > 0) {
+        nValsToReplace++;
+        remainder--;
+      }
+      // System.out.print(oldValPositions.size()+" "+nValsToReplace+"  ");
+      putNodeValueOnHashRing(hashRing, nodeVal, nValsToReplace, oldValPositions);
+      // randomly take nValsToReplace positions in oldValPositions and make them
+    }
+    // System.out.println();
+  }
+
+  private static void putNodeValueOnHashRing(int[] hashRing, int nodeVal, int numberOfValues,
+      List<Integer> positions) {
+    Random rand = new Random(nodeVal);
+    // initialize the index array
+    int[] index = new int[positions.size()];
+    for (int i = 0; i < index.length; i++) {
+      index[i] = i;
+    }
+
+    int nodesLeft = index.length;
+
+    for (int i = 0; i < numberOfValues; i++) {
+      // Calculate a random index
+      int randIndex = rand.nextInt() % nodesLeft;
+      if (randIndex < 0) {
+        randIndex += nodesLeft;
+      }
+      hashRing[positions.get(index[randIndex])] = nodeVal;
+
+      // swap the random index and the last available index, and decrease the
+      // nodes left
+      int temp = index[randIndex];
+      index[randIndex] = index[nodesLeft - 1];
+      index[nodesLeft - 1] = temp;
+      nodesLeft--;
+    }
+  }
+
+  private static Map<Integer, List<Integer>> buildValueIndex(int[] hashRing) {
+    Map<Integer, List<Integer>> result = new TreeMap<Integer, List<Integer>>();
+    for (int i = 0; i < hashRing.length; i++) {
+      if (!result.containsKey(hashRing[i])) {
+        List<Integer> list = new ArrayList<Integer>();
+        result.put(hashRing[i], list);
+      }
+      result.get(hashRing[i]).add(i);
+    }
+    return result;
+  }
+
+  /**
+   * Uniformly put node values on the hash ring. Derived from the shuffling
+   * algorithm
+   * @param result
+   *          the hash ring array.
+   * @param nodeValue
+   *          the int value to be added to the hash ring this time
+   * @param numberOfNodes
+   *          number of node values to put on the hash ring array
+   * @param randomSeed
+   *          the random seed
+   */
+  public static void putNodeOnHashring(int[] result, int nodeValue, int numberOfNodes,
+      int randomSeed) {
+    Random rand = new Random(randomSeed);
+    // initialize the index array
+    int[] index = new int[result.length];
+    for (int i = 0; i < index.length; i++) {
+      index[i] = i;
+    }
+
+    int nodesLeft = index.length;
+
+    for (int i = 0; i < numberOfNodes; i++) {
+      // Calculate a random index
+      int randIndex = rand.nextInt() % nodesLeft;
+      if (randIndex < 0) {
+        randIndex += nodesLeft;
+      }
+      if (result[index[randIndex]] == nodeValue) {
+        assert (false);
+      }
+      result[index[randIndex]] = nodeValue;
+
+      // swap the random index and the last available index, and decrease the
+      // nodes left
+      int temp = index[randIndex];
+      index[randIndex] = index[nodesLeft - 1];
+      index[nodesLeft - 1] = temp;
+
+      nodesLeft--;
+    }
+  }
+
+  /**
+   * Helper function to see how many partitions are mapped to different
+   * instances in two ideal states
+   */
+  public static void printDiff(ZNRecord record1, ZNRecord record2) {
+    int diffCount = 0;
+    for (String key : record1.getMapFields().keySet()) {
+      Map<String, String> map1 = record1.getMapField(key);
+      Map<String, String> map2 = record2.getMapField(key);
+
+      for (String k : map1.keySet()) {
+        if (!map2.containsKey(k)) {
+          diffCount++;
+        } else if (!map1.get(k).equalsIgnoreCase(map2.get(k))) {
+          diffCount++;
+        }
+      }
+    }
+    System.out.println("diff count = " + diffCount);
+  }
+
+  /**
+   * Helper function to compare the difference between two hash rings
+   */
+  public static void compareHashrings(int[] ring1, int[] ring2) {
+    int diff = 0;
+    for (int i = 0; i < ring1.length; i++) {
+      if (ring1[i] != ring2[i]) {
+        diff++;
+      }
+    }
+    System.out.println("ring diff: " + diff);
+  }
+
+  public static void printNodeOfflineOverhead(ZNRecord record) {
+    // build node -> partition map
+    Map<String, Set<String>> nodeNextMap = new TreeMap<String, Set<String>>();
+    for (String partitionName : record.getMapFields().keySet()) {
+      Map<String, String> map1 = record.getMapField(partitionName);
+      String master = "", slave = "";
+      for (String nodeName : map1.keySet()) {
+        if (!nodeNextMap.containsKey(nodeName)) {
+          nodeNextMap.put(nodeName, new TreeSet<String>());
+        }
+
+        // String master = "", slave = "";
+        if (map1.get(nodeName).equalsIgnoreCase("MASTER")) {
+          master = nodeName;
+        } else {
+          if (slave.equalsIgnoreCase("")) {
+            slave = nodeName;
+          }
+        }
+
+      }
+      nodeNextMap.get(master).add(slave);
+    }
+    System.out.println("next count: ");
+    for (String key : nodeNextMap.keySet()) {
+      System.out.println(nodeNextMap.get(key).size() + " ");
+    }
+    System.out.println();
+  }
+
+  /**
+   * Helper function to calculate and print the standard deviation of the
+   * partition assignment in an ideal state, as well as the min/max number of
+   * master partitions hosted on each node
+   */
+  public static void printIdealStateStats(ZNRecord record, String value) {
+    Map<String, Integer> countsMap = new TreeMap<String, Integer>();
+    for (String key : record.getMapFields().keySet()) {
+      Map<String, String> map1 = record.getMapField(key);
+      for (String k : map1.keySet()) {
+        if (!countsMap.containsKey(k)) {
+          countsMap.put(k, new Integer(0));//
+        }
+        if (value.equals("") || map1.get(k).equalsIgnoreCase(value)) {
+          countsMap.put(k, countsMap.get(k).intValue() + 1);
+        }
+      }
+    }
+    double sum = 0;
+    int maxCount = 0;
+    int minCount = Integer.MAX_VALUE;
+
+    System.out.println("Partition distributions: ");
+    for (String k : countsMap.keySet()) {
+      int count = countsMap.get(k);
+      sum += count;
+      if (maxCount < count) {
+        maxCount = count;
+      }
+      if (minCount > count) {
+        minCount = count;
+      }
+      System.out.print(count + " ");
+    }
+    System.out.println();
+    double mean = sum / (countsMap.size());
+    // calculate the deviation of the node distribution
+    double deviation = 0;
+    for (String k : countsMap.keySet()) {
+      double count = countsMap.get(k);
+      deviation += (count - mean) * (count - mean);
+    }
+    System.out.println("Mean: " + mean + " standard deviation:"
+        + Math.sqrt(deviation / countsMap.size()));
+
+    System.out.println("Max count: " + maxCount + " min count:" + minCount);
+    /*
+     * int steps = 10; int stepLen = (maxCount - minCount)/steps; List<Integer>
+     * histogram = new ArrayList<Integer>((maxCount - minCount)/stepLen + 1);
+     * for(int i = 0; i< (maxCount - minCount)/stepLen + 1; i++) {
+     * histogram.add(0); } for(String k :countsMap.keySet()) { int count =
+     * countsMap.get(k); int stepNo = (count - minCount)/stepLen;
+     * histogram.set(stepNo, histogram.get(stepNo) +1); }
+     * System.out.println("histogram:"); for(Integer x : histogram) {
+     * System.out.print(x+" "); }
+     */
+  }
+
+  public static void printHashRingStat(int[] hashRing) {
+    double sum = 0, mean = 0, deviation = 0;
+    Map<Integer, Integer> countsMap = new TreeMap<Integer, Integer>();
+    for (int i = 0; i < hashRing.length; i++) {
+      if (!countsMap.containsKey(hashRing[i])) {
+        countsMap.put(hashRing[i], new Integer(0));//
+      }
+      countsMap.put(hashRing[i], countsMap.get(hashRing[i]).intValue() + 1);
+    }
+    int maxCount = Integer.MIN_VALUE;
+    int minCount = Integer.MAX_VALUE;
+    for (Integer k : countsMap.keySet()) {
+      int count = countsMap.get(k);
+      sum += count;
+      if (maxCount < count) {
+        maxCount = count;
+      }
+      if (minCount > count) {
+        minCount = count;
+      }
+    }
+    mean = sum / countsMap.size();
+    for (Integer k : countsMap.keySet()) {
+      int count = countsMap.get(k);
+      deviation += (count - mean) * (count - mean);
+    }
+    System.out.println("hashring Mean: " + mean + " standard deviation:"
+        + Math.sqrt(deviation / countsMap.size()));
+
+  }
+
+  static int[] getFnvHashArray(List<String> strings) {
+    int[] result = new int[strings.size()];
+    int i = 0;
+    ConsistentHashingMasterSlaveStrategy.FnvHash hashfunc =
+        new ConsistentHashingMasterSlaveStrategy.FnvHash();
+    for (String s : strings) {
+      int val = hashfunc.getHashValue(s) % 65536;
+      if (val < 0)
+        val += 65536;
+      result[i++] = val;
+    }
+    return result;
+  }
+
+  static void printArrayStat(int[] vals) {
+    double sum = 0, mean = 0, deviation = 0;
+
+    for (int i = 0; i < vals.length; i++) {
+      sum += vals[i];
+    }
+    mean = sum / vals.length;
+    for (int i = 0; i < vals.length; i++) {
+      deviation += (mean - vals[i]) * (mean - vals[i]);
+    }
+    System.out.println("normalized deviation: " + Math.sqrt(deviation / vals.length) / mean);
+  }
+
+  public static void main(String args[]) throws Exception {
+    // Test the hash ring generation
+    List<String> instanceNames = new ArrayList<String>();
+    for (int i = 0; i < 10; i++) {
+      instanceNames.add("localhost_123" + i);
+    }
+
+    // int[] ring1 =
+    // IdealCalculatorByConsistentHashing.generateEvenHashRing(instanceNames,
+    // 65535);
+    // printHashRingStat(ring1);
+    // int[] ring1 = getFnvHashArray(instanceNames);
+    // printArrayStat(ring1);
+
+    int partitions = 200, replicas = 2;
+    String dbName = "espressoDB1";
+
+    ZNRecord result =
+        ConsistentHashingMasterSlaveStrategy.calculateIdealState(instanceNames, partitions,
+            replicas, dbName, new ConsistentHashingMasterSlaveStrategy.FnvHash());
+    System.out.println("\nMaster :");
+    printIdealStateStats(result, "MASTER");
+
+    System.out.println("\nSlave :");
+    printIdealStateStats(result, "SLAVE");
+
+    System.out.println("\nTotal :");
+    printIdealStateStats(result, "");
+
+    printNodeOfflineOverhead(result);
+    /*
+     * ZNRecordSerializer serializer = new ZNRecordSerializer(); byte[] bytes;
+     * bytes = serializer.serialize(result); // System.out.println(new
+     * String(bytes));
+     * List<String> instanceNames2 = new ArrayList<String>(); for(int i = 0;i <
+     * 40; i++) { instanceNames2.add("localhost_123"+i); }
+     * ZNRecord result2 =
+     * IdealCalculatorByConsistentHashing.calculateIdealState( instanceNames2,
+     * partitions, replicas, dbName, new
+     * IdealCalculatorByConsistentHashing.FnvHash());
+     * printDiff(result, result2);
+     * //IdealCalculatorByConsistentHashing.printIdealStateStats(result2);
+     * int[] ring2 =
+     * IdealCalculatorByConsistentHashing.generateHashRing(instanceNames2,
+     * 30000);
+     * IdealCalculatorByConsistentHashing.compareHashrings(ring1, ring2);
+     * //printNodeStats(result); //printNodeStats(result2); bytes =
+     * serializer.serialize(result2); printHashRingStat(ring2); //
+     * System.out.println(new String(bytes));
+     */
+
+  }
+}
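
For reference, a minimal sketch of how the new class might be invoked from client code (the instance names, partition count, and resource name below are illustrative assumptions, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.controller.strategy.ConsistentHashingMasterSlaveStrategy;

    public class ConsistentHashingExample {
      public static void main(String[] args) {
        // A hypothetical cluster of 5 instances; the names are illustrative.
        List<String> instances = new ArrayList<String>();
        for (int i = 0; i < 5; i++) {
          instances.add("localhost_1231" + i);
        }
        // 64 partitions, 2 slave replicas per master, FNV hashing on the
        // default 65536-slot ring.
        ZNRecord idealState = ConsistentHashingMasterSlaveStrategy.calculateIdealState(
            instances, 64, 2, "exampleDB",
            new ConsistentHashingMasterSlaveStrategy.FnvHash());
        // Each map field maps a partition name to {instance -> MASTER|SLAVE}.
        System.out.println(idealState.getMapFields());
      }
    }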

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/21c4fcb5/helix-core/src/main/java/org/apache/helix/controller/strategy/DefaultTwoStateStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/DefaultTwoStateStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/DefaultTwoStateStrategy.java
new file mode 100644
index 0000000..c965748
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/DefaultTwoStateStrategy.java
@@ -0,0 +1,727 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+/**
+ * DefaultTwoStateStrategy tries to optimally allocate the partitions of a two-state
+ * resource among storage nodes.
+ * Given a batch of storage nodes, the partition count and the replication factor, the
+ * algorithm first computes an initial ideal state.
+ * When new batches of storage nodes are added, the algorithm calculates the new ideal
+ * state such that the total partition movement is minimized.
+ */
+public class DefaultTwoStateStrategy {
+  static final String _PrimaryAssignmentMap = "PrimaryAssignmentMap";
+  static final String _SecondaryAssignmentMap = "SecondaryAssignmentMap";
+  static final String _partitions = "partitions";
+  static final String _replicas = "replicas";
+
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the
+   * replication factor and the number of partitions:
+   * 1. Calculate the primary state assignment by random shuffling
+   * 2. For each storage instance, calculate the 1st secondary state assignment map
+   * by another random shuffling
+   * 3. For each storage instance, calculate the i-th secondary state assignment map
+   * 4. Combine the i-th secondary state assignment maps together
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          The number of replicas (secondary partitions) per primary partition
+   * @param primaryStateValue
+   *          primary state value: e.g. "MASTER" or "LEADER"
+   * @param secondaryStateValue
+   *          secondary state value: e.g. "SLAVE" or "STANDBY"
+   * @param resourceName
+   * @return a ZNRecord that contains the ideal state info
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions,
+      int replicas, String resourceName, String primaryStateValue, String secondaryStateValue) {
+    Collections.sort(instanceNames);
+    if (instanceNames.size() < replicas + 1) {
+      throw new HelixException("Number of instances must not be less than replicas + 1. "
+          + "instanceNr:" + instanceNames.size() + ", replicas:" + replicas);
+    } else if (partitions < instanceNames.size()) {
+      ZNRecord idealState =
+          ShufflingTwoStateStrategy.calculateIdealState(instanceNames, partitions, replicas,
+              resourceName, 12345, primaryStateValue, secondaryStateValue);
+      int i = 0;
+      for (String partitionId : idealState.getMapFields().keySet()) {
+        Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+        List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+        String primaryInstance = "";
+        for (String instanceName : partitionAssignmentMap.keySet()) {
+          if (partitionAssignmentMap.get(instanceName).equalsIgnoreCase(primaryStateValue)
+              && primaryInstance.equals("")) {
+            primaryInstance = instanceName;
+          } else {
+            partitionAssignmentPriorityList.add(instanceName);
+          }
+        }
+        Collections.shuffle(partitionAssignmentPriorityList, new Random(i++));
+        partitionAssignmentPriorityList.add(0, primaryInstance);
+        idealState.setListField(partitionId, partitionAssignmentPriorityList);
+      }
+      return idealState;
+    }
+
+    Map<String, Object> result = calculateInitialIdealState(instanceNames, partitions, replicas);
+
+    return convertToZNRecord(result, resourceName, primaryStateValue, secondaryStateValue);
+  }
+
+  public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches,
+      int partitions, int replicas, String resourceName, String primaryStateValue,
+      String secondaryStateValue) {
+    Map<String, Object> result =
+        calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
+
+    for (int i = 1; i < instanceBatches.size(); i++) {
+      result = calculateNextIdealState(instanceBatches.get(i), result);
+    }
+
+    return convertToZNRecord(result, resourceName, primaryStateValue, secondaryStateValue);
+  }
+
+  /**
+   * Convert the internal result (stored as a Map<String, Object>) into ZNRecord.
+   */
+  public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
+      String primaryStateValue, String secondaryStateValue) {
+    Map<String, List<Integer>> nodePrimaryAssignmentMap =
+        (Map<String, List<Integer>>) (result.get(_PrimaryAssignmentMap));
+    Map<String, Map<String, List<Integer>>> nodeSecondaryAssignmentMap =
+        (Map<String, Map<String, List<Integer>>>) (result.get(_SecondaryAssignmentMap));
+
+    int partitions = (Integer) (result.get("partitions"));
+
+    ZNRecord idealState = new ZNRecord(resourceName);
+    idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+        String.valueOf(partitions));
+
+    for (String instanceName : nodePrimaryAssignmentMap.keySet()) {
+      for (Integer partitionId : nodePrimaryAssignmentMap.get(instanceName)) {
+        String partitionName = resourceName + "_" + partitionId;
+        if (!idealState.getMapFields().containsKey(partitionName)) {
+          idealState.setMapField(partitionName, new TreeMap<String, String>());
+        }
+        idealState.getMapField(partitionName).put(instanceName, primaryStateValue);
+      }
+    }
+
+    for (String instanceName : nodeSecondaryAssignmentMap.keySet()) {
+      Map<String, List<Integer>> secondaryAssignmentMap =
+          nodeSecondaryAssignmentMap.get(instanceName);
+
+      for (String secondaryNode : secondaryAssignmentMap.keySet()) {
+        List<Integer> secondaryAssignment = secondaryAssignmentMap.get(secondaryNode);
+        for (Integer partitionId : secondaryAssignment) {
+          String partitionName = resourceName + "_" + partitionId;
+          idealState.getMapField(partitionName).put(secondaryNode, secondaryStateValue);
+        }
+      }
+    }
+    // generate the priority list of instances per partition. the primary should be at front
+    // and the secondaries follow.
+
+    for (String partitionId : idealState.getMapFields().keySet()) {
+      Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
+      List<String> partitionAssignmentPriorityList = new ArrayList<String>();
+      String primaryInstance = "";
+      for (String instanceName : partitionAssignmentMap.keySet()) {
+        if (partitionAssignmentMap.get(instanceName).equalsIgnoreCase(primaryStateValue)
+            && primaryInstance.equals("")) {
+          primaryInstance = instanceName;
+        } else {
+          partitionAssignmentPriorityList.add(instanceName);
+        }
+      }
+      Collections.shuffle(partitionAssignmentPriorityList);
+      partitionAssignmentPriorityList.add(0, primaryInstance);
+      idealState.setListField(partitionId, partitionAssignmentPriorityList);
+    }
+    assert (result.containsKey("replicas"));
+    idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas")
+        .toString());
+    return idealState;
+  }
+
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the
+   * replication factor and the number of partitions:
+   * 1. Calculate the primary assignment by random shuffling
+   * 2. For each storage instance, calculate the 1st secondary state assignment map
+   * by another random shuffling
+   * 3. For each storage instance, calculate the i-th secondary state assignment map
+   * 4. Combine the i-th secondary state assignment maps together
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          The number of replicas (secondary partitions) per primary partition
+   * @return a map that contains the ideal state info
+   */
+  public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames,
+      int partitions, int replicas) {
+    Random r = new Random(54321);
+    assert (replicas <= instanceNames.size() - 1);
+    ArrayList<Integer> primaryPartitionAssignment = new ArrayList<Integer>();
+    for (int i = 0; i < partitions; i++) {
+      primaryPartitionAssignment.add(i);
+    }
+    // shuffle the partition id array
+    Collections.shuffle(primaryPartitionAssignment, new Random(r.nextInt()));
+
+    // 1. Generate the random primary partition assignment
+    // instanceName -> List of primary partitions on that instance
+    Map<String, List<Integer>> nodePrimaryAssignmentMap = new TreeMap<String, List<Integer>>();
+    for (int i = 0; i < primaryPartitionAssignment.size(); i++) {
+      String instanceName = instanceNames.get(i % instanceNames.size());
+      if (!nodePrimaryAssignmentMap.containsKey(instanceName)) {
+        nodePrimaryAssignmentMap.put(instanceName, new ArrayList<Integer>());
+      }
+      nodePrimaryAssignmentMap.get(instanceName).add(primaryPartitionAssignment.get(i));
+    }
+
+    // instanceName -> secondary assignment for its primary partitions
+    // secondary assignment: instanceName -> list of secondary partitions on it
+    List<Map<String, Map<String, List<Integer>>>> nodeSecondaryAssignmentMapsList =
+        new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
+
+    Map<String, Map<String, List<Integer>>> firstNodeSecondaryAssignmentMap =
+        new TreeMap<String, Map<String, List<Integer>>>();
+    Map<String, Map<String, List<Integer>>> combinedNodeSecondaryAssignmentMap =
+        new TreeMap<String, Map<String, List<Integer>>>();
+
+    if (replicas > 0) {
+      // 2. For each node, calculate the evenly distributed secondary state as the first secondary
+      // state assignment
+      // We will figure out the 2nd ...replicas-th secondary state assignment based on the first
+      // level secondary state assignment
+      for (int i = 0; i < instanceNames.size(); i++) {
+        List<String> secondaryInstances = new ArrayList<String>();
+        ArrayList<Integer> secondaryAssignment = new ArrayList<Integer>();
+        TreeMap<String, List<Integer>> secondaryAssignmentMap =
+            new TreeMap<String, List<Integer>>();
+
+        for (int j = 0; j < instanceNames.size(); j++) {
+          if (j != i) {
+            secondaryInstances.add(instanceNames.get(j));
+            secondaryAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
+          }
+        }
+        // Get the number of primary partitions on instanceName
+        List<Integer> primaryAssignment = nodePrimaryAssignmentMap.get(instanceNames.get(i));
+        // do a random shuffling as in step 1, so that the first-level secondary states are
+        // distributed among the remaining instances
+
+        for (int j = 0; j < primaryAssignment.size(); j++) {
+          secondaryAssignment.add(j);
+        }
+        Collections.shuffle(secondaryAssignment, new Random(r.nextInt()));
+
+        Collections.shuffle(secondaryInstances, new Random(instanceNames.get(i).hashCode()));
+        // Get the secondary assignment map of node instanceName
+        for (int j = 0; j < primaryAssignment.size(); j++) {
+          String secondaryInstanceName =
+              secondaryInstances.get(secondaryAssignment.get(j) % secondaryInstances.size());
+          if (!secondaryAssignmentMap.containsKey(secondaryInstanceName)) {
+            secondaryAssignmentMap.put(secondaryInstanceName, new ArrayList<Integer>());
+          }
+          secondaryAssignmentMap.get(secondaryInstanceName).add(primaryAssignment.get(j));
+        }
+        firstNodeSecondaryAssignmentMap.put(instanceNames.get(i), secondaryAssignmentMap);
+      }
+      nodeSecondaryAssignmentMapsList.add(firstNodeSecondaryAssignmentMap);
+      // From the first secondary assignment map, calculate the remaining secondary assignment maps
+      for (int replicaOrder = 1; replicaOrder < replicas; replicaOrder++) {
+        // calculate the next secondary partition assignment map
+        Map<String, Map<String, List<Integer>>> nextNodeSecondaryAssignmentMap =
+            calculateNextSecondaryAssignemntMap(firstNodeSecondaryAssignmentMap, replicaOrder);
+        nodeSecondaryAssignmentMapsList.add(nextNodeSecondaryAssignmentMap);
+      }
+
+      // Combine the calculated 1...replicas-th secondary assignment map together
+      for (String instanceName : nodePrimaryAssignmentMap.keySet()) {
+        Map<String, List<Integer>> combinedSecondaryAssignmentMap =
+            new TreeMap<String, List<Integer>>();
+
+        for (Map<String, Map<String, List<Integer>>> secondaryNodeAssignmentMap : nodeSecondaryAssignmentMapsList) {
+          Map<String, List<Integer>> secondaryAssignmentMap =
+              secondaryNodeAssignmentMap.get(instanceName);
+
+          for (String secondaryInstance : secondaryAssignmentMap.keySet()) {
+            if (!combinedSecondaryAssignmentMap.containsKey(secondaryInstance)) {
+              combinedSecondaryAssignmentMap.put(secondaryInstance, new ArrayList<Integer>());
+            }
+            combinedSecondaryAssignmentMap.get(secondaryInstance).addAll(
+                secondaryAssignmentMap.get(secondaryInstance));
+          }
+        }
+        migrateSecondaryAssignMapToNewInstances(combinedSecondaryAssignmentMap,
+            new ArrayList<String>());
+        combinedNodeSecondaryAssignmentMap.put(instanceName, combinedSecondaryAssignmentMap);
+      }
+    }
+    /*
+     * // Print the result master and slave assignment maps
+     * System.out.println("Master assignment:");
+     * for(String instanceName : nodeMasterAssignmentMap.keySet())
+     * {
+     * System.out.println(instanceName+":");
+     * for(Integer x : nodeMasterAssignmentMap.get(instanceName))
+     * {
+     * System.out.print(x+" ");
+     * }
+     * System.out.println();
+     * System.out.println("Slave assignment:");
+     * int slaveOrder = 1;
+     * for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap :
+     * nodeSlaveAssignmentMapsList)
+     * {
+     * System.out.println("Slave assignment order :" + (slaveOrder++));
+     * Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+     * for(String slaveName : slaveAssignmentMap.keySet())
+     * {
+     * System.out.print("\t" + slaveName +":\n\t" );
+     * for(Integer x : slaveAssignmentMap.get(slaveName))
+     * {
+     * System.out.print(x + " ");
+     * }
+     * System.out.println("\n");
+     * }
+     * }
+     * System.out.println("\nCombined slave assignment map");
+     * Map<String, List<Integer>> slaveAssignmentMap =
+     * combinedNodeSlaveAssignmentMap.get(instanceName);
+     * for(String slaveName : slaveAssignmentMap.keySet())
+     * {
+     * System.out.print("\t" + slaveName +":\n\t" );
+     * for(Integer x : slaveAssignmentMap.get(slaveName))
+     * {
+     * System.out.print(x + " ");
+     * }
+     * System.out.println("\n");
+     * }
+     * }
+     */
+    Map<String, Object> result = new TreeMap<String, Object>();
+    result.put("PrimaryAssignmentMap", nodePrimaryAssignmentMap);
+    result.put("SecondaryAssignmentMap", combinedNodeSecondaryAssignmentMap);
+    result.put("replicas", new Integer(replicas));
+    result.put("partitions", new Integer(partitions));
+    return result;
+  }
+
+  /**
+   * In the case there is more than one secondary, we use the following algorithm to
+   * calculate the n-th secondary assignment map based on the first-level secondary
+   * assignment map.
+   * @param firstInstanceSecondaryAssignmentMap the first secondary assignment map for all instances
+   * @param replicaOrder the order of the secondary state
+   * @return the n-th secondary assignment map for all the instances
+   */
+  static Map<String, Map<String, List<Integer>>> calculateNextSecondaryAssignemntMap(
+      Map<String, Map<String, List<Integer>>> firstInstanceSecondaryAssignmentMap, int replicaOrder) {
+    Map<String, Map<String, List<Integer>>> result =
+        new TreeMap<String, Map<String, List<Integer>>>();
+
+    for (String currentInstance : firstInstanceSecondaryAssignmentMap.keySet()) {
+      Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
+      result.put(currentInstance, resultAssignmentMap);
+    }
+
+    for (String currentInstance : firstInstanceSecondaryAssignmentMap.keySet()) {
+      Map<String, List<Integer>> previousSecondaryAssignmentMap =
+          firstInstanceSecondaryAssignmentMap.get(currentInstance);
+      Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
+      int offset = replicaOrder - 1;
+      for (String instance : previousSecondaryAssignmentMap.keySet()) {
+        List<String> otherInstances =
+            new ArrayList<String>(previousSecondaryAssignmentMap.size() - 1);
+        // Obtain an array of other instances
+        for (String otherInstance : previousSecondaryAssignmentMap.keySet()) {
+          otherInstances.add(otherInstance);
+        }
+        Collections.sort(otherInstances);
+        int instanceIndex = -1;
+        for (int index = 0; index < otherInstances.size(); index++) {
+          if (otherInstances.get(index).equalsIgnoreCase(instance)) {
+            instanceIndex = index;
+          }
+        }
+        assert (instanceIndex >= 0);
+        if (instanceIndex == otherInstances.size() - 1) {
+          instanceIndex--;
+        }
+        // Since we need to evenly distribute the secondaries on "instance" to the other
+        // instances, we need to remove "instance" from the array.
+        otherInstances.remove(instance);
+
+        // distribute previous secondary assignment to other instances.
+        List<Integer> previousAssignmentList = previousSecondaryAssignmentMap.get(instance);
+        for (int i = 0; i < previousAssignmentList.size(); i++) {
+
+          // Evenly distribute the previousAssignmentList to the remaining other instances
+          int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
+          String newInstance = otherInstances.get(newInstanceIndex);
+          if (!resultAssignmentMap.containsKey(newInstance)) {
+            resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
+          }
+          resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Given the current ideal state and the list of new instances to be added, calculate
+   * the new ideal state:
+   * 1. Calculate how many primary partitions should be moved to the new batch of instances
+   * 2. Assign the number of primary partitions px to be moved from each previous node
+   * 3. For each previous node,
+   * 3.1 randomly choose px partitions and move them to a temp list
+   * 3.2 for each of the px partitions, remove it from the secondary assignment map and
+   * record its map position
+   * 3.3 calculate the number of new nodes that should be put in the secondary assignment map
+   * 3.4 even-fy the secondary assignment map
+   * 3.5 place the new nodes that should be put into the secondary assignment map
+   * 4. From the temp list of primary partitions collected in 3.1,
+   * 4.1 randomly assign them to nodes in the new batch
+   * 5. For each node in the new batch,
+   * 5.1 assemble the secondary assignment map
+   * 5.2 even-fy the secondary assignment map
+   * @param newInstances
+   *          list of newly added storage node instances
+   * @param previousIdealState
+   *          The previous ideal state
+   * @return a map that contains the updated ideal state info
+   */
+  public static Map<String, Object> calculateNextIdealState(List<String> newInstances,
+      Map<String, Object> previousIdealState) {
+    // Obtain the primary / secondary assignment info maps
+    Collections.sort(newInstances);
+    Map<String, List<Integer>> previousPrimaryAssignmentMap =
+        (Map<String, List<Integer>>) (previousIdealState.get("PrimaryAssignmentMap"));
+    Map<String, Map<String, List<Integer>>> nodeSecondaryAssignmentMap =
+        (Map<String, Map<String, List<Integer>>>) (previousIdealState.get("SecondaryAssignmentMap"));
+
+    List<String> oldInstances = new ArrayList<String>();
+    for (String oldInstance : previousPrimaryAssignmentMap.keySet()) {
+      oldInstances.add(oldInstance);
+    }
+
+    int previousInstanceNum = previousPrimaryAssignmentMap.size();
+    int partitions = (Integer) (previousIdealState.get("partitions"));
+
+    // TODO: take weight into account when calculating this
+
+    int totalPrimaryParitionsToMove =
+        partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
+    int numPrimariesFromEachNode = totalPrimaryParitionsToMove / previousInstanceNum;
+    int remain = totalPrimaryParitionsToMove % previousInstanceNum;
+
+    // Note that when remain > 0, we should make [remain] moves with (numPrimariesFromEachNode + 1)
+    // partitions, and we should first choose those (numPrimariesFromEachNode + 1) moves
+    // from the instances that have more primary partitions.
+    List<Integer> primaryPartitionListToMove = new ArrayList<Integer>();
+
+    // For corresponding moved secondary partitions, keep track of their original location; the new
+    // node does not
+    // need to migrate all of them.
+    Map<String, List<Integer>> secondaryPartitionsToMoveMap = new TreeMap<String, List<Integer>>();
+
+    // Make sure that the instances that hold more primary partitions are put in front
+    List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
+    for (String oldInstance : previousPrimaryAssignmentMap.keySet()) {
+      List<Integer> primaryAssignmentList = previousPrimaryAssignmentMap.get(oldInstance);
+      if (primaryAssignmentList.size() > numPrimariesFromEachNode) {
+        bigList.add(oldInstance);
+      } else {
+        smallList.add(oldInstance);
+      }
+    }
+    // "sort" the list, such that the nodes that have more primary partitions move more
+    // partitions to the newly added batch of instances.
+    bigList.addAll(smallList);
+    for (String oldInstance : bigList) {
+      List<Integer> primaryAssignmentList = previousPrimaryAssignmentMap.get(oldInstance);
+      int numToChoose = numPrimariesFromEachNode;
+      if (remain > 0) {
+        numToChoose = numPrimariesFromEachNode + 1;
+        remain--;
+      }
+      // randomly move numToChoose primary partitions to the newly added nodes
+      ArrayList<Integer> primaryPartionsMoved = new ArrayList<Integer>();
+      randomSelect(primaryAssignmentList, primaryPartionsMoved, numToChoose);
+
+      primaryPartitionListToMove.addAll(primaryPartionsMoved);
+      Map<String, List<Integer>> secondaryAssignmentMap =
+          nodeSecondaryAssignmentMap.get(oldInstance);
+      removeFromSecondaryAssignmentMap(secondaryAssignmentMap, primaryPartionsMoved,
+          secondaryPartitionsToMoveMap);
+
+      // Make sure that for old instances, the secondary placement map is evenly distributed
+      // Trace the "local secondary moves", which should together contribute to most of the
+      // secondary migrations
+      migrateSecondaryAssignMapToNewInstances(secondaryAssignmentMap, newInstances);
+      // System.out.println("local moves: "+ movesWithinInstance);
+    }
+    // System.out.println("local slave moves total: "+ totalSlaveMoves);
+    // calculate the primary/secondary assignment for the newly added nodes
+
+    // We already have the list of primary partitions that will migrate to the new batch of
+    // instances; shuffle the partitions and assign them to the new instances
+    Collections.shuffle(primaryPartitionListToMove, new Random(12345));
+    for (int i = 0; i < newInstances.size(); i++) {
+      String newInstance = newInstances.get(i);
+      List<Integer> primaryPartitionList = new ArrayList<Integer>();
+      for (int j = 0; j < primaryPartitionListToMove.size(); j++) {
+        if (j % newInstances.size() == i) {
+          primaryPartitionList.add(primaryPartitionListToMove.get(j));
+        }
+      }
+
+      Map<String, List<Integer>> secondaryPartitionMap = new TreeMap<String, List<Integer>>();
+      for (String oldInstance : oldInstances) {
+        secondaryPartitionMap.put(oldInstance, new ArrayList<Integer>());
+      }
+      // Build the secondary assignment map for the new instance, based on the saved information
+      // about those secondary partition locations in secondaryPartitionsToMoveMap
+      for (Integer x : primaryPartitionList) {
+        for (String oldInstance : secondaryPartitionsToMoveMap.keySet()) {
+          List<Integer> secondaries = secondaryPartitionsToMoveMap.get(oldInstance);
+          if (secondaries.contains(x)) {
+            secondaryPartitionMap.get(oldInstance).add(x);
+          }
+        }
+      }
+      // add entry for other new instances into the secondaryPartitionMap
+      List<String> otherNewInstances = new ArrayList<String>();
+      for (String instance : newInstances) {
+        if (!instance.equalsIgnoreCase(newInstance)) {
+          otherNewInstances.add(instance);
+        }
+      }
+      // Make sure that secondary partitions are evenly distributed
+      migrateSecondaryAssignMapToNewInstances(secondaryPartitionMap, otherNewInstances);
+
+      // Update the result in the result map. We can reuse the input previousIdealState map as
+      // the result.
+      previousPrimaryAssignmentMap.put(newInstance, primaryPartitionList);
+      nodeSecondaryAssignmentMap.put(newInstance, secondaryPartitionMap);
+
+    }
+    /*
+     * // Print content of the master/ slave assignment maps
+     * for(String instanceName : previousMasterAssignmentMap.keySet())
+     * {
+     * System.out.println(instanceName+":");
+     * for(Integer x : previousMasterAssignmentMap.get(instanceName))
+     * {
+     * System.out.print(x+" ");
+     * }
+     * System.out.println("\nmaster partition moved:");
+     * System.out.println();
+     * System.out.println("Slave assignment:");
+     * Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
+     * for(String slaveName : slaveAssignmentMap.keySet())
+     * {
+     * System.out.print("\t" + slaveName +":\n\t" );
+     * for(Integer x : slaveAssignmentMap.get(slaveName))
+     * {
+     * System.out.print(x + " ");
+     * }
+     * System.out.println("\n");
+     * }
+     * }
+     * System.out.println("Master partitions migrated to new instances");
+     * for(Integer x : masterPartitionListToMove)
+     * {
+     * System.out.print(x+" ");
+     * }
+     * System.out.println();
+     * System.out.println("Slave partitions migrated to new instances");
+     * for(String oldInstance : slavePartitionsToMoveMap.keySet())
+     * {
+     * System.out.print(oldInstance + ": ");
+     * for(Integer x : slavePartitionsToMoveMap.get(oldInstance))
+     * {
+     * System.out.print(x+" ");
+     * }
+     * System.out.println();
+     * }
+     */
+    return previousIdealState;
+  }
+
+  public ZNRecord calculateNextIdealState(List<String> newInstances,
+      Map<String, Object> previousIdealState, String resourceName, String primaryStateValue,
+      String secondaryStateValue) {
+    return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState),
+        resourceName, primaryStateValue, secondaryStateValue);
+  }
+
+  /**
+   * Given the list of primary partitions that will be migrated away from the storage instance,
+   * remove their entries from the local instance secondary assignment map.
+   * @param secondaryAssignmentMap the local instance secondary assignment map
+   * @param primaryPartionsMoved the list of primary partition ids that will be migrated away
+   * @param removedAssignmentMap keeps track of the removed secondary assignment info. The info
+   *          can be used by newly added storage nodes.
+   */
+  static void removeFromSecondaryAssignmentMap(Map<String, List<Integer>> secondaryAssignmentMap,
+      List<Integer> primaryPartionsMoved, Map<String, List<Integer>> removedAssignmentMap) {
+    for (String instanceName : secondaryAssignmentMap.keySet()) {
+      List<Integer> secondaryAssignment = secondaryAssignmentMap.get(instanceName);
+      for (Integer partitionId : primaryPartionsMoved) {
+        if (secondaryAssignment.contains(partitionId)) {
+          secondaryAssignment.remove(partitionId);
+          if (!removedAssignmentMap.containsKey(instanceName)) {
+            removedAssignmentMap.put(instanceName, new ArrayList<Integer>());
+          }
+          removedAssignmentMap.get(instanceName).add(partitionId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Since new storage instances have been added, each existing storage instance should migrate
+   * some secondary partitions to the newly added instances.
+   * The algorithm keeps moving one partition from the instance that hosts the most secondary
+   * partitions to the instance that hosts the least, until max - min <= 1.
+   * In this way we can guarantee that all instances host almost the same number of secondary
+   * partitions and that secondary partitions are evenly distributed.
+   * @param secondaryAssignmentMap the local instance secondary assignment map
+   * @param newInstances the list of newly added storage node instances
+   * @return the number of partitions moved onto the new instances
+   */
+  static int migrateSecondaryAssignMapToNewInstances(
+      Map<String, List<Integer>> secondaryAssignmentMap, List<String> newInstances) {
+    int moves = 0;
+    boolean done = false;
+    for (String newInstance : newInstances) {
+      secondaryAssignmentMap.put(newInstance, new ArrayList<Integer>());
+    }
+    while (!done) {
+      List<Integer> maxAssignment = null, minAssignment = null;
+      int minCount = Integer.MAX_VALUE, maxCount = Integer.MIN_VALUE;
+      String minInstance = "";
+      for (String instanceName : secondaryAssignmentMap.keySet()) {
+        List<Integer> secondaryAssignment = secondaryAssignmentMap.get(instanceName);
+        if (minCount > secondaryAssignment.size()) {
+          minCount = secondaryAssignment.size();
+          minAssignment = secondaryAssignment;
+          minInstance = instanceName;
+        }
+        if (maxCount < secondaryAssignment.size()) {
+          maxCount = secondaryAssignment.size();
+          maxAssignment = secondaryAssignment;
+        }
+      }
+      if (maxCount - minCount <= 1) {
+        done = true;
+      } else {
+        int indexToMove = -1;
+        // find a partition that is not contained in the minAssignment list
+        for (int i = 0; i < maxAssignment.size(); i++) {
+          if (!minAssignment.contains(maxAssignment.get(i))) {
+            indexToMove = i;
+            break;
+          }
+        }
+
+        minAssignment.add(maxAssignment.get(indexToMove));
+        maxAssignment.remove(indexToMove);
+
+        if (newInstances.contains(minInstance)) {
+          moves++;
+        }
+      }
+    }
+    return moves;
+  }
+
+  /**
+   * Randomly select a number of elements from the original list, removing them from it
+   * and adding them to the selectedList.
+   * The algorithm is used to select primary partitions to be migrated when new instances are added.
+   * @param originalList the original list; selected elements are removed from it
+   * @param selectedList the list that contains the selected elements
+   * @param num number of elements to be selected
+   */
+  static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num) {
+    assert (originalList.size() >= num);
+    int[] indexArray = new int[originalList.size()];
+    for (int i = 0; i < indexArray.length; i++) {
+      indexArray[i] = i;
+    }
+    int numRemains = originalList.size();
+    Random r = new Random(numRemains);
+    for (int j = 0; j < num; j++) {
+      int randIndex = r.nextInt(numRemains);
+      selectedList.add(originalList.get(randIndex));
+      originalList.remove(randIndex);
+      numRemains--;
+    }
+  }
+
+  public static void main(String args[]) {
+    // simple smoke test: compute an initial ideal state for 10 instances
+    List<String> instanceNames = new ArrayList<String>();
+    for (int i = 0; i < 10; i++) {
+      instanceNames.add("localhost:123" + i);
+    }
+    int partitions = 48 * 3, replicas = 3;
+    Map<String, Object> resultOriginal =
+        DefaultTwoStateStrategy.calculateInitialIdealState(instanceNames, partitions, replicas);
+    System.out.println("initial ideal state computed: " + resultOriginal.keySet());
+  }
+}

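The balancing loop above relies on a single invariant: repeatedly move one secondary partition from the most loaded instance to the least loaded one until the load gap is at most 1. A minimal, self-contained sketch of that idea (illustrative only, not part of this commit; the class and node names are hypothetical):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BalanceSketch {
  // move one partition at a time from the most loaded to the least loaded
  // instance until their sizes differ by at most 1
  static void balance(Map<String, List<Integer>> assignment) {
    while (true) {
      String maxInst = null, minInst = null;
      for (String inst : assignment.keySet()) {
        if (maxInst == null || assignment.get(inst).size() > assignment.get(maxInst).size()) {
          maxInst = inst;
        }
        if (minInst == null || assignment.get(inst).size() < assignment.get(minInst).size()) {
          minInst = inst;
        }
      }
      List<Integer> max = assignment.get(maxInst), min = assignment.get(minInst);
      if (max.size() - min.size() <= 1) {
        return;
      }
      // pick a partition the target does not already host
      Integer toMove = null;
      for (Integer p : max) {
        if (!min.contains(p)) {
          toMove = p;
          break;
        }
      }
      if (toMove == null) {
        return; // no legal move remains
      }
      max.remove(toMove);
      min.add(toMove);
    }
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> assignment = new HashMap<String, List<Integer>>();
    assignment.put("node0", new ArrayList<Integer>(Arrays.asList(0, 1, 2, 3, 4, 5)));
    assignment.put("node1", new ArrayList<Integer>(Arrays.asList(6, 7)));
    assignment.put("node2", new ArrayList<Integer>()); // newly added, starts empty
    balance(assignment);
    System.out.println(assignment); // sizes now differ by at most 1
  }
}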
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/21c4fcb5/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
new file mode 100644
index 0000000..4e88499
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
@@ -0,0 +1,108 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+public class EspressoRelayStrategy {
+  public static IdealState calculateRelayIdealState(List<String> partitions,
+      List<String> instances, String resultRecordName, int replica, String firstValue,
+      String restValue, String stateModelName) {
+    Collections.sort(partitions);
+    Collections.sort(instances);
+    if (instances.size() % replica != 0) {
+      throw new HelixException("Number of instances must be divisible by the replica count");
+    }
+
+    IdealState result = new IdealState(resultRecordName);
+    result.setNumPartitions(partitions.size());
+    result.setReplicas("" + replica);
+    result.setStateModelDefRef(stateModelName);
+
+    int groups = instances.size() / replica;
+    int remainder = instances.size() % replica;
+
+    int remainder2 = partitions.size() % groups;
+    int storageNodeGroupSize = partitions.size() / groups;
+
+    for (int i = 0; i < groups; i++) {
+      int relayStart = 0, relayEnd = 0, storageNodeStart = 0, storageNodeEnd = 0;
+      if (i < remainder) {
+        relayStart = (replica + 1) * i;
+        relayEnd = (replica + 1) * (i + 1);
+      } else {
+        relayStart = (replica + 1) * remainder + replica * (i - remainder);
+        relayEnd = relayStart + replica;
+      }
+      // System.out.println("relay start :" + relayStart + " relayEnd:" + relayEnd);
+      if (i < remainder2) {
+        storageNodeStart = (storageNodeGroupSize + 1) * i;
+        storageNodeEnd = (storageNodeGroupSize + 1) * (i + 1);
+      } else {
+        storageNodeStart =
+            (storageNodeGroupSize + 1) * remainder2 + storageNodeGroupSize * (i - remainder2);
+        storageNodeEnd = storageNodeStart + storageNodeGroupSize;
+      }
+
+      // System.out.println("storageNodeStart :" + storageNodeStart + " storageNodeEnd:" +
+      // storageNodeEnd);
+      List<String> snBatch = partitions.subList(storageNodeStart, storageNodeEnd);
+      List<String> relayBatch = instances.subList(relayStart, relayEnd);
+
+      Map<String, List<String>> sublistFields =
+          calculateSubIdealState(snBatch, relayBatch, replica);
+
+      result.getRecord().getListFields().putAll(sublistFields);
+    }
+
+    for (String snName : result.getRecord().getListFields().keySet()) {
+      Map<String, String> mapField = new TreeMap<String, String>();
+      List<String> relayCandidates = result.getRecord().getListField(snName);
+      mapField.put(relayCandidates.get(0), firstValue);
+      for (int i = 1; i < relayCandidates.size(); i++) {
+        mapField.put(relayCandidates.get(i), restValue);
+      }
+      result.getRecord().getMapFields().put(snName, mapField);
+    }
+    return result;
+  }
+
+  private static Map<String, List<String>> calculateSubIdealState(List<String> snBatch,
+      List<String> relayBatch, int replica) {
+    Map<String, List<String>> result = new HashMap<String, List<String>>();
+    for (int i = 0; i < snBatch.size(); i++) {
+      String snName = snBatch.get(i);
+      result.put(snName, new ArrayList<String>());
+      for (int j = 0; j < replica; j++) {
+        result.get(snName).add(relayBatch.get((j + i) % (relayBatch.size())));
+      }
+    }
+    return result;
+  }
+}

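The relay calculator above splits the relay instances into groups of size replica and carves the partition list into matching contiguous batches, round-robining replicas within each group. A hedged usage sketch (illustrative only, not part of this commit; the resource name, instance names, and the OnlineOffline state pair are assumptions):

import java.util.ArrayList;
import java.util.List;

import org.apache.helix.controller.strategy.EspressoRelayStrategy;
import org.apache.helix.model.IdealState;

public class RelayExample {
  public static void main(String[] args) {
    List<String> partitions = new ArrayList<String>();
    for (int i = 0; i < 12; i++) {
      partitions.add("espressoDB_" + i);
    }
    List<String> relays = new ArrayList<String>();
    for (int i = 0; i < 6; i++) {
      relays.add("relay_" + i); // 6 relays with replica=3 gives 2 relay groups
    }
    IdealState is = EspressoRelayStrategy.calculateRelayIdealState(partitions, relays,
        "relayResource", 3, "ONLINE", "OFFLINE", "OnlineOffline");
    System.out.println(is.getRecord());
  }
}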
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/21c4fcb5/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHMasterSlaveStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHMasterSlaveStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHMasterSlaveStrategy.java
new file mode 100644
index 0000000..39561ae
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHMasterSlaveStrategy.java
@@ -0,0 +1,284 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+public class RUSHMasterSlaveStrategy {
+  /**
+   * Build the config map for the RUSH algorithm. The input of the RUSH algorithm groups
+   * nodes into "clusters", and different clusters can be assigned
+   * different weights.
+   * @param numClusters
+   *          number of node clusters
+   * @param instancesPerCluster
+   *          list of clusters, each containing a list of node name strings
+   * @param replicationDegree
+   *          the replication degree
+   * @param clusterWeights
+   *          the weight for each node cluster
+   * @return the config map structure for the RUSH algorithm
+   */
+  static HashMap<String, Object> buildRushConfig(int numClusters,
+      List<List<String>> instancesPerCluster, int replicationDegree, List<Integer> clusterWeights) {
+    HashMap<String, Object> config = new HashMap<String, Object>();
+    config.put("replicationDegree", replicationDegree);
+
+    HashMap[] clusterList = new HashMap[numClusters];
+    config.put("subClusters", clusterList);
+
+    HashMap[] nodes;
+    HashMap<String, String> node;
+    HashMap<String, Object> clusterData;
+    for (int n = 0; n < numClusters; n++) {
+      int numNodes = instancesPerCluster.get(n).size();
+      nodes = new HashMap[numNodes];
+      for (int i = 0; i < numNodes; i++) {
+        node = new HashMap<String, String>();
+        node.put("partition", instancesPerCluster.get(n).get(i));
+        nodes[i] = node;
+      }
+      clusterData = new HashMap<String, Object>();
+      clusterData.put("weight", clusterWeights.get(n));
+      clusterData.put("nodes", nodes);
+      clusterList[n] = clusterData;
+    }
+    return config;
+  }
+
+  /**
+   * Calculate the ideal state for a list of instance clusters.
+   * @param instanceClusters
+   *          list of clusters, each containing a list of node name strings
+   * @param instanceClusterWeights
+   *          the weight for each instance cluster
+   * @param partitions
+   *          the partition number of the database
+   * @param replicas
+   *          the replication degree
+   * @param resourceName
+   *          the name of the database
+   * @return the ZNRecord that contains the ideal state
+   */
+  public static ZNRecord calculateIdealState(List<List<String>> instanceClusters,
+      List<Integer> instanceClusterWeights, int partitions, int replicas, String resourceName)
+      throws Exception {
+    ZNRecord result = new ZNRecord(resourceName);
+
+    int numberOfClusters = instanceClusters.size();
+    List<List<String>> nodesInClusters = instanceClusters;
+    List<Integer> clusterWeights = instanceClusterWeights;
+
+    HashMap<String, Object> rushConfig =
+        buildRushConfig(numberOfClusters, nodesInClusters, replicas + 1, clusterWeights);
+    RUSHrHash rushHash = new RUSHrHash(rushConfig);
+
+    Random r = new Random(0);
+    for (int i = 0; i < partitions; i++) {
+      int partitionId = i;
+      String partitionName = resourceName + ".partition-" + partitionId;
+
+      ArrayList<HashMap> partitionAssignmentResult = rushHash.findNode(i);
+      List<String> nodeNames = new ArrayList<String>();
+      for (HashMap<?, ?> p : partitionAssignmentResult) {
+        for (Object key : p.keySet()) {
+          if (p.get(key) instanceof String) {
+            nodeNames.add(p.get(key).toString());
+          }
+        }
+      }
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+
+      for (int j = 0; j < nodeNames.size(); j++) {
+        partitionAssignment.put(nodeNames.get(j), "SLAVE");
+      }
+      int master = r.nextInt(nodeNames.size());
+      // master = nodeNames.size()/2;
+      partitionAssignment.put(nodeNames.get(master), "MASTER");
+
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+    return result;
+  }
+
+  public static ZNRecord calculateIdealState(List<String> instanceClusters,
+      int instanceClusterWeight, int partitions, int replicas, String resourceName)
+      throws Exception {
+    List<List<String>> instanceClustersList = new ArrayList<List<String>>();
+    instanceClustersList.add(instanceClusters);
+
+    List<Integer> instanceClusterWeightList = new ArrayList<Integer>();
+    instanceClusterWeightList.add(instanceClusterWeight);
+
+    return calculateIdealState(instanceClustersList, instanceClusterWeightList, partitions,
+        replicas, resourceName);
+  }
+
+  /**
+   * Helper function to see how many partitions are mapped to different
+   * instances in two ideal states
+   */
+  public static void printDiff(ZNRecord record1, ZNRecord record2) {
+    int diffCount = 0;
+    int diffCountMaster = 0;
+    for (String key : record1.getMapFields().keySet()) {
+      Map<String, String> map1 = record1.getMapField(key);
+      Map<String, String> map2 = record2.getMapField(key);
+
+      for (String k : map1.keySet()) {
+        if (!map2.containsKey(k)) {
+          diffCount++;
+        } else if (!map1.get(k).equalsIgnoreCase(map2.get(k))) {
+          diffCountMaster++;
+        }
+      }
+    }
+    System.out.println("\ndiff count = " + diffCount);
+    System.out.println("\nmaster diff count:" + diffCountMaster);
+  }
+
+  /**
+   * Helper function to calculate and print the standard deviation of the
+   * partition assignment ideal state.
+   */
+  public static void printIdealStateStats(ZNRecord record) {
+    Map<String, Integer> countsMap = new TreeMap<String, Integer>();
+    Map<String, Integer> masterCountsMap = new TreeMap<String, Integer>();
+    for (String key : record.getMapFields().keySet()) {
+      Map<String, String> map1 = record.getMapField(key);
+      for (String k : map1.keySet()) {
+        // count every appearance of instance k, including the first
+        if (!countsMap.containsKey(k)) {
+          countsMap.put(k, 1);
+        } else {
+          countsMap.put(k, countsMap.get(k) + 1);
+        }
+        if (!masterCountsMap.containsKey(k)) {
+          masterCountsMap.put(k, 0);
+        }
+        if (map1.get(k).equalsIgnoreCase("MASTER")) {
+          masterCountsMap.put(k, masterCountsMap.get(k) + 1);
+        }
+      }
+    }
+    double sum = 0;
+    int maxCount = 0;
+    int minCount = Integer.MAX_VALUE;
+    for (String k : countsMap.keySet()) {
+      int count = countsMap.get(k);
+      sum += count;
+      if (maxCount < count) {
+        maxCount = count;
+      }
+      if (minCount > count) {
+        minCount = count;
+      }
+      System.out.print(count + " ");
+    }
+    System.out.println("\nMax count: " + maxCount + " min count:" + minCount);
+    System.out.println("\n master:");
+    double sumMaster = 0;
+    int maxCountMaster = 0;
+    int minCountMaster = Integer.MAX_VALUE;
+    for (String k : masterCountsMap.keySet()) {
+      int count = masterCountsMap.get(k);
+      sumMaster += count;
+      if (maxCountMaster < count) {
+        maxCountMaster = count;
+      }
+      if (minCountMaster > count) {
+        minCountMaster = count;
+      }
+      System.out.print(count + " ");
+    }
+    System.out.println("\nMean master: " + 1.0 * sumMaster / countsMap.size());
+    System.out.println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
+    double mean = sum / (countsMap.size());
+    // calculate the deviation of the node distribution
+    double deviation = 0;
+    for (String k : countsMap.keySet()) {
+      double count = countsMap.get(k);
+      deviation += (count - mean) * (count - mean);
+    }
+    System.out.println("Mean: " + mean + " normal deviation:"
+        + Math.sqrt(deviation / countsMap.size()) / mean);
+
+    // System.out.println("Max count: " + maxCount + " min count:" + minCount);
+    int steps = 10;
+    int stepLen = (maxCount - minCount) / steps;
+    if (stepLen == 0)
+      return;
+    List<Integer> histogram = new ArrayList<Integer>((maxCount - minCount) / stepLen + 1);
+
+    for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++) {
+      histogram.add(0);
+    }
+    for (String k : countsMap.keySet()) {
+      int count = countsMap.get(k);
+      int stepNo = (count - minCount) / stepLen;
+      histogram.set(stepNo, histogram.get(stepNo) + 1);
+    }
+    System.out.println("histogram:");
+    for (Integer x : histogram) {
+      System.out.print(x + " ");
+    }
+  }
+
+  public static void main(String args[]) throws Exception {
+    int partitions = 4096, replicas = 2;
+    String resourceName = "espressoDB1";
+    List<String> instanceNames = new ArrayList<String>();
+    List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
+    for (int i = 0; i < 20; i++) {
+      instanceNames.add("local" + i + "host_123" + i);
+    }
+    instanceCluster1.add(instanceNames);
+    List<Integer> weights1 = new ArrayList<Integer>();
+    weights1.add(1);
+    ZNRecord result =
+        RUSHMasterSlaveStrategy.calculateIdealState(instanceCluster1, weights1, partitions,
+            replicas, resourceName);
+
+    printIdealStateStats(result);
+
+    List<String> instanceNames2 = new ArrayList<String>();
+    for (int i = 400; i < 405; i++) {
+      instanceNames2.add("localhost_123" + i);
+    }
+    instanceCluster1.add(instanceNames2);
+    weights1.add(1);
+    ZNRecord result2 =
+        RUSHMasterSlaveStrategy.calculateIdealState(instanceCluster1, weights1, partitions,
+            replicas, resourceName);
+
+    printDiff(result, result2);
+    printIdealStateStats(result2);
+  }
+}

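Since RUSH weights whole sub-clusters, the main() above models expansion by appending a second cluster; a heavier weight steers proportionally more partitions to it. A hedged variation on that demo (illustrative only, not part of this commit; the host names and the 2x weight are assumptions):

import java.util.ArrayList;
import java.util.List;

import org.apache.helix.ZNRecord;
import org.apache.helix.controller.strategy.RUSHMasterSlaveStrategy;

public class WeightedRushExample {
  public static void main(String[] args) throws Exception {
    List<List<String>> clusters = new ArrayList<List<String>>();
    List<Integer> weights = new ArrayList<Integer>();

    List<String> oldNodes = new ArrayList<String>();
    for (int i = 0; i < 10; i++) {
      oldNodes.add("oldhost_123" + i);
    }
    clusters.add(oldNodes);
    weights.add(1);

    List<String> newNodes = new ArrayList<String>();
    for (int i = 0; i < 5; i++) {
      newNodes.add("newhost_123" + i);
    }
    clusters.add(newNodes);
    weights.add(2); // assumed: newer hardware takes twice the load per node

    ZNRecord idealState =
        RUSHMasterSlaveStrategy.calculateIdealState(clusters, weights, 1024, 2, "espressoDB1");
    RUSHMasterSlaveStrategy.printIdealStateStats(idealState);
  }
}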
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/21c4fcb5/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHrHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHrHash.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHrHash.java
new file mode 100644
index 0000000..e3972ff
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RUSHrHash.java
@@ -0,0 +1,313 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.*;
+import java.util.zip.CRC32;
+
+public class RUSHrHash {
+  /**
+   * how many replicas to create for an object
+   */
+  protected int replicationDegree = 1;
+
+  /**
+   * an array of hash maps, one per sub-cluster, where array element i holds the
+   * node count and node index list for sub-cluster i; this property is populated
+   * at construction time only
+   */
+  protected HashMap[] clusters;
+
+  /**
+   * an array of hash maps where each element holds data for a sub cluster
+   */
+  protected HashMap[] clusterConfig;
+
+  /**
+   * total number of sub-clusters in our data configuration; this property is
+   * populated at construction time only
+   */
+  protected int totalClusters = 0;
+
+  /**
+   * the total number of nodes in all of the subClusters; this property is
+   * populated at construction time only
+   */
+  protected int totalNodes = 0;
+
+  /**
+   * the weighted total number of nodes across all of the clusters; this property
+   * is populated at construction time only
+   */
+  protected int totalNodesW = 0;
+
+  /**
+   * an array of HashMaps where each HashMap holds the data for a single node
+   */
+  protected HashMap[] nodes = null;
+
+  /**
+   * constant used to help seed the random number generator
+   */
+  protected final int SEED_PARAM = 1560;
+
+  /**
+   * random number generator
+   */
+
+  Random ran = new Random();
+
+  /**
+   * maximum value we can have from the ran generator
+   */
+  float ranMax = (float) Math.pow(2.0, 16.0);
+
+  /**
+   * The constructor analyzes the passed config to obtain the fundamental values
+   * and data structures for locating a node: this.clusters, this.totalClusters
+   * and this.totalNodes. Each of those values is described in detail above with
+   * its property; all are derived from the config HashMap passed to the locator.
+   * @param conf
+   *          the data config map (see buildRushConfig in RUSHMasterSlaveStrategy)
+   * @throws Exception
+   */
+
+  public RUSHrHash(HashMap<String, Object> conf) throws Exception {
+
+    clusterConfig = (HashMap[]) conf.get("subClusters");
+    replicationDegree = (Integer) conf.get("replicationDegree");
+
+    HashMap[] subClusters = (HashMap[]) conf.get("subClusters");
+    totalClusters = subClusters.length;
+    clusters = new HashMap[totalClusters];
+    // check the config for all of the params
+    // throw an exception if they are not there
+    if (totalClusters <= 0) {
+      throw new Exception(
+          "data config to the RUSHr locator does not contain a valid clusters property");
+    }
+
+    int nodeCt = 0;
+    HashMap[] nodeData = null;
+    ArrayList<HashMap> tempNodes = new ArrayList<HashMap>();
+    HashMap<String, Object> subCluster = null, clusterData = null;
+    Integer clusterDataList[] = null;
+    for (int i = 0; i < totalClusters; i++) {
+      subCluster = subClusters[i];
+      nodeData = (HashMap[]) subCluster.get("nodes");
+
+      nodeCt = nodeData.length;
+      clusterDataList = new Integer[nodeCt];
+      for (int n = 0; n < nodeCt; n++) {
+        tempNodes.add(nodeData[n]);
+        clusterDataList[n] = n;
+      }
+      totalNodes += nodeCt;
+      totalNodesW += nodeCt * (Integer) subCluster.get("weight");
+
+      clusterData = new HashMap<String, Object>();
+      clusterData.put("count", nodeCt);
+      clusterData.put("list", clusterDataList);
+      clusters[i] = clusterData;
+    }
+    nodes = new HashMap[totalNodes];
+    tempNodes.toArray(nodes);
+  }
+
+  /**
+   * This function is an implementation of the RUSHr algorithm as described by
+   * R. J. Honicky and Ethan Miller.
+   * @param objKey the key used to seed replica placement
+   * @throws Exception
+   * @return the list of node data maps chosen to host the object's replicas
+   */
+  public ArrayList<HashMap> findNode(long objKey) throws Exception {
+
+    HashMap[] c = this.clusters;
+    int sumRemainingNodes = this.totalNodes;
+    int sumRemainingNodesW = this.totalNodesW;
+    int repDeg = this.replicationDegree;
+    int totClu = this.totalClusters;
+    int totNod = this.totalNodes;
+    HashMap[] clusConfig = this.clusterConfig;
+
+    // throw an exception if the data is no good
+    if ((totNod <= 0) || (totClu <= 0)) {
+      throw new Exception("the total nodes or total clusters is negative or 0.  bad joo joos!");
+    }
+
+    // get the starting cluster
+    int currentCluster = totClu - 1;
+
+    /**
+     * this loop is an implementation of the RUSHr algorithm for fast placement
+     * and location of objects in a distributed storage system;
+     * j = current cluster, m = disks in current cluster, n = remaining nodes
+     */
+    ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
+    while (true) {
+
+      // prevent an infinite loop, in case there is a bug
+      if (currentCluster < 0) {
+        throw new Exception("the cluster index became negative while looking for key " + objKey
+            + "; this should never happen with any key and indicates a bug");
+      }
+
+      HashMap clusterData = clusConfig[currentCluster];
+      Integer weight = (Integer) clusterData.get("weight");
+
+      Integer disksInCurrentCluster = (Integer) c[currentCluster].get("count");
+      sumRemainingNodes -= disksInCurrentCluster;
+
+      Integer disksInCurrentClusterW = disksInCurrentCluster * weight;
+      sumRemainingNodesW -= disksInCurrentClusterW;
+
+      // set the seed to our set id
+      long seed = objKey + currentCluster;
+      ran.setSeed(seed);
+      int t = (repDeg - sumRemainingNodes) > 0 ? (repDeg - sumRemainingNodes) : 0;
+
+      int u =
+          t
+              + drawWHG(repDeg - t, disksInCurrentClusterW - t, disksInCurrentClusterW
+                  + sumRemainingNodesW - t, weight);
+      if (u > 0) {
+        if (u > disksInCurrentCluster) {
+          u = disksInCurrentCluster;
+        }
+        ran.setSeed(objKey + currentCluster + SEED_PARAM);
+        choose(u, currentCluster, sumRemainingNodes, nodeData);
+        reset(u, currentCluster);
+        repDeg -= u;
+      }
+      if (repDeg == 0) {
+        break;
+      }
+      currentCluster--;
+    }
+    return nodeData;
+  }
+
+  /**
+   * String-keyed variant of the RUSHr lookup: the key is hashed with CRC32 to
+   * derive the prng seed, then delegated to findNode(long).
+   * @param objKey
+   *          a string identifier, usually an object or partition name
+   * @return the list of node data maps chosen to host the object's replicas
+   * @throws Exception
+   */
+  public ArrayList<HashMap> findNode(String objKey) throws Exception {
+    // turn a string identifier into an integer for the random seed
+    CRC32 crc32 = new CRC32();
+    byte[] bytes = objKey.getBytes();
+    crc32.update(bytes);
+    long crc32Value = crc32.getValue();
+    long objKeyLong = (crc32Value >> 16) & 0x7fff;
+    return findNode(objKeyLong);
+  }
+
+  public void reset(int nodesToRetrieve, int currentCluster) {
+    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
+    Integer count = (Integer) clusters[currentCluster].get("count");
+
+    int listIdx;
+    int val;
+    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++) {
+      listIdx = count - nodesToRetrieve + nodeIdx;
+      val = list[listIdx];
+      if (val < (count - nodesToRetrieve)) {
+        list[val] = val;
+      }
+      list[listIdx] = listIdx;
+    }
+  }
+
+  public void choose(int nodesToRetrieve, int currentCluster, int remainingNodes,
+      ArrayList<HashMap> nodeData) {
+    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
+    Integer count = (Integer) clusters[currentCluster].get("count");
+
+    int maxIdx;
+    int randNode;
+    int chosen;
+    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++) {
+      maxIdx = count - nodeIdx - 1;
+      randNode = ran.nextInt(maxIdx + 1);
+      // swap
+      chosen = list[randNode];
+      list[randNode] = list[maxIdx];
+      list[maxIdx] = chosen;
+      // add the remaining nodes so we can find the node data when we are done
+      nodeData.add(nodes[remainingNodes + chosen]);
+    }
+  }
+
+  /**
+   * alias for findNode(String)
+   * @param objKey a string identifier
+   * @return the list of node data maps chosen to host the object's replicas
+   * @throws Exception
+   */
+  public ArrayList<HashMap> findNodes(String objKey) throws Exception {
+    return findNode(objKey);
+  }
+
+  public int getReplicationDegree() {
+    return replicationDegree;
+  }
+
+  public int getTotalNodes() {
+    return totalNodes;
+  }
+
+  public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks, int weight) {
+    int found = 0;
+    float z;
+    float prob;
+    int ranInt;
+
+    for (int i = 0; i < replicas; i++) {
+      if (totalDisks != 0) {
+        ranInt = ran.nextInt((int) (ranMax + 1));
+        z = ((float) ranInt / ranMax);
+        prob = ((float) disksInCurrentCluster / (float) totalDisks);
+        if (z <= prob) {
+          found++;
+          disksInCurrentCluster -= weight;
+        }
+        totalDisks -= weight;
+      }
+    }
+    return found;
+  }
+}

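RUSHrHash consumes the nested config map that buildRushConfig in RUSHMasterSlaveStrategy produces: a "replicationDegree" entry plus a "subClusters" array whose elements each carry a "weight" and a "nodes" array. A hand-built config for a single three-node sub-cluster (a minimal sketch, not part of this commit; the host and key names are hypothetical):

import java.util.ArrayList;
import java.util.HashMap;

import org.apache.helix.controller.strategy.RUSHrHash;

public class RushHashExample {
  public static void main(String[] args) throws Exception {
    // one sub-cluster of three nodes with weight 1; two replicas per key
    HashMap[] nodes = new HashMap[3];
    for (int i = 0; i < 3; i++) {
      HashMap<String, String> node = new HashMap<String, String>();
      node.put("partition", "host_" + i);
      nodes[i] = node;
    }
    HashMap<String, Object> cluster = new HashMap<String, Object>();
    cluster.put("weight", 1);
    cluster.put("nodes", nodes);

    HashMap<String, Object> config = new HashMap<String, Object>();
    config.put("replicationDegree", 2);
    config.put("subClusters", new HashMap[] {
      cluster
    });

    RUSHrHash hash = new RUSHrHash(config);
    ArrayList<HashMap> placement = hash.findNode("myResource_42");
    System.out.println(placement); // two node maps drawn from the sub-cluster
  }
}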
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/21c4fcb5/helix-core/src/main/java/org/apache/helix/controller/strategy/ShufflingTwoStateStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/ShufflingTwoStateStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/ShufflingTwoStateStrategy.java
new file mode 100644
index 0000000..7b4ce73
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/ShufflingTwoStateStrategy.java
@@ -0,0 +1,120 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+/*
+ * Ideal state calculator for the cluster manager V1. The ideal state is
+ * calculated by randomly assigning primary partitions to storage nodes. This is intended for a
+ * two-state scheme where one state is primary and the other is secondary.
+ *
+ * Note that the following code is a naive strategy and is for cluster manager V1 only. We will
+ * use the other algorithms to calculate the ideal state in future milestones.
+ */
+
+public class ShufflingTwoStateStrategy {
+  /*
+   * Given the number of nodes, partitions and replicas, calculate the ideal
+   * state in the following manner. For the primary partition assignment:
+   * 1. construct an ArrayList partitionList, with partitionList[i] = i;
+   * 2. shuffle the partition list;
+   * 3. scan the shuffled list and assign partitionList[i] to node (i % nodes).
+   * For the secondary partitions, simply put them on the nodes that follow the
+   * node containing the primary partition.
+   * The result of the method is a ZNRecord with one map field per partition;
+   * each map is from node name to a state name ("MASTER" or "SLAVE" for
+   * MasterSlave).
+   */
+
+  /**
+   * Calculate an ideal state for a MasterSlave configuration
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions,
+      int replicas, String resourceName, long randomSeed) {
+    return calculateIdealState(instanceNames, partitions, replicas, resourceName, randomSeed,
+        "MASTER", "SLAVE");
+  }
+
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions,
+      int replicas, String resourceName, long randomSeed, String primaryValue, String secondaryValue) {
+    if (instanceNames.size() <= replicas) {
+      throw new IllegalArgumentException("Replicas must be less than number of nodes");
+    }
+
+    Collections.sort(instanceNames);
+
+    ZNRecord result = new ZNRecord(resourceName);
+
+    List<Integer> partitionList = new ArrayList<Integer>(partitions);
+    for (int i = 0; i < partitions; i++) {
+      partitionList.add(new Integer(i));
+    }
+    Random rand = new Random(randomSeed);
+    // Shuffle the partition list
+    Collections.shuffle(partitionList, rand);
+
+    for (int i = 0; i < partitionList.size(); i++) {
+      int partitionId = partitionList.get(i);
+      Map<String, String> partitionAssignment = new TreeMap<String, String>();
+      int primaryNode = i % instanceNames.size();
+      // the first in the list is the node that contains the primary
+      partitionAssignment.put(instanceNames.get(primaryNode), primaryValue);
+
+      // for the jth replica, start from the (primaryNode + j * partitions) % nodes-th
+      // node and probe forward past instances already holding this partition
+      for (int j = 1; j <= replicas; j++) {
+        int index = (primaryNode + j * partitionList.size()) % instanceNames.size();
+        while (partitionAssignment.keySet().contains(instanceNames.get(index))) {
+          index = (index + 1) % instanceNames.size();
+        }
+        partitionAssignment.put(instanceNames.get(index), secondaryValue);
+      }
+      String partitionName = resourceName + "_" + partitionId;
+      result.setMapField(partitionName, partitionAssignment);
+    }
+    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+    return result;
+  }
+
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions,
+      int replicas, String resourceName) {
+    long randomSeed = 888997632;
+    // the seed is a constant so that the shuffle always gives the same result
+    return calculateIdealState(instanceNames, partitions, replicas, resourceName, randomSeed);
+  }
+}

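Because the shuffle seed defaults to a constant, the strategy above is deterministic, which makes its output easy to eyeball. A hedged usage sketch (illustrative only, not part of this commit; the instance and resource names are hypothetical):

import java.util.ArrayList;
import java.util.List;

import org.apache.helix.ZNRecord;
import org.apache.helix.controller.strategy.ShufflingTwoStateStrategy;

public class ShuffleExample {
  public static void main(String[] args) {
    List<String> instances = new ArrayList<String>();
    for (int i = 0; i < 5; i++) {
      instances.add("localhost_1231" + i);
    }
    // 12 partitions, 2 secondaries per partition, default constant seed
    ZNRecord is = ShufflingTwoStateStrategy.calculateIdealState(instances, 12, 2, "testDB");
    // one map field per partition: node name -> "MASTER" or "SLAVE"
    System.out.println(is.getMapFields());
  }
}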
