helix-commits mailing list archives

From l...@apache.org
Subject [1/5] helix git commit: [HELIX-568] Add new topology-aware (rack-aware) rebalance strategy based on the CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
Date Mon, 12 Sep 2016 17:09:40 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x f5ac8f8b9 -> 7147ec874

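Taken together, the new tests in this patch show how a cluster opts in to the topology-aware strategy: the topology path and fault-zone level are declared on the ClusterConfig, each instance states its position through a domain string on its InstanceConfig, and a resource selects CrushRebalanceStrategy in FULL_AUTO mode. Below is a minimal sketch assembled from those tests, not part of the patch itself; the ZooKeeper address, cluster and resource names are placeholders, and persisting the two configs through a config accessor is omitted.

import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.tools.ClusterSetup;

public class CrushSetupSketch {
  public static void main(String[] args) {
    // Placeholder ZooKeeper address and names.
    ClusterSetup setup = new ClusterSetup("localhost:2181");
    setup.addCluster("MyCluster", true);

    // Declare the topology path and the level that forms a fault zone,
    // mirroring TestTopology below.
    ClusterConfig clusterConfig = new ClusterConfig("MyCluster");
    clusterConfig.getRecord().getSimpleFields()
        .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/Rack/Host/Instance");
    clusterConfig.getRecord().getSimpleFields()
        .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Rack");

    // Each instance reports its position in the topology via a domain string.
    InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
    instanceConfig.setDomain("Rack=rack_0, Host=localhost_12918");

    // A resource selects the CRUSH strategy in FULL_AUTO mode, exactly as
    // TestCrushAutoRebalance below does.
    setup.addResourceToCluster("MyCluster", "MyDB", 20,
        "MasterSlave", RebalanceMode.FULL_AUTO.name(),
        CrushRebalanceStrategy.class.getName());
    setup.rebalanceStorageCluster("MyCluster", "MyDB", 3);
  }
}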

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
deleted file mode 100644
index adc92d6..0000000
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ /dev/null
@@ -1,765 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.Mocks.MockAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class TestAutoRebalanceStrategy {
-  private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
-
-  /**
-   * Sanity test for a basic Master-Slave model
-   */
-  @Test
-  public void simpleMasterSlaveTest() {
-    final int NUM_ITERATIONS = 10;
-    final int NUM_PARTITIONS = 10;
-    final int NUM_LIVE_NODES = 12;
-    final int NUM_TOTAL_NODES = 20;
-    final int MAX_PER_NODE = 5;
-
-    final String[] STATE_NAMES = {
-        "MASTER", "SLAVE"
-    };
-    final int[] STATE_COUNTS = {
-        1, 2
-    };
-
-    runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
-        MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
-  }
-
-  /**
-   * Run a test for an arbitrary state model.
-   * @param name Name of the test state model
-   * @param numIterations Number of rebalance tasks to run
-   * @param numPartitions Number of partitions for the resource
-   * @param numLiveNodes Number of live nodes in the cluster
-   * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
-   *          numLiveNodes
-   * @param maxPerNode Maximum number of replicas a node can serve
-   * @param stateNames States ordered by preference
-   * @param stateCounts Number of replicas that should be in each state
-   */
-  private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
-      int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
-    List<String> partitions = new ArrayList<String>();
-    for (int i = 0; i < numPartitions; i++) {
-      partitions.add("p_" + i);
-    }
-
-    List<String> liveNodes = new ArrayList<String>();
-    List<String> allNodes = new ArrayList<String>();
-    for (int i = 0; i < numTotalNodes; i++) {
-      allNodes.add("n_" + i);
-      if (i < numLiveNodes) {
-        liveNodes.add("n_" + i);
-      }
-    }
-
-    Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
-
-    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
-    for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
-      states.put(stateNames[i], stateCounts[i]);
-    }
-
-    StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
-
-    new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
-        stateModelDef).runRepeatedly(numIterations);
-  }
-
-  /**
-   * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
-   * into account when computing mappings, so this is acceptable.
-   * @param modelName name to give the model
-   * @param initialState initial state for all nodes
-   * @param states ordered map of state to count
-   * @return incomplete StateModelDefinition for rebalancing
-   */
-  private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
-      LinkedHashMap<String, Integer> states) {
-    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
-    builder.initialState(initialState);
-    int i = states.size();
-    for (String state : states.keySet()) {
-      builder.addState(state, i);
-      builder.upperBound(state, states.get(state));
-      i--;
-    }
-    return builder.build();
-  }
-
-  class AutoRebalanceTester {
-    private static final double P_KILL = 0.45;
-    private static final double P_ADD = 0.1;
-    private static final double P_RESURRECT = 0.45;
-    private static final String RESOURCE_NAME = "resource";
-
-    private List<String> _partitions;
-    private LinkedHashMap<String, Integer> _states;
-    private List<String> _liveNodes;
-    private Set<String> _liveSet;
-    private Set<String> _removedSet;
-    private Set<String> _nonLiveSet;
-    private Map<String, Map<String, String>> _currentMapping;
-    private List<String> _allNodes;
-    private int _maxPerNode;
-    private StateModelDefinition _stateModelDef;
-    private Random _random;
-
-    public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
-        List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
-        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
-      _partitions = partitions;
-      _states = states;
-      _liveNodes = liveNodes;
-      _liveSet = new TreeSet<String>();
-      for (String node : _liveNodes) {
-        _liveSet.add(node);
-      }
-      _removedSet = new TreeSet<String>();
-      _nonLiveSet = new TreeSet<String>();
-      _currentMapping = currentMapping;
-      _allNodes = allNodes;
-      for (String node : allNodes) {
-        if (!_liveSet.contains(node)) {
-          _nonLiveSet.add(node);
-        }
-      }
-      _maxPerNode = maxPerNode;
-      _stateModelDef = stateModelDef;
-      _random = new Random();
-    }
-
-    /**
-     * Repeatedly select a random task to run and report the result
-     * @param numIterations
-     *          Number of random tasks to run in sequence
-     */
-    public void runRepeatedly(int numIterations) {
-      logger.info("~~~~ Initial State ~~~~~");
-      RebalanceStrategy strategy =
-          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
-      ZNRecord initialResult =
-          strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-      _currentMapping = getMapping(initialResult.getListFields());
-      logger.info(_currentMapping);
-      getRunResult(_currentMapping, initialResult.getListFields());
-      for (int i = 0; i < numIterations; i++) {
-        logger.info("~~~~ Iteration " + i + " ~~~~~");
-        ZNRecord znRecord = runOnceRandomly();
-        if (znRecord != null) {
-          final Map<String, List<String>> listResult = znRecord.getListFields();
-          final Map<String, Map<String, String>> mapResult = getMapping(listResult);
-          logger.info(mapResult);
-          logger.info(listResult);
-          getRunResult(mapResult, listResult);
-          _currentMapping = mapResult;
-        }
-      }
-    }
-
-    private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
-      final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
-      ClusterDataCache cache = new ClusterDataCache();
-      MockAccessor accessor = new MockAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-      for (String node : _liveNodes) {
-        LiveInstance liveInstance = new LiveInstance(node);
-        liveInstance.setSessionId("testSession");
-        accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
-      }
-      cache.refresh(accessor);
-      for (String partition : _partitions) {
-        List<String> preferenceList = listResult.get(partition);
-        Map<String, String> currentStateMap = _currentMapping.get(partition);
-        Set<String> disabled = Collections.emptySet();
-        Map<String, String> assignment =
-            ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
-                preferenceList, currentStateMap, disabled, true);
-        mapResult.put(partition, assignment);
-      }
-      return mapResult;
-    }
-
-    /**
-     * Output various statistics and correctness check results
-     * @param mapFields
-     *          The map-map assignment generated by the rebalancer
-     * @param listFields
-     *          The map-list assignment generated by the rebalancer
-     */
-    public void getRunResult(final Map<String, Map<String, String>> mapFields,
-        final Map<String, List<String>> listFields) {
-      logger.info("***** Statistics *****");
-      dumpStatistics(mapFields);
-      verifyCorrectness(mapFields, listFields);
-    }
-
-    /**
-     * Output statistics about the assignment
-     * @param mapFields
-     *          The map-map assignment generated by the rebalancer
-     */
-    public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
-      Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
-      int nodeCount = _liveNodes.size();
-      logger.info("Total number of nodes: " + nodeCount);
-      logger.info("Nodes: " + _liveNodes);
-      int sumPartitions = getSum(partitionsPerNode.values());
-      logger.info("Total number of partitions: " + sumPartitions);
-      double averagePartitions = getAverage(partitionsPerNode.values());
-      logger.info("Average number of partitions per node: " + averagePartitions);
-      double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
-      logger.info("Standard deviation of partitions: " + stdevPartitions);
-
-      // Statistics about each state
-      Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
-      for (String state : _states.keySet()) {
-        Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
-        for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
-          Map<String, Integer> stateCounts = nodeStates.getValue();
-          if (stateCounts.containsKey(state)) {
-            nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
-          } else {
-            nodeStateCounts.put(nodeStates.getKey(), 0);
-          }
-        }
-        int sumStates = getSum(nodeStateCounts.values());
-        logger.info("Total number of state " + state + ": " + sumStates);
-        double averageStates = getAverage(nodeStateCounts.values());
-        logger.info("Average number of state " + state + " per node: " + averageStates);
-        double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
-        logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
-      }
-    }
-
-    /**
-     * Run a set of correctness tests, reporting success or failure
-     * @param mapFields
-     *          The map-map assignment generated by the rebalancer
-     * @param listFields
-     *          The map-list assignment generated by the rebalancer
-     */
-    public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
-        final Map<String, List<String>> listFields) {
-      final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
-      boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
-      assert maxConstraintMet : "Max per node constraint: FAIL";
-      logger.info("Max per node constraint: PASS");
-
-      boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
-      assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
-      logger.info("Only live nodes have partitions constraint: PASS");
-
-      boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
-      assert stateAssignmentPossible : "State replica constraint: FAIL";
-      logger.info("State replica constraint: PASS");
-
-      boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
-      assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
-      logger.info("Node uniqueness per partition constraint: PASS");
-    }
-
-    private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
-      for (String node : partitionsPerNode.keySet()) {
-        Integer value = partitionsPerNode.get(node);
-        if (value > _maxPerNode) {
-          logger.error("ERROR: Node " + node + " has " + value
-              + " partitions despite a maximum of " + _maxPerNode);
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
-      for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
-        boolean isLive = _liveSet.contains(nodeState.getKey());
-        boolean isEmpty = nodeState.getValue() == 0;
-        if (!isLive && !isEmpty) {
-          logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
-              + nodeState.getValue() + " replicas!");
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
-      for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
-        final Map<String, String> nodeMap = partitionEntry.getValue();
-        final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
-        for (String state : nodeMap.values()) {
-          if (!stateCounts.containsKey(state)) {
-            stateCounts.put(state, 1);
-          } else {
-            stateCounts.put(state, stateCounts.get(state) + 1);
-          }
-        }
-        for (String state : stateCounts.keySet()) {
-          if (state.equals(HelixDefinedState.DROPPED.toString())) {
-            continue;
-          }
-          int count = stateCounts.get(state);
-          int maximumCount = _states.get(state);
-          if (count > maximumCount) {
-            logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
-                + " has " + count + " replicas when " + maximumCount + " is allowed!");
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
-      for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
-        Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
-        int numUniques = nodeSet.size();
-        int total = partitionEntry.getValue().size();
-        if (numUniques < total) {
-          logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
-              + " nodes, but only " + numUniques + " are unique!");
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private double getAverage(final Collection<Integer> values) {
-      double sum = 0.0;
-      for (Integer value : values) {
-        sum += value;
-      }
-      if (values.size() != 0) {
-        return sum / values.size();
-      } else {
-        return -1.0;
-      }
-    }
-
-    private int getSum(final Collection<Integer> values) {
-      int sum = 0;
-      for (Integer value : values) {
-        sum += value;
-      }
-      return sum;
-    }
-
-    private double getStdev(final Collection<Integer> values, double mean) {
-      double sum = 0.0;
-      for (Integer value : values) {
-        double deviation = mean - value;
-        sum += Math.pow(deviation, 2.0);
-      }
-      if (values.size() != 0) {
-        sum /= values.size();
-        return Math.pow(sum, 0.5);
-      } else {
-        return -1.0;
-      }
-    }
-
-    private Map<String, Integer> getPartitionBucketsForNode(
-        final Map<String, Map<String, String>> assignment) {
-      Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
-      for (String node : _liveNodes) {
-        partitionsPerNode.put(node, 0);
-      }
-      for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
-        final Map<String, String> nodeMap = partitionEntry.getValue();
-        for (String node : nodeMap.keySet()) {
-          String state = nodeMap.get(node);
-          if (state.equals(HelixDefinedState.DROPPED.toString())) {
-            continue;
-          }
-          // add 1 for every occurrence of a node
-          if (!partitionsPerNode.containsKey(node)) {
-            partitionsPerNode.put(node, 1);
-          } else {
-            partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
-          }
-        }
-      }
-      return partitionsPerNode;
-    }
-
-    private Map<String, Map<String, Integer>> getStateBucketsForNode(
-        final Map<String, Map<String, String>> assignment) {
-      Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
-      for (String n : _liveNodes) {
-        result.put(n, new TreeMap<String, Integer>());
-      }
-      for (Map<String, String> nodeStateMap : assignment.values()) {
-        for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
-          if (!result.containsKey(nodeState.getKey())) {
-            result.put(nodeState.getKey(), new TreeMap<String, Integer>());
-          }
-          Map<String, Integer> stateMap = result.get(nodeState.getKey());
-          if (!stateMap.containsKey(nodeState.getValue())) {
-            stateMap.put(nodeState.getValue(), 1);
-          } else {
-            stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
-          }
-        }
-      }
-      return result;
-    }
-
-    /**
-     * Randomly choose between killing, adding, or resurrecting a single node
-     * @return (Partition -> (Node -> State)) ZNRecord
-     */
-    public ZNRecord runOnceRandomly() {
-      double choose = _random.nextDouble();
-      ZNRecord result = null;
-      if (choose < P_KILL) {
-        result = removeSingleNode(null);
-      } else if (choose < P_KILL + P_ADD) {
-        result = addSingleNode(null);
-      } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
-        result = resurrectSingleNode(null);
-      }
-      return result;
-    }
-
-    /**
-     * Run rebalancer trying to add a never-live node
-     * @param node
-     *          Optional String to add
-     * @return ZNRecord result returned by the rebalancer
-     */
-    public ZNRecord addSingleNode(String node) {
-      logger.info("=================== add node =================");
-      if (_nonLiveSet.size() == 0) {
-        logger.warn("Cannot add node because there are no nodes left to add.");
-        return null;
-      }
-
-      // Get a random never-live node
-      if (node == null || !_nonLiveSet.contains(node)) {
-        node = getRandomSetElement(_nonLiveSet);
-      }
-      logger.info("Adding " + node);
-      _liveNodes.add(node);
-      _liveSet.add(node);
-      _nonLiveSet.remove(node);
-
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
-          computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-    }
-
-    /**
-     * Run rebalancer trying to remove a live node
-     * @param node
-     *          Optional String to remove
-     * @return ZNRecord result returned by the rebalancer
-     */
-    public ZNRecord removeSingleNode(String node) {
-      logger.info("=================== remove node =================");
-      if (_liveSet.size() == 0) {
-        logger.warn("Cannot remove node because there are no nodes left to remove.");
-        return null;
-      }
-
-      // Get a random live node
-      if (node == null || !_liveSet.contains(node)) {
-        node = getRandomSetElement(_liveSet);
-      }
-      logger.info("Removing " + node);
-      _removedSet.add(node);
-      _liveNodes.remove(node);
-      _liveSet.remove(node);
-
-      // the rebalancer expects that the current mapping doesn't contain deleted
-      // nodes
-      for (Map<String, String> nodeMap : _currentMapping.values()) {
-        if (nodeMap.containsKey(node)) {
-          nodeMap.remove(node);
-        }
-      }
-
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
-          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-    }
-
-    /**
-     * Run rebalancer trying to add back a removed node
-     * @param node
-     *          Optional String to resurrect
-     * @return ZNRecord result returned by the rebalancer
-     */
-    public ZNRecord resurrectSingleNode(String node) {
-      logger.info("=================== resurrect node =================");
-      if (_removedSet.size() == 0) {
-        logger.warn("Cannot remove node because there are no nodes left to resurrect.");
-        return null;
-      }
-
-      // Get a random removed node
-      if (node == null || !_removedSet.contains(node)) {
-        node = getRandomSetElement(_removedSet);
-      }
-      logger.info("Resurrecting " + node);
-      _removedSet.remove(node);
-      _liveNodes.add(node);
-      _liveSet.add(node);
-
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
-          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-    }
-
-    private <T> T getRandomSetElement(Set<T> source) {
-      int element = _random.nextInt(source.size());
-      int i = 0;
-      for (T node : source) {
-        if (i == element) {
-          return node;
-        }
-        i++;
-      }
-      return null;
-    }
-  }
-
-  /**
-   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
-   * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
-   * current mapping, states should be distributed as evenly as possible.
-   */
-  @Test
-  public void testOrphansNotPreferred() {
-    final String RESOURCE_NAME = "resource";
-    final String[] PARTITIONS = {
-        "resource_0", "resource_1", "resource_2"
-    };
-    final StateModelDefinition STATE_MODEL =
-        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
-    final int REPLICA_COUNT = 2;
-    final String[] NODES = {
-        "n0", "n1", "n2"
-    };
-
-    // initial state, one node, no mapping
-    List<String> allNodes = Lists.newArrayList(NODES[0]);
-    List<String> liveNodes = Lists.newArrayList(NODES[0]);
-    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
-    for (String partition : PARTITIONS) {
-      currentMapping.put(partition, new HashMap<String, String>());
-    }
-
-    // make sure that when the first node joins, a single replica is assigned fairly
-    List<String> partitions = ImmutableList.copyOf(PARTITIONS);
-    LinkedHashMap<String, Integer> stateCount =
-        AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-    ZNRecord znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    Map<String, List<String>> preferenceLists = znRecord.getListFields();
-    for (String partition : currentMapping.keySet()) {
-      // make sure these are all MASTER
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
-    }
-
-    // now assign a replica to the first node in the current mapping, and add a second node
-    allNodes.add(NODES[1]);
-    liveNodes.add(NODES[1]);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).put(NODES[0], "MASTER");
-    }
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
-          + partition);
-      Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
-          + partition);
-    }
-
-    // now set the current mapping to reflect this update and make sure that it distributes masters
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).put(NODES[1], "SLAVE");
-    }
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    Set<String> firstNodes = Sets.newHashSet();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      firstNodes.add(preferenceList.get(0));
-    }
-    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
-
-    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
-    // new node is never the most preferred
-    allNodes.add(NODES[2]);
-    liveNodes.add(NODES[2]);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-
-    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
-    currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    boolean newNodeUsed = false;
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      if (preferenceList.contains(NODES[2])) {
-        newNodeUsed = true;
-        Assert.assertEquals(preferenceList.get(1), NODES[2],
-            "newly added node not at preference list tail for " + partition);
-      }
-    }
-    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
-
-    // now remap this to take the new node into account; it should go back to balancing
-    // masters and slaves evenly across all nodes
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).clear();
-    }
-    currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
-    currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
-    currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
-    currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    firstNodes.clear();
-    Set<String> secondNodes = Sets.newHashSet();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      firstNodes.add(preferenceList.get(0));
-      secondNodes.add(preferenceList.get(1));
-    }
-    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
-    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
-
-    // remove a node now, but use the current mapping with everything balanced just prior
-    liveNodes.remove(0);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-
-    // remove all references of n0 from the mapping, keep everything else in a legal state
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).clear();
-    }
-    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
-    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      Map<String, String> stateMap = currentMapping.get(partition);
-      for (String participant : stateMap.keySet()) {
-        Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
-            + partition);
-      }
-      for (String participant : preferenceList) {
-        if (!stateMap.containsKey(participant)) {
-          Assert.assertNotSame(preferenceList.get(0), participant,
-              "newly moved replica should not be master for " + partition);
-        }
-      }
-    }
-
-    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).clear();
-    }
-    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
-    currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
-    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    firstNodes.clear();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      firstNodes.add(preferenceList.get(0));
-    }
-    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
new file mode 100644
index 0000000..5169edd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
@@ -0,0 +1,172 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestTopology {
+  private static Logger logger = Logger.getLogger(TestTopology.class);
+
+  @Test
+  public void testCreateClusterTopology() {
+    ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+    String topology = "/Rack/Sub-Rack/Host/Instance";
+    clusterConfig.getRecord().getSimpleFields()
+        .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), topology);
+    clusterConfig.getRecord().getSimpleFields()
+        .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack");
+
+    List<String> allNodes = new ArrayList<String>();
+    List<String> liveNodes = new ArrayList<String>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+    Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+    for (int i = 0; i < 100; i++) {
+      String instance = "localhost_" + i;
+      InstanceConfig config = new InstanceConfig(instance);
+      String rack_id = "rack_" + i / 25;
+      String sub_rack_id = "subrack-" + i / 5;
+
+      String domain =
+          String.format("Rack=%s, Sub-Rack=%s, Host=%s", rack_id, sub_rack_id, instance);
+      config.setDomain(domain);
+      config.setHostName(instance);
+      config.setPort("9000");
+      allNodes.add(instance);
+
+      int weight = 0;
+      if (i % 10 != 0) {
+        liveNodes.add(instance);
+        weight = 1000;
+        if (i % 3 == 0) {
+          // override the default weight with an instance-specific value.
+          weight = (i + 1) * 100;
+          config.setWeight(weight);
+        }
+      }
+
+      instanceConfigMap.put(instance, config);
+
+      if (!nodeToWeightMap.containsKey(rack_id)) {
+        nodeToWeightMap.put(rack_id, 0);
+      }
+      nodeToWeightMap.put(rack_id, nodeToWeightMap.get(rack_id) + weight);
+      if (!nodeToWeightMap.containsKey(sub_rack_id)) {
+        nodeToWeightMap.put(sub_rack_id, 0);
+      }
+      nodeToWeightMap.put(sub_rack_id, nodeToWeightMap.get(sub_rack_id) + weight);
+    }
+
+    Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+    Assert.assertEquals(topo.getEndNodeType(), "Instance");
+    Assert.assertEquals(topo.getFaultZoneType(), "Sub-Rack");
+
+    List<Node> faultZones = topo.getFaultZones();
+    Assert.assertEquals(faultZones.size(), 20);
+
+    Node root = topo.getRootNode();
+
+    Assert.assertEquals(root.getChildrenCount("Rack"), 4);
+    Assert.assertEquals(root.getChildrenCount("Sub-Rack"), 20);
+    Assert.assertEquals(root.getChildrenCount("Host"), 100);
+    Assert.assertEquals(root.getChildrenCount("Instance"), 100);
+
+
+    // validate weights.
+    for (Node rack : root.getChildren()) {
+      Assert.assertEquals(rack.getWeight(), (long)nodeToWeightMap.get(rack.getName()));
+      for (Node subRack : rack.getChildren()) {
+        Assert.assertEquals(subRack.getWeight(), (long)nodeToWeightMap.get(subRack.getName()));
+      }
+    }
+  }
+
+  @Test
+  public void testCreateClusterTopologyWithDefaultTopology() {
+    ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+    List<String> allNodes = new ArrayList<String>();
+    List<String> liveNodes = new ArrayList<String>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+    Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+    for (int i = 0; i < 100; i++) {
+      String instance = "localhost_" + i;
+      InstanceConfig config = new InstanceConfig(instance);
+      String zoneId = "rack_" + i / 10;
+      config.setZoneId(zoneId);
+      config.setHostName(instance);
+      config.setPort("9000");
+      allNodes.add(instance);
+
+      int weight = 0;
+      if (i % 10 != 0) {
+        liveNodes.add(instance);
+        weight = 1000;
+        if (i % 3 == 0) {
+          // override the default weight with an instance-specific value.
+          weight = (i + 1) * 100;
+          config.setWeight(weight);
+        }
+      }
+
+      instanceConfigMap.put(instance, config);
+
+      if (!nodeToWeightMap.containsKey(zoneId)) {
+        nodeToWeightMap.put(zoneId, 0);
+      }
+      nodeToWeightMap.put(zoneId, nodeToWeightMap.get(zoneId) + weight);
+    }
+
+    Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+    Assert.assertEquals(topo.getEndNodeType(), Topology.Types.INSTANCE.name());
+    Assert.assertEquals(topo.getFaultZoneType(), Topology.Types.ZONE.name());
+
+    List<Node> faultZones = topo.getFaultZones();
+    Assert.assertEquals(faultZones.size(), 10);
+
+    Node root = topo.getRootNode();
+
+    Assert.assertEquals(root.getChildrenCount(Topology.Types.ZONE.name()), 10);
+    Assert.assertEquals(root.getChildrenCount(topo.getEndNodeType()), 100);
+
+    // validate weights.
+    for (Node rack : root.getChildren()) {
+      Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName()));
+    }
+  }
+}

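The counts asserted in testCreateClusterTopology above follow directly from the integer division in its setup loop; a standalone restatement of that arithmetic (plain Java, no Helix dependencies):

public class TopologyCountSketch {
  public static void main(String[] args) {
    // The test creates 100 instances with rack_id = "rack_" + i / 25
    // and sub_rack_id = "subrack-" + i / 5.
    int instances = 100;
    int racks = instances / 25;   // 4 distinct rack ids: rack_0 .. rack_3
    int subRacks = instances / 5; // 20 distinct sub-rack ids
    System.out.println(racks + " racks, " + subRacks + " sub-racks");
    // FAULT_ZONE_TYPE is "Sub-Rack", so getFaultZones() should return 20
    // nodes and getChildrenCount("Rack") should be 4, as asserted above.
  }
}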
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
new file mode 100644
index 0000000..5c34792
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
@@ -0,0 +1,221 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  Map<String, String> _nodeToZoneMap = new HashMap<String, String>();
+  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+  List<String> _nodes = new ArrayList<String>();
+  List<String> _allDBs = new ArrayList<String>();
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      String zone = "zone-" + i % 3;
+      String tag = "tag-" + i % 2;
+      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+      _nodeToZoneMap.put(storageNodeName, zone);
+      _nodeToTagMap.put(storageNodeName, tag);
+      _nodes.add(storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @DataProvider(name = "rebalanceStrategies")
+  public static String[][] rebalanceStrategies() {
+    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+  }
+
+  @Test(dataProvider = "rebalanceStrategies")
+  public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    System.out.println("Test " + rebalanceStrategyName);
+    List<String> testDBs = new ArrayList<String>();
+    String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+        BuiltInStateModelDefinitions.MasterSlave.name(),
+        BuiltInStateModelDefinitions.LeaderStandby.name()
+    };
+    int i = 0;
+    for (String stateModel : testModels) {
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev);
+    }
+  }
+
+  @Test(dataProvider = "rebalanceStrategies")
+  public void testZoneIsolationWithInstanceTag(
+      String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception {
+    List<String> testDBs = new ArrayList<String>();
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    int i = 0;
+    for (String tag : tags) {
+      String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+          BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+          rebalanceStrategyClass);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev);
+    }
+  }
+
+  /**
+   * Validate that each partition's instances are placed in different zones and, when a group tag is set, only on instances carrying that tag.
+   */
+  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev) {
+    int replica = Integer.valueOf(is.getReplicas());
+    String tag = is.getInstanceGroupTag();
+
+    for (String partition : is.getPartitionSet()) {
+      Set<String> assignedZones = new HashSet<String>();
+
+      Set<String> instancesInIs = new HashSet<String>(is.getRecord().getListField(partition));
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      Assert.assertEquals(instancesInEV, instancesInIs);
+      for (String instance : instancesInEV) {
+        assignedZones.add(_nodeToZoneMap.get(instance));
+        if (tag != null) {
+          InstanceConfig config =
+              _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+      Assert.assertEquals(assignedZones.size(), replica);
+    }
+  }
+
+  @Test()
+  public void testAddZone() throws Exception {
+    //TODO
+  }
+
+  @Test()
+  public void testAddNodes() throws Exception {
+    //TODO
+  }
+
+  @Test()
+  public void testNodeFailure() throws Exception {
+    //TODO
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /*
+     * Shutdown order: first disconnect the controller, then the participants.
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

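A note on validateZoneAndTagIsolation above: asserting assignedZones.size() == replica amounts to full zone isolation only because beforeClass spreads the six participants over exactly three zones while _replica is also 3. A standalone restatement of that setup arithmetic:

public class ZoneMathSketch {
  public static void main(String[] args) {
    // beforeClass assigns zone = "zone-" + i % 3 to the six participants.
    for (int i = 0; i < 6; i++) {
      System.out.println("localhost_" + (12918 + i) + " -> zone-" + (i % 3));
    }
    // Two instances land in each of zone-0, zone-1, and zone-2. With three
    // replicas and three zones, zone isolation forces every partition's
    // replicas to cover all three zones, i.e. assignedZones.size() == replica.
  }
}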
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 917be17..51dd19d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -31,6 +31,7 @@ import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateMod
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.log4j.Logger;
 
@@ -73,14 +74,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   public void run() {
     try {
       StateMachineEngine stateMach = getStateMachineEngine();
-      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+      stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(),
+          _msModelFactory);
 
       DummyLeaderStandbyStateModelFactory lsModelFactory =
           new DummyLeaderStandbyStateModelFactory(10);
       DummyOnlineOfflineStateModelFactory ofModelFactory =
           new DummyOnlineOfflineStateModelFactory(10);
-      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
-      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+      stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(),
+          lsModelFactory);
+      stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(),
+          ofModelFactory);
 
       MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
       stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);

