From: lxia@apache.org
To: commits@helix.apache.org
Date: Mon, 12 Sep 2016 17:09:40 -0000
Message-Id: <1ff357b2a8674d6fa7e4b1fc0618c761@git.apache.org>
Subject: [1/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy

Repository: helix
Updated Branches: refs/heads/helix-0.6.x f5ac8f8b9 -> 7147ec874

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java deleted file mode 100644 index adc92d6..0000000 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java +++ /dev/null @@ -1,765 +0,0 @@ -package org.apache.helix.controller.strategy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.helix.HelixDefinedState; -import org.apache.helix.Mocks.MockAccessor; -import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.ZNRecord; -import org.apache.helix.controller.rebalancer.AutoRebalancer; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.tools.StateModelConfigGenerator; -import org.apache.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class TestAutoRebalanceStrategy { - private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class); - - /** - * Sanity test for a basic Master-Slave model - */ - @Test - public void simpleMasterSlaveTest() { - final int NUM_ITERATIONS = 10; - final int NUM_PARTITIONS = 10; - final int NUM_LIVE_NODES = 12; - final int NUM_TOTAL_NODES = 20; - final int MAX_PER_NODE = 5; - - final String[] STATE_NAMES = { - "MASTER", "SLAVE" - }; - final int[] STATE_COUNTS = { - 1, 2 - }; - - runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES, - MAX_PER_NODE, STATE_NAMES, STATE_COUNTS); - } - - /** - * Run a test for an arbitrary state model. - * @param name Name of the test state model - * @param numIterations Number of rebalance tasks to run - * @param numPartitions Number of partitions for the resource - * @param numLiveNodes Number of live nodes in the cluster - * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to - * numLiveNodes - * @param maxPerNode Maximum number of replicas a node can serve - * @param stateNames States ordered by preference - * @param stateCounts Number of replicas that should be in each state - */ - private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes, - int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) { - List partitions = new ArrayList(); - for (int i = 0; i < numPartitions; i++) { - partitions.add("p_" + i); - } - - List liveNodes = new ArrayList(); - List allNodes = new ArrayList(); - for (int i = 0; i < numTotalNodes; i++) { - allNodes.add("n_" + i); - if (i < numLiveNodes) { - liveNodes.add("n_" + i); - } - } - - Map> currentMapping = new TreeMap>(); - - LinkedHashMap states = new LinkedHashMap(); - for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) { - states.put(stateNames[i], stateCounts[i]); - } - - StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); - - new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, - stateModelDef).runRepeatedly(numIterations); - } - - /** - * Get a StateModelDefinition without transitions. 
The auto rebalancer doesn't take transitions - * into account when computing mappings, so this is acceptable. - * @param modelName name to give the model - * @param initialState initial state for all nodes - * @param states ordered map of state to count - * @return incomplete StateModelDefinition for rebalancing - */ - private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState, - LinkedHashMap states) { - StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName); - builder.initialState(initialState); - int i = states.size(); - for (String state : states.keySet()) { - builder.addState(state, i); - builder.upperBound(state, states.get(state)); - i--; - } - return builder.build(); - } - - class AutoRebalanceTester { - private static final double P_KILL = 0.45; - private static final double P_ADD = 0.1; - private static final double P_RESURRECT = 0.45; - private static final String RESOURCE_NAME = "resource"; - - private List _partitions; - private LinkedHashMap _states; - private List _liveNodes; - private Set _liveSet; - private Set _removedSet; - private Set _nonLiveSet; - private Map> _currentMapping; - private List _allNodes; - private int _maxPerNode; - private StateModelDefinition _stateModelDef; - private Random _random; - - public AutoRebalanceTester(List partitions, LinkedHashMap states, - List liveNodes, Map> currentMapping, - List allNodes, int maxPerNode, StateModelDefinition stateModelDef) { - _partitions = partitions; - _states = states; - _liveNodes = liveNodes; - _liveSet = new TreeSet(); - for (String node : _liveNodes) { - _liveSet.add(node); - } - _removedSet = new TreeSet(); - _nonLiveSet = new TreeSet(); - _currentMapping = currentMapping; - _allNodes = allNodes; - for (String node : allNodes) { - if (!_liveSet.contains(node)) { - _nonLiveSet.add(node); - } - } - _maxPerNode = maxPerNode; - _stateModelDef = stateModelDef; - _random = new Random(); - } - - /** - * Repeatedly randomly select a task to run and report the result - * @param numIterations - * Number of random tasks to run in sequence - */ - public void runRepeatedly(int numIterations) { - logger.info("~~~~ Initial State ~~~~~"); - RebalanceStrategy strategy = - new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode); - ZNRecord initialResult = - strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); - _currentMapping = getMapping(initialResult.getListFields()); - logger.info(_currentMapping); - getRunResult(_currentMapping, initialResult.getListFields()); - for (int i = 0; i < numIterations; i++) { - logger.info("~~~~ Iteration " + i + " ~~~~~"); - ZNRecord znRecord = runOnceRandomly(); - if (znRecord != null) { - final Map> listResult = znRecord.getListFields(); - final Map> mapResult = getMapping(listResult); - logger.info(mapResult); - logger.info(listResult); - getRunResult(mapResult, listResult); - _currentMapping = mapResult; - } - } - } - - private Map> getMapping(final Map> listResult) { - final Map> mapResult = new HashMap>(); - ClusterDataCache cache = new ClusterDataCache(); - MockAccessor accessor = new MockAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - for (String node : _liveNodes) { - LiveInstance liveInstance = new LiveInstance(node); - liveInstance.setSessionId("testSession"); - accessor.setProperty(keyBuilder.liveInstance(node), liveInstance); - } - cache.refresh(accessor); - for (String partition : _partitions) { - List preferenceList = listResult.get(partition); - Map 
currentStateMap = _currentMapping.get(partition); - Set disabled = Collections.emptySet(); - Map assignment = - ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef, - preferenceList, currentStateMap, disabled, true); - mapResult.put(partition, assignment); - } - return mapResult; - } - - /** - * Output various statistics and correctness check results - * @param mapFields - * The map-map assignment generated by the rebalancer - * @param listFields - * The map-list assignment generated by the rebalancer - */ - public void getRunResult(final Map> mapFields, - final Map> listFields) { - logger.info("***** Statistics *****"); - dumpStatistics(mapFields); - verifyCorrectness(mapFields, listFields); - } - - /** - * Output statistics about the assignment - * @param mapFields - * The map-map assignment generated by the rebalancer - */ - public void dumpStatistics(final Map> mapFields) { - Map partitionsPerNode = getPartitionBucketsForNode(mapFields); - int nodeCount = _liveNodes.size(); - logger.info("Total number of nodes: " + nodeCount); - logger.info("Nodes: " + _liveNodes); - int sumPartitions = getSum(partitionsPerNode.values()); - logger.info("Total number of partitions: " + sumPartitions); - double averagePartitions = getAverage(partitionsPerNode.values()); - logger.info("Average number of partitions per node: " + averagePartitions); - double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions); - logger.info("Standard deviation of partitions: " + stdevPartitions); - - // Statistics about each state - Map> statesPerNode = getStateBucketsForNode(mapFields); - for (String state : _states.keySet()) { - Map nodeStateCounts = new TreeMap(); - for (Entry> nodeStates : statesPerNode.entrySet()) { - Map stateCounts = nodeStates.getValue(); - if (stateCounts.containsKey(state)) { - nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state)); - } else { - nodeStateCounts.put(nodeStates.getKey(), 0); - } - } - int sumStates = getSum(nodeStateCounts.values()); - logger.info("Total number of state " + state + ": " + sumStates); - double averageStates = getAverage(nodeStateCounts.values()); - logger.info("Average number of state " + state + " per node: " + averageStates); - double stdevStates = getStdev(nodeStateCounts.values(), averageStates); - logger.info("Standard deviation of state " + state + " per node: " + stdevStates); - } - } - - /** - * Run a set of correctness tests, reporting success or failure - * @param mapFields - * The map-map assignment generated by the rebalancer - * @param listFields - * The map-list assignment generated by the rebalancer - */ - public void verifyCorrectness(final Map> mapFields, - final Map> listFields) { - final Map partitionsPerNode = getPartitionBucketsForNode(mapFields); - boolean maxConstraintMet = maxNotExceeded(partitionsPerNode); - assert maxConstraintMet : "Max per node constraint: FAIL"; - logger.info("Max per node constraint: PASS"); - - boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode); - assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL"; - logger.info("Only live nodes have partitions constraint: PASS"); - - boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields); - assert stateAssignmentPossible : "State replica constraint: FAIL"; - logger.info("State replica constraint: PASS"); - - boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields); - assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: 
FAIL"; - logger.info("Node uniqueness per partition constraint: PASS"); - } - - private boolean maxNotExceeded(final Map partitionsPerNode) { - for (String node : partitionsPerNode.keySet()) { - Integer value = partitionsPerNode.get(node); - if (value > _maxPerNode) { - logger.error("ERROR: Node " + node + " has " + value - + " partitions despite a maximum of " + _maxPerNode); - return false; - } - } - return true; - } - - private boolean onlyLiveAssigned(final Map partitionsPerNode) { - for (final Entry nodeState : partitionsPerNode.entrySet()) { - boolean isLive = _liveSet.contains(nodeState.getKey()); - boolean isEmpty = nodeState.getValue() == 0; - if (!isLive && !isEmpty) { - logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has " - + nodeState.getValue() + " replicas!"); - return false; - } - } - return true; - } - - private boolean correctStateAssignmentCount(final Map> assignment) { - for (final Entry> partitionEntry : assignment.entrySet()) { - final Map nodeMap = partitionEntry.getValue(); - final Map stateCounts = new TreeMap(); - for (String state : nodeMap.values()) { - if (!stateCounts.containsKey(state)) { - stateCounts.put(state, 1); - } else { - stateCounts.put(state, stateCounts.get(state) + 1); - } - } - for (String state : stateCounts.keySet()) { - if (state.equals(HelixDefinedState.DROPPED.toString())) { - continue; - } - int count = stateCounts.get(state); - int maximumCount = _states.get(state); - if (count > maximumCount) { - logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey() - + " has " + count + " replicas when " + maximumCount + " is allowed!"); - return false; - } - } - } - return true; - } - - private boolean atMostOnePartitionReplicaPerNode(final Map> listFields) { - for (final Entry> partitionEntry : listFields.entrySet()) { - Set nodeSet = new HashSet(partitionEntry.getValue()); - int numUniques = nodeSet.size(); - int total = partitionEntry.getValue().size(); - if (numUniques < total) { - logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total - + " nodes, but only " + numUniques + " are unique!"); - return false; - } - } - return true; - } - - private double getAverage(final Collection values) { - double sum = 0.0; - for (Integer value : values) { - sum += value; - } - if (values.size() != 0) { - return sum / values.size(); - } else { - return -1.0; - } - } - - private int getSum(final Collection values) { - int sum = 0; - for (Integer value : values) { - sum += value; - } - return sum; - } - - private double getStdev(final Collection values, double mean) { - double sum = 0.0; - for (Integer value : values) { - double deviation = mean - value; - sum += Math.pow(deviation, 2.0); - } - if (values.size() != 0) { - sum /= values.size(); - return Math.pow(sum, 0.5); - } else { - return -1.0; - } - } - - private Map getPartitionBucketsForNode( - final Map> assignment) { - Map partitionsPerNode = new TreeMap(); - for (String node : _liveNodes) { - partitionsPerNode.put(node, 0); - } - for (Entry> partitionEntry : assignment.entrySet()) { - final Map nodeMap = partitionEntry.getValue(); - for (String node : nodeMap.keySet()) { - String state = nodeMap.get(node); - if (state.equals(HelixDefinedState.DROPPED.toString())) { - continue; - } - // add 1 for every occurrence of a node - if (!partitionsPerNode.containsKey(node)) { - partitionsPerNode.put(node, 1); - } else { - partitionsPerNode.put(node, partitionsPerNode.get(node) + 1); - } - } - } - return partitionsPerNode; - } - - 
private Map> getStateBucketsForNode( - final Map> assignment) { - Map> result = new TreeMap>(); - for (String n : _liveNodes) { - result.put(n, new TreeMap()); - } - for (Map nodeStateMap : assignment.values()) { - for (Entry nodeState : nodeStateMap.entrySet()) { - if (!result.containsKey(nodeState.getKey())) { - result.put(nodeState.getKey(), new TreeMap()); - } - Map stateMap = result.get(nodeState.getKey()); - if (!stateMap.containsKey(nodeState.getValue())) { - stateMap.put(nodeState.getValue(), 1); - } else { - stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1); - } - } - } - return result; - } - - /** - * Randomly choose between killing, adding, or resurrecting a single node - * @return (Partition -> (Node -> State)) ZNRecord - */ - public ZNRecord runOnceRandomly() { - double choose = _random.nextDouble(); - ZNRecord result = null; - if (choose < P_KILL) { - result = removeSingleNode(null); - } else if (choose < P_KILL + P_ADD) { - result = addSingleNode(null); - } else if (choose < P_KILL + P_ADD + P_RESURRECT) { - result = resurrectSingleNode(null); - } - return result; - } - - /** - * Run rebalancer trying to add a never-live node - * @param node - * Optional String to add - * @return ZNRecord result returned by the rebalancer - */ - public ZNRecord addSingleNode(String node) { - logger.info("=================== add node ================="); - if (_nonLiveSet.size() == 0) { - logger.warn("Cannot add node because there are no nodes left to add."); - return null; - } - - // Get a random never-live node - if (node == null || !_nonLiveSet.contains(node)) { - node = getRandomSetElement(_nonLiveSet); - } - logger.info("Adding " + node); - _liveNodes.add(node); - _liveSet.add(node); - _nonLiveSet.remove(node); - - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode). 
computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); - } - - /** - * Run rebalancer trying to remove a live node - * @param node - * Optional String to remove - * @return ZNRecord result returned by the rebalancer - */ - public ZNRecord removeSingleNode(String node) { - logger.info("=================== remove node ================="); - if (_liveSet.size() == 0) { - logger.warn("Cannot remove node because there are no nodes left to remove."); - return null; - } - - // Get a random live node - if (node == null || !_liveSet.contains(node)) { - node = getRandomSetElement(_liveSet); - } - logger.info("Removing " + node); - _removedSet.add(node); - _liveNodes.remove(node); - _liveSet.remove(node); - - // the rebalancer expects that the current mapping doesn't contain deleted - // nodes - for (Map<String, String> nodeMap : _currentMapping.values()) { - if (nodeMap.containsKey(node)) { - nodeMap.remove(node); - } - } - - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); - } - - /** - * Run rebalancer trying to add back a removed node - * @param node - * Optional String to resurrect - * @return ZNRecord result returned by the rebalancer - */ - public ZNRecord resurrectSingleNode(String node) { - logger.info("=================== resurrect node ================="); - if (_removedSet.size() == 0) { - logger.warn("Cannot resurrect node because there are no nodes left to resurrect."); - return null; - } - - // Get a random removed node - if (node == null || !_removedSet.contains(node)) { - node = getRandomSetElement(_removedSet); - } - logger.info("Resurrecting " + node); - _removedSet.remove(node); - _liveNodes.add(node); - _liveSet.add(node); - - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); - } - - private <T> T getRandomSetElement(Set<T> source) { - int element = _random.nextInt(source.size()); - int i = 0; - for (T node : source) { - if (i == element) { - return node; - } - i++; - } - return null; - } - } - - /** - * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference - * lists should prefer nodes in the current mapping at all times, but once all nodes are in the - * current mapping, states should be distributed as evenly as possible.
- */ - @Test - public void testOrphansNotPreferred() { - final String RESOURCE_NAME = "resource"; - final String[] PARTITIONS = { - "resource_0", "resource_1", "resource_2" - }; - final StateModelDefinition STATE_MODEL = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); - final int REPLICA_COUNT = 2; - final String[] NODES = { - "n0", "n1", "n2" - }; - - // initial state, one node, no mapping - List allNodes = Lists.newArrayList(NODES[0]); - List liveNodes = Lists.newArrayList(NODES[0]); - Map> currentMapping = Maps.newHashMap(); - for (String partition : PARTITIONS) { - currentMapping.put(partition, new HashMap()); - } - - // make sure that when the first node joins, a single replica is assigned fairly - List partitions = ImmutableList.copyOf(PARTITIONS); - LinkedHashMap stateCount = - AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - ZNRecord znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - Map> preferenceLists = znRecord.getListFields(); - for (String partition : currentMapping.keySet()) { - // make sure these are all MASTER - List preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); - } - - // now assign a replica to the first node in the current mapping, and add a second node - allNodes.add(NODES[1]); - liveNodes.add(NODES[1]); - stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - for (String partition : PARTITIONS) { - currentMapping.get(partition).put(NODES[0], "MASTER"); - } - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - preferenceLists = znRecord.getListFields(); - for (String partition : currentMapping.keySet()) { - List preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for " - + partition); - Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for " - + partition); - } - - // now set the current mapping to reflect this update and make sure that it distributes masters - for (String partition : PARTITIONS) { - currentMapping.get(partition).put(NODES[1], "SLAVE"); - } - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - preferenceLists = znRecord.getListFields(); - Set firstNodes = Sets.newHashSet(); - for (String partition : currentMapping.keySet()) { - List preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - firstNodes.add(preferenceList.get(0)); - } - Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed"); - - // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the - // new node is never the most preferred - allNodes.add(NODES[2]); - liveNodes.add(NODES[2]); - stateCount = AutoRebalancer.stateCount(STATE_MODEL, 
liveNodes.size(), REPLICA_COUNT); - - // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one - currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - preferenceLists = znRecord.getListFields(); - boolean newNodeUsed = false; - for (String partition : currentMapping.keySet()) { - List preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - if (preferenceList.contains(NODES[2])) { - newNodeUsed = true; - Assert.assertEquals(preferenceList.get(1), NODES[2], - "newly added node not at preference list tail for " + partition); - } - } - Assert.assertTrue(newNodeUsed, "not using " + NODES[2]); - - // now remap this to take the new node into account, should go back to balancing masters, slaves - // evenly across all nodes - for (String partition : PARTITIONS) { - currentMapping.get(partition).clear(); - } - currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER"); - currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE"); - currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER"); - currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - preferenceLists = znRecord.getListFields(); - firstNodes.clear(); - Set secondNodes = Sets.newHashSet(); - for (String partition : currentMapping.keySet()) { - List preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - firstNodes.add(preferenceList.get(0)); - secondNodes.add(preferenceList.get(1)); - } - Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly"); - Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly"); - - // remove a node now, but use the current mapping with everything balanced just prior - liveNodes.remove(0); - stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT); - - // remove all references of n0 from the mapping, keep everything else in a legal state - for (String partition : PARTITIONS) { - currentMapping.get(partition).clear(); - } - currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE"); - currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - preferenceLists = znRecord.getListFields(); - for (String partition : currentMapping.keySet()) { - List preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - Map stateMap = currentMapping.get(partition); - for (String 
participant : stateMap.keySet()) { - Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for " - + partition); - } - for (String participant : preferenceList) { - if (!stateMap.containsKey(participant)) { - Assert.assertNotSame(preferenceList.get(0), participant, - "newly moved replica should not be master for " + partition); - } - } - } - - // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again - for (String partition : PARTITIONS) { - currentMapping.get(partition).clear(); - } - currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER"); - currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE"); - currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER"); - currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE"); - currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); - znRecord = - new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(liveNodes, currentMapping, allNodes); - preferenceLists = znRecord.getListFields(); - firstNodes.clear(); - for (String partition : currentMapping.keySet()) { - List<String> preferenceList = preferenceLists.get(partition); - Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); - firstNodes.add(preferenceList.get(0)); - } - Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed"); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java new file mode 100644 index 0000000..5169edd --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java @@ -0,0 +1,172 @@ +package org.apache.helix.controller.strategy; + +import org.apache.helix.HelixProperty; +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +public class TestTopology { + private static Logger logger = Logger.getLogger(TestTopology.class); + + @Test + public void testCreateClusterTopology() { + ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster"); + + String topology = "/Rack/Sub-Rack/Host/Instance"; + clusterConfig.getRecord().getSimpleFields() + .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), topology); + clusterConfig.getRecord().getSimpleFields() + .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack"); + + List allNodes = new ArrayList(); + List liveNodes = new ArrayList(); + Map instanceConfigMap = new HashMap(); + + Map nodeToWeightMap = new HashMap(); + + for (int i = 0; i < 100; i++) { + String instance = "localhost_" + i; + InstanceConfig config = new InstanceConfig(instance); + String rack_id = "rack_" + i/25; + String sub_rack_id = "subrack-" + i/5; + + String domain = + String.format("Rack=%s, Sub-Rack=%s, Host=%s", rack_id, sub_rack_id, instance); + config.setDomain(domain); + config.setHostName(instance); + config.setPort("9000"); + allNodes.add(instance); + + int weight = 0; + if (i % 10 != 0) { + liveNodes.add(instance); + weight = 1000; + if (i % 3 == 0) { + // set random instance weight. + weight = (i+1) * 100; + config.setWeight(weight); + } + } + + instanceConfigMap.put(instance, config); + + if (!nodeToWeightMap.containsKey(rack_id)) { + nodeToWeightMap.put(rack_id, 0); + } + nodeToWeightMap.put(rack_id, nodeToWeightMap.get(rack_id) + weight); + if (!nodeToWeightMap.containsKey(sub_rack_id)) { + nodeToWeightMap.put(sub_rack_id, 0); + } + nodeToWeightMap.put(sub_rack_id, nodeToWeightMap.get(sub_rack_id) + weight); + } + + Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig); + + Assert.assertTrue(topo.getEndNodeType().equals("Instance")); + Assert.assertTrue(topo.getFaultZoneType().equals("Sub-Rack")); + + List faultZones = topo.getFaultZones(); + Assert.assertEquals(faultZones.size(), 20); + + Node root = topo.getRootNode(); + + Assert.assertEquals(root.getChildrenCount("Rack"), 4); + Assert.assertEquals(root.getChildrenCount("Sub-Rack"), 20); + Assert.assertEquals(root.getChildrenCount("Host"), 100); + Assert.assertEquals(root.getChildrenCount("Instance"), 100); + + + // validate weights. + for (Node rack : root.getChildren()) { + Assert.assertEquals(rack.getWeight(), (long)nodeToWeightMap.get(rack.getName())); + for (Node subRack : rack.getChildren()) { + Assert.assertEquals(subRack.getWeight(), (long)nodeToWeightMap.get(subRack.getName())); + } + } + } + + @Test + public void testCreateClusterTopologyWithDefaultTopology() { + ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster"); + + List allNodes = new ArrayList(); + List liveNodes = new ArrayList(); + Map instanceConfigMap = new HashMap(); + + Map nodeToWeightMap = new HashMap(); + + for (int i = 0; i < 100; i++) { + String instance = "localhost_" + i; + InstanceConfig config = new InstanceConfig(instance); + String zoneId = "rack_" + i / 10; + config.setZoneId(zoneId); + config.setHostName(instance); + config.setPort("9000"); + allNodes.add(instance); + + int weight = 0; + if (i % 10 != 0) { + liveNodes.add(instance); + weight = 1000; + if (i % 3 == 0) { + // set random instance weight. 
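+ // (deterministic in practice: (i + 1) * 100, so the per-zone weight assertions below are reproducible)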
+ weight = (i + 1) * 100; + config.setWeight(weight); + } + } + + instanceConfigMap.put(instance, config); + + if (!nodeToWeightMap.containsKey(zoneId)) { + nodeToWeightMap.put(zoneId, 0); + } + nodeToWeightMap.put(zoneId, nodeToWeightMap.get(zoneId) + weight); + } + + Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig); + + Assert.assertTrue(topo.getEndNodeType().equals(Topology.Types.INSTANCE.name())); + Assert.assertTrue(topo.getFaultZoneType().equals(Topology.Types.ZONE.name())); + + List faultZones = topo.getFaultZones(); + Assert.assertEquals(faultZones.size(), 10); + + Node root = topo.getRootNode(); + + Assert.assertEquals(root.getChildrenCount(Topology.Types.ZONE.name()), 10); + Assert.assertEquals(root.getChildrenCount(topo.getEndNodeType()), 100); + + // validate weights. + for (Node rack : root.getChildren()) { + Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName())); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java new file mode 100644 index 0000000..5c34792 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java @@ -0,0 +1,221 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestCrushAutoRebalance extends ZkIntegrationTestBase { + final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int _PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + protected ClusterControllerManager _controller; + + protected ClusterSetup _setupTool = null; + List _participants = new ArrayList(); + Map _nodeToZoneMap = new HashMap(); + Map _nodeToTagMap = new HashMap(); + List _nodes = new ArrayList(); + List _allDBs = new ArrayList(); + int _replica = 3; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + _setupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + String zone = "zone-" + i % 3; + String tag = "tag-" + i % 2; + _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone); + _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); + _nodeToZoneMap.put(storageNodeName, zone); + _nodeToTagMap.put(storageNodeName, tag); + _nodes.add(storageNodeName); + } + + // start dummy participants + for (String node : _nodes) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node); + participant.syncStart(); + _participants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + } + + @DataProvider(name = "rebalanceStrategies") + public static String [][] rebalanceStrategies() { + return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}}; + } + + @Test(dataProvider = "rebalanceStrategies") + public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { + System.out.println("Test " + rebalanceStrategyName); + List testDBs = new ArrayList(); + String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(), + BuiltInStateModelDefinitions.MasterSlave.name(), + BuiltInStateModelDefinitions.LeaderStandby.name() + 
}; + int i = 0; + for (String stateModel : testModels) { + String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, + RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + testDBs.add(db); + _allDBs.add(db); + } + Thread.sleep(300); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + for (String db : testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev); + } + } + + @Test(dataProvider = "rebalanceStrategies") + public void testZoneIsolationWithInstanceTag( + String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { + List<String> testDBs = new ArrayList<String>(); + Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); + int i = 0; + for (String tag : tags) { + String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++; + _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, + BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "", + rebalanceStrategyClass); + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + is.setInstanceGroupTag(tag); + _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); + testDBs.add(db); + _allDBs.add(db); + } + Thread.sleep(300); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + for (String db : testDBs) { + IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + ExternalView ev = + _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + validateZoneAndTagIsolation(is, ev); + } + } + + /** + * Validate that the instances assigned to each partition land in different zones and, when an + * instance group tag is set, that they carry that tag.
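+ * The external view must also contain exactly the instances listed in the ideal state's preference list.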
+ */ + private void validateZoneAndTagIsolation(IdealState is, ExternalView ev) { + int replica = Integer.valueOf(is.getReplicas()); + String tag = is.getInstanceGroupTag(); + + for (String partition : is.getPartitionSet()) { + Set assignedZones = new HashSet(); + + Set instancesInIs = new HashSet(is.getRecord().getListField(partition)); + Map assignmentMap = ev.getRecord().getMapField(partition); + Set instancesInEV = assignmentMap.keySet(); + Assert.assertEquals(instancesInEV, instancesInIs); + for (String instance : instancesInEV) { + assignedZones.add(_nodeToZoneMap.get(instance)); + if (tag != null) { + InstanceConfig config = + _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertTrue(config.containsTag(tag)); + } + } + Assert.assertEquals(assignedZones.size(), replica); + } + } + + @Test() + public void testAddZone() throws Exception { + //TODO + } + + @Test() + public void testAddNodes() throws Exception { + //TODO + } + + @Test() + public void testNodeFailure() throws Exception { + //TODO + } + + @AfterClass + public void afterClass() throws Exception { + /** + * shutdown order: 1) disconnect the controller 2) disconnect participants + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _setupTool.deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index 917be17..51dd19d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -31,6 +31,7 @@ import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateMod import org.apache.helix.mock.participant.MockMSModelFactory; import org.apache.helix.mock.participant.MockSchemataModelFactory; import org.apache.helix.mock.participant.MockTransition; +import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.participant.StateMachineEngine; import org.apache.log4j.Logger; @@ -73,14 +74,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, public void run() { try { StateMachineEngine stateMach = getStateMachineEngine(); - stateMach.registerStateModelFactory("MasterSlave", _msModelFactory); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(), + _msModelFactory); DummyLeaderStandbyStateModelFactory lsModelFactory = new DummyLeaderStandbyStateModelFactory(10); DummyOnlineOfflineStateModelFactory ofModelFactory = new DummyOnlineOfflineStateModelFactory(10); - stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory); - stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(), + lsModelFactory); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), + ofModelFactory); MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory(); 
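// Note: the schemata model below keeps its literal name; this commit only switches the three built-in models above to BuiltInStateModelDefinitions constants.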
stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
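
Editor's sketch (not part of the commit): the integration test above doubles as a usage recipe for the new strategy. Below is a minimal, hedged example of wiring a FULL_AUTO resource to CrushRebalanceStrategy, using only API calls that appear in TestCrushAutoRebalance; the class name, cluster name, ZooKeeper address, partition count, and zone layout are illustrative assumptions.

import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.tools.ClusterSetup;

// Hypothetical driver class; mirrors beforeClass() and testZoneIsolation() above.
public class CrushUsageSketch {
  public static void main(String[] args) {
    ClusterSetup setup = new ClusterSetup("localhost:2181"); // illustrative ZK address
    setup.addCluster("MyCluster", true);
    for (int i = 0; i < 6; i++) {
      String instance = "localhost_" + (12918 + i);
      setup.addInstanceToCluster("MyCluster", instance);
      // Assign each instance to a fault zone; CRUSH places a partition's replicas in distinct zones.
      setup.getClusterManagementTool()
          .setInstanceZoneId("MyCluster", instance, "zone-" + (i % 3));
    }
    // FULL_AUTO resource rebalanced by the topology-aware CRUSH strategy.
    setup.addResourceToCluster("MyCluster", "MyDB", 20,
        BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
        CrushRebalanceStrategy.class.getName());
    setup.rebalanceStorageCluster("MyCluster", "MyDB", 3); // 3 replicas, one per zone
  }
}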