Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A4E1E200B8C for ; Mon, 12 Sep 2016 19:09:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A3E62160AD8; Mon, 12 Sep 2016 17:09:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7FF54160AD6 for ; Mon, 12 Sep 2016 19:09:41 +0200 (CEST) Received: (qmail 19241 invoked by uid 500); 12 Sep 2016 17:09:40 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 19116 invoked by uid 99); 12 Sep 2016 17:09:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Sep 2016 17:09:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6D5C8E04D9; Mon, 12 Sep 2016 17:09:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lxia@apache.org To: commits@helix.apache.org Date: Mon, 12 Sep 2016 17:09:43 -0000 Message-Id: In-Reply-To: <1ff357b2a8674d6fa7e4b1fc0618c761@git.apache.org> References: <1ff357b2a8674d6fa7e4b1fc0618c761@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/5] helix git commit: [HELIX-634] Refactor AutoRebalancer to allow configuable placement strategy. archived-at: Mon, 12 Sep 2016 17:09:42 -0000 [HELIX-634] Refactor AutoRebalancer to allow configuable placement strategy. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ea0fbbbc Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ea0fbbbc Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ea0fbbbc Branch: refs/heads/helix-0.6.x Commit: ea0fbbbce302974b88a2b8253bf06616fd91aa5b Parents: bc0aa76 Author: Lei Xia Authored: Tue Jun 7 14:42:43 2016 -0700 Committer: Lei Xia Committed: Mon Sep 12 10:06:33 2016 -0700 ---------------------------------------------------------------------- .../controller/rebalancer/AutoRebalancer.java | 40 +++++++--- .../strategy/AutoRebalanceStrategy.java | 40 +++++----- .../controller/strategy/RebalanceStrategy.java | 52 +++++++++++++ .../java/org/apache/helix/model/IdealState.java | 23 +++++- .../helix/model/builder/IdealStateBuilder.java | 80 ++++++++++++++++++++ .../task/GenericTaskAssignmentCalculator.java | 5 +- .../strategy/TestAutoRebalanceStrategy.java | 25 +++--- 7 files changed, 217 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index e47297f..6682426 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; @@ -35,8 +36,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; +import org.apache.helix.controller.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.LiveInstance; @@ -44,6 +44,7 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.util.HelixUtil; import org.apache.log4j.Logger; /** @@ -59,14 +60,14 @@ import org.apache.log4j.Logger; public class AutoRebalancer implements Rebalancer, MappingCalculator { // These should be final, but are initialized in init rather than a constructor private HelixManager _manager; - private AutoRebalanceStrategy _algorithm; + private RebalanceStrategy _rebalanceStrategy; private static final Logger LOG = Logger.getLogger(AutoRebalancer.class); @Override public void init(HelixManager manager) { this._manager = manager; - this._algorithm = null; + this._rebalanceStrategy = null; } @Override @@ -127,13 +128,32 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); - ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); - placementScheme.init(_manager); - _algorithm = - new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition, - placementScheme); + String rebalanceStrategyName = currentIdealState.getRebalanceStrategy(); + if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) { + _rebalanceStrategy = + new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition); + } else { + try { + _rebalanceStrategy = RebalanceStrategy.class + .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance()); + _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition); + } catch (ClassNotFoundException ex) { + throw new HelixException( + "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName, + ex); + } catch (InstantiationException ex) { + throw new HelixException( + "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName, + ex); + } catch (IllegalAccessException ex) { + throw new HelixException( + "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName, + ex); + } + } + ZNRecord newMapping = - _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes); + _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes); if (LOG.isDebugEnabled()) { LOG.debug("currentMapping: " + currentMapping); http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java index 11b5b0d..959609f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java @@ -36,16 +36,15 @@ import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.log4j.Logger; -public class AutoRebalanceStrategy { - +public class AutoRebalanceStrategy implements RebalanceStrategy { private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class); - - private final String _resourceName; - private final List _partitions; - private final LinkedHashMap _states; - private final int _maximumPerNode; private final ReplicaPlacementScheme _placementScheme; + private String _resourceName; + private List _partitions; + private LinkedHashMap _states; + private int _maximumPerNode; + private Map _nodeMap; private List _liveNodesList; private Map _stateMap; @@ -56,24 +55,26 @@ public class AutoRebalanceStrategy { private Set _orphaned; public AutoRebalanceStrategy(String resourceName, final List partitions, - final LinkedHashMap states, int maximumPerNode, - ReplicaPlacementScheme placementScheme) { - _resourceName = resourceName; - _partitions = partitions; - _states = states; - _maximumPerNode = maximumPerNode; - if (placementScheme != null) { - _placementScheme = placementScheme; - } else { - _placementScheme = new DefaultPlacementScheme(); - } + final LinkedHashMap states, int maximumPerNode) { + init(resourceName, partitions, states, maximumPerNode); + _placementScheme = new DefaultPlacementScheme(); } public AutoRebalanceStrategy(String resourceName, final List partitions, final LinkedHashMap states) { - this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme()); + this(resourceName, partitions, states, Integer.MAX_VALUE); + } + + @Override + public void init(String resourceName, final List partitions, + final LinkedHashMap states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + _maximumPerNode = maximumPerNode; } + @Override public ZNRecord computePartitionAssignment(final List liveNodes, final Map> currentMapping, final List allNodes) { int numReplicas = countStateReplicas(); @@ -546,7 +547,6 @@ public class AutoRebalanceStrategy { /** * Counts the total number of replicas given a state-count mapping - * @param states * @return */ private int countStateReplicas() { http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java new file mode 100644 index 0000000..4daae82 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java @@ -0,0 +1,52 @@ +package org.apache.helix.controller.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.ZNRecord; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Assignment strategy interface that computes the assignment of partition->instance. + */ +public interface RebalanceStrategy { + /** + * Perform the necessary initialization for the rebalance strategy object. + * @param resourceName + * @param partitions + * @param states + * @param maximumPerNode + */ + void init(String resourceName, final List partitions, + final LinkedHashMap states, int maximumPerNode); + + /** + * Compute the preference lists and (optional partition-state mapping) for the given resource. + * + * @param liveNodes + * @param currentMapping + * @param allNodes + * @return + */ + ZNRecord computePartitionAssignment(final List liveNodes, + final Map> currentMapping, final List allNodes); +} http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 44f4219..7c4cf54 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -53,10 +53,11 @@ public class IdealState extends HelixProperty { @Deprecated IDEAL_STATE_MODE, REBALANCE_MODE, + REBALANCER_CLASS_NAME, REBALANCE_TIMER_PERIOD, + REBALANCE_STRATEGY, MAX_PARTITIONS_PER_INSTANCE, INSTANCE_GROUP_TAG, - REBALANCER_CLASS_NAME, HELIX_ENABLED, RESOURCE_GROUP_NAME, GROUP_ROUTING_ENABLED, @@ -165,6 +166,26 @@ public class IdealState extends HelixProperty { } /** + * Specify the strategy for Helix to use to compute the partition-instance assignment, + * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy} + * + * @param rebalanceStrategy + * @return + */ + public void setRebalanceStrategy(String rebalanceStrategy) { + _record.setSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name(), rebalanceStrategy); + } + + /** + * Get the rebalance strategy for this resource. + * + * @return rebalance strategy, or null if not specified. + */ + public String getRebalanceStrategy() { + return _record.getSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name()); + } + + /** * Set the resource group name * @param resourceGroupName */ http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java index d3bc3f2..9ad3023 100644 --- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java @@ -52,6 +52,17 @@ public abstract class IdealStateBuilder { * Helix rebalancer strategies. AUTO, SEMI_AUTO, CUSTOMIZED */ protected IdealState.RebalanceMode rebalancerMode; + + /** + * Customized rebalancer class. + */ + private String rebalancerClassName; + + /** + * Custom rebalance strategy + */ + private String rebalanceStrategy; + /** * A constraint that limits the maximum number of partitions per Node. */ @@ -68,6 +79,16 @@ public abstract class IdealStateBuilder { */ private Boolean disableExternalView = null; + /** + * Resource group name. + */ + private String resourceGroupName; + + /** + * Whether the resource group routing should be enabled in routingProvider. + */ + private Boolean enableGroupRouting; + protected ZNRecord _record; /** @@ -144,6 +165,44 @@ public abstract class IdealStateBuilder { } /** + * Set custom rebalancer class name. + * @return IdealStateBuilder + */ + public IdealStateBuilder setRebalancerClass(String rebalancerClassName) { + this.rebalancerClassName = rebalancerClassName; + return this; + } + + /** + * Set custom rebalance strategy name. + * @param rebalanceStrategy + * @return + */ + public IdealStateBuilder setRebalanceStrategy(String rebalanceStrategy) { + this.rebalanceStrategy = rebalanceStrategy; + return this; + } + + /** + * + * @param resourceGroupName + * @return + */ + public IdealStateBuilder setResourceGroupName(String resourceGroupName) { + this.resourceGroupName = resourceGroupName; + return this; + } + + /** + * Enable Group Routing for this resource. + * @return + */ + public IdealStateBuilder enableGroupRouting() { + this.enableGroupRouting = true; + return this; + } + + /** * @return */ public IdealState build() { @@ -154,10 +213,31 @@ public abstract class IdealStateBuilder { idealstate.setStateModelFactoryName(stateModelFactoryName); idealstate.setRebalanceMode(rebalancerMode); idealstate.setReplicas("" + numReplica); + + if (rebalancerClassName != null) { + idealstate.setRebalancerClassName(rebalancerClassName); + } + + if (rebalanceStrategy != null) { + idealstate.setRebalanceStrategy(rebalanceStrategy); + } + + if (maxPartitionsPerNode > 0) { + idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode); + } + if (disableExternalView != null) { idealstate.setDisableExternalView(disableExternalView); } + if (resourceGroupName != null) { + idealstate.setResourceGroupName(resourceGroupName); + } + + if (enableGroupRouting != null) { + idealstate.enableGroupRouting(enableGroupRouting); + } + if (!idealstate.isValid()) { throw new HelixException("invalid ideal-state: " + idealstate); } http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index b0a1a33..623357f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -33,6 +33,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; @@ -121,9 +122,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator { } // Get the assignment keyed on partition - AutoRebalanceStrategy strategy = - new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE, - new AutoRebalanceStrategy.DefaultPlacementScheme()); + RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states); List allNodes = Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); Collections.sort(allNodes); http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java index 985d0c8..adc92d6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java @@ -116,8 +116,7 @@ public class TestAutoRebalanceStrategy { StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, - stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme()) - .runRepeatedly(numIterations); + stateModelDef).runRepeatedly(numIterations); } /** @@ -157,13 +156,11 @@ public class TestAutoRebalanceStrategy { private List _allNodes; private int _maxPerNode; private StateModelDefinition _stateModelDef; - private ReplicaPlacementScheme _placementScheme; private Random _random; public AutoRebalanceTester(List partitions, LinkedHashMap states, List liveNodes, Map> currentMapping, - List allNodes, int maxPerNode, StateModelDefinition stateModelDef, - ReplicaPlacementScheme placementScheme) { + List allNodes, int maxPerNode, StateModelDefinition stateModelDef) { _partitions = partitions; _states = states; _liveNodes = liveNodes; @@ -182,7 +179,6 @@ public class TestAutoRebalanceStrategy { } _maxPerNode = maxPerNode; _stateModelDef = stateModelDef; - _placementScheme = placementScheme; _random = new Random(); } @@ -193,9 +189,10 @@ public class TestAutoRebalanceStrategy { */ public void runRepeatedly(int numIterations) { logger.info("~~~~ Initial State ~~~~~"); + RebalanceStrategy strategy = + new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode); ZNRecord initialResult = - new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode, - _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); _currentMapping = getMapping(initialResult.getListFields()); logger.info(_currentMapping); getRunResult(_currentMapping, initialResult.getListFields()); @@ -500,8 +497,8 @@ public class TestAutoRebalanceStrategy { _liveSet.add(node); _nonLiveSet.remove(node); - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode, - _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode). + computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); } /** @@ -534,8 +531,8 @@ public class TestAutoRebalanceStrategy { } } - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode, - _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) + .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); } /** @@ -560,8 +557,8 @@ public class TestAutoRebalanceStrategy { _liveNodes.add(node); _liveSet.add(node); - return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode, - _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); + return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) + .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes); } private T getRandomSetElement(Set source) {