Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C6321109EB for ; Wed, 16 Oct 2013 00:10:22 +0000 (UTC) Received: (qmail 16614 invoked by uid 500); 16 Oct 2013 00:10:22 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 16569 invoked by uid 500); 16 Oct 2013 00:10:22 -0000 Mailing-List: contact commits-help@helix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.incubator.apache.org Delivered-To: mailing list commits@helix.incubator.apache.org Received: (qmail 16523 invoked by uid 99); 16 Oct 2013 00:10:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Oct 2013 00:10:22 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 16 Oct 2013 00:10:08 +0000 Received: (qmail 16349 invoked by uid 99); 16 Oct 2013 00:09:44 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Oct 2013 00:09:44 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 99BF18B57B8; Wed, 16 Oct 2013 00:09:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.incubator.apache.org Date: Wed, 16 Oct 2013 00:09:46 -0000 Message-Id: <86ac3b60f0ff42058b922e1cc59fd6cc@git.apache.org> In-Reply-To: <78154a7551fc49b0ae0b73ae35c69bc8@git.apache.org> References: <78154a7551fc49b0ae0b73ae35c69bc8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] [HELIX-209] 
Moving rebalancer code around, take 2 X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java deleted file mode 100644 index e1f1ac2..0000000 --- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java +++ /dev/null @@ -1,176 +0,0 @@ -package org.apache.helix.api.rebalancer; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.api.State; -import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.api.id.PartitionId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.StateModelDefinition; -import org.codehaus.jackson.annotate.JsonIgnore; -import org.codehaus.jackson.annotate.JsonProperty; - -import com.google.common.collect.Maps; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each - * partition replica. By default, it corresponds to {@link SemiAutoRebalancer} - */ -public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext { - @JsonProperty("preferenceLists") - private Map> _preferenceLists; - - /** - * Instantiate a SemiAutoRebalancerContext - */ - public SemiAutoRebalancerContext() { - super(RebalanceMode.SEMI_AUTO); - setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class)); - _preferenceLists = Maps.newHashMap(); - } - - /** - * Get the preference lists of all partitions of the resource - * @return map of partition id to list of participant ids - */ - public Map> getPreferenceLists() { - return _preferenceLists; - } - - /** - * Set the preference lists of all partitions of the resource - * @param preferenceLists - */ - public void setPreferenceLists(Map> preferenceLists) { - _preferenceLists = preferenceLists; - } - - /** - * Get the preference list of a partition - * @param partitionId the partition to look up - * @return list of participant ids - */ - @JsonIgnore - public List getPreferenceList(PartitionId partitionId) { - return _preferenceLists.get(partitionId); - } - - /** - * Generate preference lists based on a default cluster setup - * @param stateModelDef the state model definition to follow - * @param 
participantSet the set of participant ids to configure for - */ - @Override - @JsonIgnore - public void generateDefaultConfiguration(StateModelDefinition stateModelDef, - Set participantSet) { - // compute default upper bounds - Map upperBounds = Maps.newHashMap(); - for (State state : stateModelDef.getTypedStatesPriorityList()) { - upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state)); - } - - // determine the current mapping - Map> currentMapping = Maps.newHashMap(); - for (PartitionId partitionId : getPartitionSet()) { - List preferenceList = getPreferenceList(partitionId); - if (preferenceList != null && !preferenceList.isEmpty()) { - Set disabledParticipants = Collections.emptySet(); - Map emptyCurrentState = Collections.emptyMap(); - Map initialMap = - ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, - participantSet, stateModelDef, preferenceList, emptyCurrentState, - disabledParticipants); - currentMapping.put(partitionId, initialMap); - } - } - - // determine the preference - LinkedHashMap stateCounts = - ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(), - getReplicaCount()); - ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); - List participantList = new ArrayList(participantSet); - List partitionList = new ArrayList(getPartitionSet()); - AutoRebalanceStrategy strategy = - new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts, - getMaxPartitionsPerParticipant(), placementScheme); - Map> rawPreferenceLists = - strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList) - .getListFields(); - Map> preferenceLists = - Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists)); - setPreferenceLists(preferenceLists); - } - - /** - * Build a SemiAutoRebalancerContext. 
By default, it corresponds to {@link SemiAutoRebalancer} - */ - public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder { - private final Map> _preferenceLists; - - /** - * Instantiate for a resource - * @param resourceId resource id - */ - public Builder(ResourceId resourceId) { - super(resourceId); - super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class)); - _preferenceLists = Maps.newHashMap(); - } - - /** - * Add a preference list for a partition - * @param partitionId partition to set - * @param preferenceList ordered list of participants who can serve the partition - * @return Builder - */ - public Builder preferenceList(PartitionId partitionId, List preferenceList) { - _preferenceLists.put(partitionId, preferenceList); - return self(); - } - - @Override - protected Builder self() { - return this; - } - - @Override - public SemiAutoRebalancerContext build() { - SemiAutoRebalancerContext context = new SemiAutoRebalancerContext(); - super.update(context); - context.setPreferenceLists(_preferenceLists); - return context; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java deleted file mode 100644 index 0199796..0000000 --- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java +++ /dev/null @@ -1,244 +0,0 @@ -package org.apache.helix.api.rebalancer.util; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.HelixConstants.StateModelToken; -import org.apache.helix.HelixDefinedState; -import org.apache.helix.api.Cluster; -import org.apache.helix.api.Participant; -import org.apache.helix.api.Scope; -import org.apache.helix.api.State; -import org.apache.helix.api.config.ClusterConfig; -import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.api.id.PartitionId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.model.StateModelDefinition; -import org.apache.log4j.Logger; - -import com.google.common.base.Predicate; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * Collection of functions that will compute the best possible state based on the participants and - * the rebalancer configuration of a resource. 
- */ -public class ConstraintBasedAssignment { - private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class); - - /** - * Get a set of disabled participants for a partition - * @param participantMap map of all participants - * @param partitionId the partition to check - * @return a set of all participants that are disabled for the partition - */ - public static Set getDisabledParticipants( - final Map participantMap, final PartitionId partitionId) { - Set participantSet = new HashSet(participantMap.keySet()); - Set disabledParticipantsForPartition = - Sets.filter(participantSet, new Predicate() { - @Override - public boolean apply(ParticipantId participantId) { - Participant participant = participantMap.get(participantId); - return !participant.isEnabled() - || participant.getDisabledPartitionIds().contains(partitionId); - } - }); - return disabledParticipantsForPartition; - } - - /** - * Get an ordered list of participants that can serve a partition - * @param cluster cluster snapshot - * @param partitionId the partition to look up - * @param config rebalancing constraints - * @return list with most preferred participants first - */ - public static List getPreferenceList(Cluster cluster, PartitionId partitionId, - List prefList) { - if (prefList != null && prefList.size() == 1 - && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) { - prefList = new ArrayList(cluster.getLiveParticipantMap().keySet()); - Collections.sort(prefList); - } - return prefList; - } - - /** - * Get a map of state to upper bound constraint given a cluster - * @param stateModelDef the state model definition to check - * @param resourceId the resource that is constraint - * @param cluster the cluster the resource belongs to - * @return map of state to upper bound - */ - public static Map stateConstraints(StateModelDefinition stateModelDef, - ResourceId resourceId, ClusterConfig cluster) { - Map stateMap = Maps.newHashMap(); - for (State 
state : stateModelDef.getTypedStatesPriorityList()) { - String num = - cluster.getStateUpperBoundConstraint(Scope.resource(resourceId), - stateModelDef.getStateModelDefId(), state); - stateMap.put(state, num); - } - return stateMap; - } - - /** - * compute best state for resource in SEMI_AUTO and FULL_AUTO modes - * @param upperBounds map of state to upper bound - * @param liveParticipantSet set of live participant ids - * @param stateModelDef - * @param participantPreferenceList - * @param currentStateMap - * : participant->state for each partition - * @param disabledParticipantsForPartition - * @return - */ - public static Map computeAutoBestStateForPartition( - Map upperBounds, Set liveParticipantSet, - StateModelDefinition stateModelDef, List participantPreferenceList, - Map currentStateMap, Set disabledParticipantsForPartition) { - Map participantStateMap = new HashMap(); - - // if the resource is deleted, instancePreferenceList will be empty and - // we should drop all resources. - if (currentStateMap != null) { - for (ParticipantId participantId : currentStateMap.keySet()) { - if ((participantPreferenceList == null || !participantPreferenceList - .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) { - // if dropped and not disabled, transit to DROPPED - participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED)); - } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get( - participantId).equals(State.from(HelixDefinedState.ERROR))) - && disabledParticipantsForPartition.contains(participantId)) { - // if disabled and not in ERROR state, transit to initial-state (e.g. 
OFFLINE) - participantStateMap.put(participantId, stateModelDef.getTypedInitialState()); - } - } - } - - // resource is deleted - if (participantPreferenceList == null) { - return participantStateMap; - } - - List statesPriorityList = stateModelDef.getTypedStatesPriorityList(); - boolean assigned[] = new boolean[participantPreferenceList.size()]; - - for (State state : statesPriorityList) { - String num = upperBounds.get(state); - int stateCount = -1; - if ("N".equals(num)) { - Set liveAndEnabled = new HashSet(liveParticipantSet); - liveAndEnabled.removeAll(disabledParticipantsForPartition); - stateCount = liveAndEnabled.size(); - } else if ("R".equals(num)) { - stateCount = participantPreferenceList.size(); - } else { - try { - stateCount = Integer.parseInt(num); - } catch (Exception e) { - logger.error("Invalid count for state:" + state + " ,count=" + num); - } - } - if (stateCount > -1) { - int count = 0; - for (int i = 0; i < participantPreferenceList.size(); i++) { - ParticipantId participantId = participantPreferenceList.get(i); - - boolean notInErrorState = - currentStateMap == null - || currentStateMap.get(participantId) == null - || !currentStateMap.get(participantId) - .equals(State.from(HelixDefinedState.ERROR)); - - if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState - && !disabledParticipantsForPartition.contains(participantId)) { - participantStateMap.put(participantId, state); - count = count + 1; - assigned[i] = true; - if (count == stateCount) { - break; - } - } - } - } - } - return participantStateMap; - } - - /** - * Get the number of replicas that should be in each state for a partition - * @param upperBounds map of state to upper bound - * @param stateModelDef StateModelDefinition object - * @param liveNodesNb number of live nodes - * @param total number of replicas - * @return state count map: state->count - */ - public static LinkedHashMap stateCount(Map upperBounds, - StateModelDefinition stateModelDef, int 
liveNodesNb, int totalReplicas) { - LinkedHashMap stateCountMap = new LinkedHashMap(); - List statesPriorityList = stateModelDef.getTypedStatesPriorityList(); - - int replicas = totalReplicas; - for (State state : statesPriorityList) { - String num = upperBounds.get(state); - if ("N".equals(num)) { - stateCountMap.put(state, liveNodesNb); - } else if ("R".equals(num)) { - // wait until we get the counts for all other states - continue; - } else { - int stateCount = -1; - try { - stateCount = Integer.parseInt(num); - } catch (Exception e) { - // LOG.error("Invalid count for state: " + state + ", count: " + num + - // ", use -1 instead"); - } - - if (stateCount > 0) { - stateCountMap.put(state, stateCount); - replicas -= stateCount; - } - } - } - - // get state count for R - for (State state : statesPriorityList) { - String num = upperBounds.get(state); - if ("R".equals(num)) { - stateCountMap.put(state, replicas); - // should have at most one state using R - break; - } - } - return stateCountMap; - } -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java deleted file mode 100644 index 880f2c0..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ /dev/null @@ -1,187 +0,0 @@ -package org.apache.helix.controller.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; -import org.apache.helix.api.id.PartitionId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Partition; -import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; -import org.apache.helix.model.StateModelDefinition; -import org.apache.log4j.Logger; - -/** - * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal - * state of a resource, fully adapting to the addition or removal of instances. 
This includes - * computation of a new preference list and a partition to instance and state mapping based on the - * computed instance preferences. - * The input is the current assignment of partitions to instances, as well as existing instance - * preferences, if any. - * The output is a preference list and a mapping based on that preference list, i.e. partition p - * has a replica on node k with state s. - */ -@Deprecated -public class AutoRebalancer implements Rebalancer { - // These should be final, but are initialized in init rather than a constructor - private HelixManager _manager; - private AutoRebalanceStrategy _algorithm; - - private static final Logger LOG = Logger.getLogger(AutoRebalancer.class); - - @Override - public void init(HelixManager manager) { - this._manager = manager; - this._algorithm = null; - } - - @Override - public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState, - CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { - // Compute a preference list based on the current ideal state - List partitions = new ArrayList(currentIdealState.getPartitionSet()); - String stateModelName = currentIdealState.getStateModelDefRef(); - StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName); - Map liveInstance = clusterData.getLiveInstances(); - String replicas = currentIdealState.getReplicas(); - - LinkedHashMap stateCountMap = new LinkedHashMap(); - stateCountMap = - ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(), - Integer.parseInt(replicas)); - List liveNodes = new ArrayList(liveInstance.keySet()); - Map> currentMapping = - currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap); - - // If there are nodes tagged with resource name, use only those nodes - Set taggedNodes = new HashSet(); - if (currentIdealState.getInstanceGroupTag() != null) { - for (String instanceName : liveNodes) { - if 
(clusterData.getInstanceConfigMap().get(instanceName) - .containsTag(currentIdealState.getInstanceGroupTag())) { - taggedNodes.add(instanceName); - } - } - } - if (taggedNodes.size() > 0) { - if (LOG.isInfoEnabled()) { - LOG.info("found the following instances with tag " + currentIdealState.getResourceName() - + " " + taggedNodes); - } - liveNodes = new ArrayList(taggedNodes); - } - - List allNodes = new ArrayList(clusterData.getInstanceConfigMap().keySet()); - int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); - - if (LOG.isInfoEnabled()) { - LOG.info("currentMapping: " + currentMapping); - LOG.info("stateCountMap: " + stateCountMap); - LOG.info("liveNodes: " + liveNodes); - LOG.info("allNodes: " + allNodes); - LOG.info("maxPartition: " + maxPartition); - } - ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); - placementScheme.init(_manager); - _algorithm = - new AutoRebalanceStrategy(resource.getResourceName(), partitions, stateCountMap, - maxPartition, placementScheme); - ZNRecord newMapping = - _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes); - - if (LOG.isInfoEnabled()) { - LOG.info("newMapping: " + newMapping); - } - - IdealState newIdealState = new IdealState(resource.getResourceName()); - newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); - newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO); - newIdealState.getRecord().setListFields(newMapping.getListFields()); - - // compute a full partition mapping for the resource - if (LOG.isDebugEnabled()) { - LOG.debug("Processing resource:" + resource.getResourceName()); - } - ResourceAssignment partitionMapping = - new ResourceAssignment(ResourceId.from(resource.getResourceName())); - for (String partitionName : partitions) { - Partition partition = new Partition(partitionName); - Map currentStateMap = - currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition); - Set 
disabledInstancesForPartition = - clusterData.getDisabledInstancesForPartition(partition.toString()); - List preferenceList = - ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState, - stateModelDef); - Map bestStateForPartition = - ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef, - preferenceList, currentStateMap, disabledInstancesForPartition); - partitionMapping.addReplicaMap(PartitionId.from(partitionName), - ResourceAssignment.replicaMapFromStringMap(bestStateForPartition)); - } - return partitionMapping; - } - - private Map> currentMapping(CurrentStateOutput currentStateOutput, - String resourceName, List partitions, Map stateCountMap) { - - Map> map = new HashMap>(); - - for (String partition : partitions) { - Map curStateMap = - currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition)); - map.put(partition, new HashMap()); - for (String node : curStateMap.keySet()) { - String state = curStateMap.get(node); - if (stateCountMap.containsKey(state)) { - map.get(partition).put(node, state); - } - } - - Map pendingStateMap = - currentStateOutput.getPendingStateMap(resourceName, new Partition(partition)); - for (String node : pendingStateMap.keySet()) { - String state = pendingStateMap.get(node); - if (stateCountMap.containsKey(state)) { - map.get(partition).put(node, state); - } - } - } - return map; - } -} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java index f6ea60f..ac4d328 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java @@ -1,5 +1,23 @@ package org.apache.helix.controller.rebalancer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.api.Cluster; +import org.apache.helix.api.State; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext; +import org.apache.helix.controller.rebalancer.context.RebalancerConfig; +import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.stages.ResourceCurrentState; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.log4j.Logger; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,117 +37,89 @@ package org.apache.helix.controller.rebalancer; * under the License. */ -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.HelixDefinedState; -import org.apache.helix.HelixManager; -import org.apache.helix.api.id.PartitionId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Partition; -import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; -import org.apache.helix.model.StateModelDefinition; -import org.apache.log4j.Logger; - -/** - * This is a Rebalancer specific to custom mode. 
It is tasked with checking an existing mapping of - * partitions against the set of live instances to mark assignment states as dropped or erroneous - * as necessary. - * The input is the required current assignment of partitions to instances, as well as the required - * existing instance preferences. - * The output is a verified mapping based on that preference list, i.e. partition p has a replica - * on node k with state s, where s may be a dropped or error state if necessary. - */ -@Deprecated public class CustomRebalancer implements Rebalancer { private static final Logger LOG = Logger.getLogger(CustomRebalancer.class); @Override - public void init(HelixManager manager) { + public void init(HelixManager helixManager) { + // do nothing } @Override - public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState, - CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { - String stateModelDefName = currentIdealState.getStateModelDefRef(); - StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName); + public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, + Cluster cluster, ResourceCurrentState currentState) { + CustomRebalancerContext config = + rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class); + StateModelDefinition stateModelDef = + cluster.getStateModelMap().get(config.getStateModelDefId()); if (LOG.isDebugEnabled()) { - LOG.debug("Processing resource:" + resource.getResourceName()); + LOG.debug("Processing resource:" + config.getResourceId()); } - ResourceAssignment partitionMapping = - new ResourceAssignment(ResourceId.from(resource.getResourceName())); - for (Partition partition : resource.getPartitions()) { - Map currentStateMap = - currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition); - Set disabledInstancesForPartition = - clusterData.getDisabledInstancesForPartition(partition.toString()); - Map 
idealStateMap = - IdealState.stringMapFromParticipantStateMap(currentIdealState - .getParticipantStateMap(PartitionId.from(partition.getPartitionName()))); - Map bestStateForPartition = - computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap, - currentStateMap, disabledInstancesForPartition); - partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()), - ResourceAssignment.replicaMapFromStringMap(bestStateForPartition)); + ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId()); + for (PartitionId partition : config.getPartitionSet()) { + Map currentStateMap = + currentState.getCurrentStateMap(config.getResourceId(), partition); + Set disabledInstancesForPartition = + ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), + partition); + Map bestStateForPartition = + computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(), + stateModelDef, config.getPreferenceMap(partition), currentStateMap, + disabledInstancesForPartition); + partitionMapping.addReplicaMap(partition, bestStateForPartition); } return partitionMapping; } /** - * compute best state for resource in CUSTOMIZED ideal state mode - * @param cache + * compute best state for resource in CUSTOMIZED rebalancer mode + * @param liveParticipantMap * @param stateModelDef - * @param idealStateMap + * @param preferenceMap * @param currentStateMap - * @param disabledInstancesForPartition + * @param disabledParticipantsForPartition * @return */ - private Map computeCustomizedBestStateForPartition(ClusterDataCache cache, - StateModelDefinition stateModelDef, Map idealStateMap, - Map currentStateMap, Set disabledInstancesForPartition) { - Map instanceStateMap = new HashMap(); + private Map computeCustomizedBestStateForPartition( + Set liveParticipantSet, StateModelDefinition stateModelDef, + Map preferenceMap, Map currentStateMap, + Set disabledParticipantsForPartition) { + Map participantStateMap 
= new HashMap(); - // if the ideal state is deleted, idealStateMap will be null/empty and + // if the resource is deleted, preferenceMap will be null/empty and // we should drop all resources. if (currentStateMap != null) { - for (String instance : currentStateMap.keySet()) { - if ((idealStateMap == null || !idealStateMap.containsKey(instance)) - && !disabledInstancesForPartition.contains(instance)) { + for (ParticipantId participantId : currentStateMap.keySet()) { + if ((preferenceMap == null || !preferenceMap.containsKey(participantId)) + && !disabledParticipantsForPartition.contains(participantId)) { // if dropped and not disabled, transit to DROPPED - instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); - } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals( - HelixDefinedState.ERROR.toString())) - && disabledInstancesForPartition.contains(instance)) { + participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED)); + } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get( + participantId).equals(State.from(HelixDefinedState.ERROR))) + && disabledParticipantsForPartition.contains(participantId)) { // if disabled and not in ERROR state, transit to initial-state (e.g.
OFFLINE) - instanceStateMap.put(instance, stateModelDef.getInitialState()); + participantStateMap.put(participantId, stateModelDef.getTypedInitialState()); } } } // ideal state is deleted - if (idealStateMap == null) { - return instanceStateMap; + if (preferenceMap == null) { + return participantStateMap; } - Map liveInstancesMap = cache.getLiveInstances(); - for (String instance : idealStateMap.keySet()) { + for (ParticipantId participantId : preferenceMap.keySet()) { boolean notInErrorState = - currentStateMap == null || currentStateMap.get(instance) == null - || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()); + currentStateMap == null || currentStateMap.get(participantId) == null + || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR)); - if (liveInstancesMap.containsKey(instance) && notInErrorState - && !disabledInstancesForPartition.contains(instance)) { - instanceStateMap.put(instance, idealStateMap.get(instance)); + if (liveParticipantSet.contains(participantId) && notInErrorState + && !disabledParticipantsForPartition.contains(participantId)) { + participantStateMap.put(participantId, preferenceMap.get(participantId)); } } - return instanceStateMap; + return participantStateMap; } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java new file mode 100644 index 0000000..b0c11d4 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java @@ -0,0 +1,196 @@ +package org.apache.helix.controller.rebalancer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.api.Cluster; +import org.apache.helix.api.Participant; +import org.apache.helix.api.State; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext; +import org.apache.helix.controller.rebalancer.context.RebalancerConfig; +import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.stages.ResourceCurrentState; +import org.apache.helix.controller.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme; +import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.log4j.Logger; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +public class FullAutoRebalancer implements Rebalancer { + // These should be final, but are initialized in init rather than a constructor + private AutoRebalanceStrategy _algorithm; + + private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class); + + @Override + public void init(HelixManager helixManager) { + // do nothing + } + + @Override + public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, + Cluster cluster, ResourceCurrentState currentState) { + FullAutoRebalancerContext config = + rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class); + StateModelDefinition stateModelDef = + cluster.getStateModelMap().get(config.getStateModelDefId()); + // Compute a preference list based on the current ideal state + List partitions = new ArrayList(config.getPartitionSet()); + Map liveParticipants = cluster.getLiveParticipantMap(); + Map allParticipants = cluster.getParticipantMap(); + int replicas = -1; + if (config.anyLiveParticipant()) { + replicas = liveParticipants.size(); + } else { + replicas = config.getReplicaCount(); + } + + // count how many replicas should be in each state + Map upperBounds = + ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(), + cluster.getConfig()); + LinkedHashMap stateCountMap = + ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, + liveParticipants.size(), replicas); + + // get the participant lists + List liveParticipantList = + new ArrayList(liveParticipants.keySet()); + List allParticipantList = + new ArrayList(cluster.getParticipantMap().keySet()); + + // compute the current mapping from the current state + Map> currentMapping = + currentMapping(config, currentState, stateCountMap); + + // If there are nodes tagged with resource, use only those nodes + Set taggedNodes = new HashSet(); + if (config.getParticipantGroupTag() != null) { + for (ParticipantId participantId : liveParticipantList) { + if 
(liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) { + taggedNodes.add(participantId); + } + } + } + if (taggedNodes.size() > 0) { + if (LOG.isInfoEnabled()) { + LOG.info("found the following instances with tag " + config.getResourceId() + " " + + taggedNodes); + } + liveParticipantList = new ArrayList(taggedNodes); + } + + // determine which nodes the replicas should live on + int maxPartition = config.getMaxPartitionsPerParticipant(); + if (LOG.isInfoEnabled()) { + LOG.info("currentMapping: " + currentMapping); + LOG.info("stateCountMap: " + stateCountMap); + LOG.info("liveNodes: " + liveParticipantList); + LOG.info("allNodes: " + allParticipantList); + LOG.info("maxPartition: " + maxPartition); + } + ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); + _algorithm = + new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition, + placementScheme); + ZNRecord newMapping = + _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping, + allParticipantList); + + if (LOG.isInfoEnabled()) { + LOG.info("newMapping: " + newMapping); + } + + // compute a full partition mapping for the resource + if (LOG.isDebugEnabled()) { + LOG.debug("Processing resource:" + config.getResourceId()); + } + ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId()); + for (PartitionId partition : partitions) { + Set disabledParticipantsForPartition = + ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition); + List rawPreferenceList = newMapping.getListField(partition.stringify()); + if (rawPreferenceList == null) { + rawPreferenceList = Collections.emptyList(); + } + List preferenceList = + Lists.transform(rawPreferenceList, new Function() { + @Override + public ParticipantId apply(String participantName) { + return ParticipantId.from(participantName); + } + }); + preferenceList = + ConstraintBasedAssignment.getPreferenceList(cluster, partition, 
preferenceList); + Map bestStateForPartition = + ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, + liveParticipants.keySet(), stateModelDef, preferenceList, + currentState.getCurrentStateMap(config.getResourceId(), partition), + disabledParticipantsForPartition); + partitionMapping.addReplicaMap(partition, bestStateForPartition); + } + return partitionMapping; + } + + private Map> currentMapping( + FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput, + Map stateCountMap) { + Map> map = + new HashMap>(); + + for (PartitionId partition : config.getPartitionSet()) { + Map curStateMap = + currentStateOutput.getCurrentStateMap(config.getResourceId(), partition); + map.put(partition, new HashMap()); + for (ParticipantId node : curStateMap.keySet()) { + State state = curStateMap.get(node); + if (stateCountMap.containsKey(state)) { + map.get(partition).put(node, state); + } + } + + Map pendingStateMap = + currentStateOutput.getPendingStateMap(config.getResourceId(), partition); + for (ParticipantId node : pendingStateMap.keySet()) { + State state = pendingStateMap.get(node); + if (stateCountMap.containsKey(state)) { + map.get(partition).put(node, state); + } + } + } + return map; + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java index 26fc2ef..5a6a24e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java @@ -1,5 +1,11 @@ package org.apache.helix.controller.rebalancer; +import org.apache.helix.HelixManager; +import org.apache.helix.api.Cluster; +import 
org.apache.helix.controller.rebalancer.context.RebalancerConfig; +import org.apache.helix.controller.stages.ResourceCurrentState; +import org.apache.helix.model.ResourceAssignment; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,40 +25,15 @@ package org.apache.helix.controller.rebalancer; * under the License. */ -import org.apache.helix.HelixManager; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; - /** * Allows one to come up with custom implementation of a rebalancer.
* This will be invoked on all changes that happen in the cluster.
- * Simply return the newIdealState for a resource in this method.
- *
- * Deprecated. Use {@link org.apache.helix.api.rebalancer.Rebalancer} instead. + * Simply return the resource assignment for a resource in this method.
*/ -@Deprecated public interface Rebalancer { - /** - * Initialize the rebalancer with a HelixManager if necessary - * @param manager - */ - void init(HelixManager manager); - /** - * Given an ideal state for a resource and liveness of instances, compute a assignment of - * instances and states to each partition of a resource. This method provides all the relevant - * information needed to rebalance a resource. If you need additional information use - * manager.getAccessor to read the cluster data. This allows one to compute the newIdealState - * according to app specific requirements. - * @param resourceName the resource for which a mapping will be computed - * @param currentIdealState the IdealState that corresponds to this resource - * @param currentStateOutput the current states of all partitions - * @param clusterData cache of the cluster state - */ - ResourceAssignment computeResourceMapping(final Resource resource, - final IdealState currentIdealState, final CurrentStateOutput currentStateOutput, - final ClusterDataCache clusterData); + public void init(HelixManager helixManager); + + public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster, + ResourceCurrentState currentState); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java new file mode 100644 index 0000000..79e4ba0 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java @@ -0,0 +1,94 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.util.HelixUtil; +import org.apache.log4j.Logger; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Reference to a class that extends {@link Rebalancer}. It loads the class automatically. 
+ */ +public class RebalancerRef { + private static final Logger LOG = Logger.getLogger(RebalancerRef.class); + + @JsonProperty("rebalancerClassName") + private final String _rebalancerClassName; + + @JsonCreator + private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) { + _rebalancerClassName = rebalancerClassName; + } + + /** + * Get an instantiated Rebalancer + * @return Rebalancer or null if instantiation failed + */ + @JsonIgnore + public Rebalancer getRebalancer() { + try { + return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance()); + } catch (Exception e) { + LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e); + } + return null; + } + + @Override + public String toString() { + return _rebalancerClassName; + } + + @Override + public boolean equals(Object that) { + if (that instanceof RebalancerRef) { + return this.toString().equals(((RebalancerRef) that).toString()); + } else if (that instanceof String) { + return this.toString().equals(that); + } + return false; + } + + /** + * Get a rebalancer class reference + * @param rebalancerClassName name of the class + * @return RebalancerRef or null if name is null + */ + public static RebalancerRef from(String rebalancerClassName) { + if (rebalancerClassName == null) { + return null; + } + return new RebalancerRef(rebalancerClassName); + } + + /** + * Get a RebalancerRef from a class object + * @param rebalancerClass class that implements Rebalancer + * @return RebalancerRef + */ + public static RebalancerRef from(Class rebalancerClass) { + if (rebalancerClass == null) { + return null; + } + return RebalancerRef.from(rebalancerClass.getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java index dd9fcf1..96e3d4b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java @@ -1,5 +1,22 @@ package org.apache.helix.controller.rebalancer; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixManager; +import org.apache.helix.api.Cluster; +import org.apache.helix.api.State; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.controller.rebalancer.context.RebalancerConfig; +import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext; +import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.stages.ResourceCurrentState; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.log4j.Logger; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,65 +36,48 @@ package org.apache.helix.controller.rebalancer; * under the License. 
*/ -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.HelixManager; -import org.apache.helix.api.id.PartitionId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Partition; -import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; -import org.apache.helix.model.StateModelDefinition; -import org.apache.log4j.Logger; - /** - * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal - * state of a resource based on a predefined preference list of instances willing to accept - * replicas. - * The input is the optional current assignment of partitions to instances, as well as the required - * existing instance preferences. - * The output is a mapping based on that preference list, i.e. partition p has a replica on node k - * with state s. + * Rebalancer for the SEMI_AUTO mode. 
It expects a RebalancerConfig that understands the preferred + * locations of each partition replica */ -@Deprecated public class SemiAutoRebalancer implements Rebalancer { - private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class); @Override - public void init(HelixManager manager) { + public void init(HelixManager helixManager) { + // do nothing } @Override - public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState, - CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { - String stateModelDefName = currentIdealState.getStateModelDefRef(); - StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName); + public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, + Cluster cluster, ResourceCurrentState currentState) { + SemiAutoRebalancerContext config = + rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class); + StateModelDefinition stateModelDef = + cluster.getStateModelMap().get(config.getStateModelDefId()); if (LOG.isDebugEnabled()) { - LOG.debug("Processing resource:" + resource.getResourceName()); + LOG.debug("Processing resource:" + config.getResourceId()); } - ResourceAssignment partitionMapping = - new ResourceAssignment(ResourceId.from(resource.getResourceName())); - for (Partition partition : resource.getPartitions()) { - Map currentStateMap = - currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition); - Set disabledInstancesForPartition = - clusterData.getDisabledInstancesForPartition(partition.toString()); - List preferenceList = - ConstraintBasedAssignment.getPreferenceList(clusterData, partition, currentIdealState, - stateModelDef); - Map bestStateForPartition = - ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef, - preferenceList, currentStateMap, disabledInstancesForPartition); - 
partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()), - ResourceAssignment.replicaMapFromStringMap(bestStateForPartition)); + ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId()); + for (PartitionId partition : config.getPartitionSet()) { + Map currentStateMap = + currentState.getCurrentStateMap(config.getResourceId(), partition); + Set disabledInstancesForPartition = + ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), + partition); + List preferenceList = + ConstraintBasedAssignment.getPreferenceList(cluster, partition, + config.getPreferenceList(partition)); + Map upperBounds = + ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(), + cluster.getConfig()); + Map bestStateForPartition = + ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster + .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap, + disabledInstancesForPartition); + partitionMapping.addReplicaMap(partition, bestStateForPartition); } return partitionMapping; } + } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java new file mode 100644 index 0000000..ec765d7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java @@ -0,0 +1,240 @@ +package org.apache.helix.controller.rebalancer.context; + +import java.util.Set; + +import org.apache.helix.api.Partition; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.api.id.ResourceId; +import 
org.apache.helix.api.id.StateModelDefId; +import org.apache.helix.api.id.StateModelFactoryId; +import org.apache.helix.controller.rebalancer.RebalancerRef; +import org.codehaus.jackson.annotate.JsonIgnore; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more + * concretely defines the subunits. 
+ */ +public abstract class BasicRebalancerContext implements RebalancerContext { + private ResourceId _resourceId; + private StateModelDefId _stateModelDefId; + private StateModelFactoryId _stateModelFactoryId; + private String _participantGroupTag; + private Class _serializer; + private RebalancerRef _rebalancerRef; + + /** + * Instantiate a basic rebalancer context + */ + public BasicRebalancerContext() { + _serializer = DefaultContextSerializer.class; + } + + @Override + public ResourceId getResourceId() { + return _resourceId; + } + + /** + * Set the resource to rebalance + * @param resourceId resource id + */ + public void setResourceId(ResourceId resourceId) { + _resourceId = resourceId; + } + + @Override + public StateModelDefId getStateModelDefId() { + return _stateModelDefId; + } + + /** + * Set the state model definition that the resource follows + * @param stateModelDefId state model definition id + */ + public void setStateModelDefId(StateModelDefId stateModelDefId) { + _stateModelDefId = stateModelDefId; + } + + @Override + public StateModelFactoryId getStateModelFactoryId() { + return _stateModelFactoryId; + } + + /** + * Set the state model factory that the resource uses + * @param stateModelFactoryId state model factory id + */ + public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) { + _stateModelFactoryId = stateModelFactoryId; + } + + @Override + public String getParticipantGroupTag() { + return _participantGroupTag; + } + + /** + * Set a tag that participants must have in order to serve this resource + * @param participantGroupTag string group tag + */ + public void setParticipantGroupTag(String participantGroupTag) { + _participantGroupTag = participantGroupTag; + } + + /** + * Get the serializer. 
If none is provided, {@link DefaultContextSerializer} is used + */ + @Override + public Class getSerializerClass() { + return _serializer; + } + + /** + * Set the class that can serialize this context + * @param serializer serializer class that implements ContextSerializer + */ + public void setSerializerClass(Class serializer) { + _serializer = serializer; + } + + @Override + @JsonIgnore + public Set getSubUnitIdSet() { + return getSubUnitMap().keySet(); + } + + @Override + @JsonIgnore + public Partition getSubUnit(PartitionId subUnitId) { + return getSubUnitMap().get(subUnitId); + } + + @Override + public RebalancerRef getRebalancerRef() { + return _rebalancerRef; + } + + /** + * Set the reference to the class used to rebalance this resource + * @param rebalancerRef RebalancerRef instance + */ + public void setRebalancerRef(RebalancerRef rebalancerRef) { + _rebalancerRef = rebalancerRef; + } + + /** + * Abstract builder for the base rebalancer context + */ + public static abstract class AbstractBuilder> { + private final ResourceId _resourceId; + private StateModelDefId _stateModelDefId; + private StateModelFactoryId _stateModelFactoryId; + private String _participantGroupTag; + private Class _serializerClass; + private RebalancerRef _rebalancerRef; + + /** + * Instantiate with a resource id + * @param resourceId resource id + */ + public AbstractBuilder(ResourceId resourceId) { + _resourceId = resourceId; + _serializerClass = DefaultContextSerializer.class; + } + + /** + * Set the state model definition that the resource should follow + * @param stateModelDefId state model definition id + * @return Builder + */ + public T stateModelDefId(StateModelDefId stateModelDefId) { + _stateModelDefId = stateModelDefId; + return self(); + } + + /** + * Set the state model factory that the resource should use + * @param stateModelFactoryId state model factory id + * @return Builder + */ + public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) { + 
_stateModelFactoryId = stateModelFactoryId; + return self(); + } + + /** + * Set the tag that all participants require in order to serve this resource + * @param participantGroupTag the tag + * @return Builder + */ + public T participantGroupTag(String participantGroupTag) { + _participantGroupTag = participantGroupTag; + return self(); + } + + /** + * Set the serializer class for this rebalancer context + * @param serializerClass class that implements ContextSerializer + * @return Builder + */ + public T serializerClass(Class serializerClass) { + _serializerClass = serializerClass; + return self(); + } + + /** + * Specify a custom class to use for rebalancing + * @param rebalancerRef RebalancerRef instance + * @return Builder + */ + public T rebalancerRef(RebalancerRef rebalancerRef) { + _rebalancerRef = rebalancerRef; + return self(); + } + + /** + * Update an existing context with base fields + * @param context derived context + */ + protected final void update(BasicRebalancerContext context) { + context.setResourceId(_resourceId); + context.setStateModelDefId(_stateModelDefId); + context.setStateModelFactoryId(_stateModelFactoryId); + context.setParticipantGroupTag(_participantGroupTag); + context.setSerializerClass(_serializerClass); + context.setRebalancerRef(_rebalancerRef); + } + + /** + * Get a typed reference to "this" class. Final derived classes should simply return the this + * reference. 
+ * @return this for the most specific type + */ + protected abstract T self(); + + /** + * Get the rebalancer context from the built fields + * @return RebalancerContext + */ + public abstract RebalancerContext build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java new file mode 100644 index 0000000..ef12a09 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java @@ -0,0 +1,37 @@ +package org.apache.helix.controller.rebalancer.context; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +public interface ContextSerializer { + /** + * Convert a RebalancerContext object instance to a String + * @param data instance of the rebalancer context type + * @return String representing the object + */ + public String serialize(final T data); + + /** + * Convert a String to a generic object instance + * @param clazz The class represented by the deserialized string + * @param string String representing the object + * @return instance of the generic type or null if the conversion failed + */ + public T deserialize(final Class clazz, final String string); +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java new file mode 100644 index 0000000..1fc1cda --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java @@ -0,0 +1,163 @@ +package org.apache.helix.controller.rebalancer.context; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.api.State; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.controller.rebalancer.CustomRebalancer; +import org.apache.helix.controller.rebalancer.RebalancerRef; +import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme; +import
org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.codehaus.jackson.annotate.JsonIgnore; + +import com.google.common.collect.Maps; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. 
By default, it + * corresponds to {@link CustomRebalancer} + */ +public class CustomRebalancerContext extends PartitionedRebalancerContext { + private Map> _preferenceMaps; + + /** + * Instantiate a CustomRebalancerContext + */ + public CustomRebalancerContext() { + super(RebalanceMode.CUSTOMIZED); + setRebalancerRef(RebalancerRef.from(CustomRebalancer.class)); + _preferenceMaps = Maps.newHashMap(); + } + + /** + * Get the preference maps of the partitions and replicas of the resource + * @return map of partition to participant and state + */ + public Map> getPreferenceMaps() { + return _preferenceMaps; + } + + /** + * Set the preference maps of the partitions and replicas of the resource + * @param preferenceMaps map of partition to participant and state + */ + public void setPreferenceMaps(Map> preferenceMaps) { + _preferenceMaps = preferenceMaps; + } + + /** + * Get the preference map of a partition + * @param partitionId the partition to look up + * @return map of participant to state + */ + @JsonIgnore + public Map getPreferenceMap(PartitionId partitionId) { + return _preferenceMaps.get(partitionId); + } + + /** + * Generate preference maps based on a default cluster setup + * @param stateModelDef the state model definition to follow + * @param participantSet the set of participant ids to configure for + */ + @Override + @JsonIgnore + public void generateDefaultConfiguration(StateModelDefinition stateModelDef, + Set participantSet) { + // compute default upper bounds + Map upperBounds = Maps.newHashMap(); + for (State state : stateModelDef.getTypedStatesPriorityList()) { + upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state)); + } + + // determine the current mapping + Map> currentMapping = getPreferenceMaps(); + + // determine the preference maps + LinkedHashMap stateCounts = + ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(), + getReplicaCount()); + ReplicaPlacementScheme placementScheme = new 
DefaultPlacementScheme(); + List participantList = new ArrayList(participantSet); + List partitionList = new ArrayList(getPartitionSet()); + AutoRebalanceStrategy strategy = + new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts, + getMaxPartitionsPerParticipant(), placementScheme); + Map> rawPreferenceMaps = + strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList) + .getMapFields(); + Map> preferenceMaps = + Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps)); + setPreferenceMaps(preferenceMaps); + } + + /** + * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer} + */ + public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder { + private final Map> _preferenceMaps; + + /** + * Instantiate for a resource + * @param resourceId resource id + */ + public Builder(ResourceId resourceId) { + super(resourceId); + super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class)); + _preferenceMaps = Maps.newHashMap(); + } + + /** + * Add a preference map for a partition + * @param partitionId partition to set + * @param preferenceMap map of participant id to state indicating where replicas are served + * @return Builder + */ + public Builder preferenceMap(PartitionId partitionId, Map preferenceMap) { + _preferenceMaps.put(partitionId, preferenceMap); + return self(); + } + + @Override + protected Builder self() { + return this; + } + + @Override + public CustomRebalancerContext build() { + CustomRebalancerContext context = new CustomRebalancerContext(); + super.update(context); + context.setPreferenceMaps(_preferenceMaps); + return context; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java new file mode 100644 index 0000000..ecc93fb --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java @@ -0,0 +1,83 @@ +package org.apache.helix.controller.rebalancer.context; + +import java.io.ByteArrayInputStream; +import java.io.StringWriter; + +import org.apache.helix.HelixException; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Default serializer implementation for RebalancerContexts. 
Uses the Jackson JSON library to + * convert to and from strings + */ +public class DefaultContextSerializer implements ContextSerializer { + + private static Logger logger = Logger.getLogger(DefaultContextSerializer.class); + + /** Serialize data to an indented JSON string; returns null for null input, and logs then rethrows any failure wrapped in a HelixException. */ @Override + public String serialize(final T data) { + if (data == null) { + return null; + } + + ObjectMapper mapper = new ObjectMapper(); + SerializationConfig serializationConfig = mapper.getSerializationConfig(); + serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); + serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true); + serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); + StringWriter sw = new StringWriter(); + try { + mapper.writeValue(sw, data); + } catch (Exception e) { + logger.error("Exception during payload data serialization.", e); + throw new HelixException(e); + } + return sw.toString(); + } + + /** Deserialize string into an instance of clazz; returns null for null or empty input, and also when reading fails (the failure is logged, not rethrown). */ @Override + public T deserialize(final Class clazz, final String string) { + if (string == null || string.length() == 0) { + return null; + } + + ObjectMapper mapper = new ObjectMapper(); + ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes()); /* NOTE(review): getBytes() with no argument encodes using the platform default charset */ + + DeserializationConfig deserializationConfig = mapper.getDeserializationConfig(); + deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true); + deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true); + deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true); + deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true); + deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); + try { + T payload = mapper.readValue(bais, clazz); + return payload; + } catch (Exception e) { + logger.error("Exception during deserialization of payload bytes: " + string, e); + return null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java new file mode 100644 index 0000000..2db9ac6 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java @@ -0,0 +1,63 @@ +package org.apache.helix.controller.rebalancer.context; + +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.controller.rebalancer.FullAutoRebalancer; +import org.apache.helix.controller.rebalancer.RebalancerRef; +import org.apache.helix.model.IdealState.RebalanceMode; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * RebalancerContext for FULL_AUTO rebalancing mode. 
By default, it corresponds to + * {@link FullAutoRebalancer} + */ +public class FullAutoRebalancerContext extends PartitionedRebalancerContext { + /** Instantiate a FullAutoRebalancerContext: FULL_AUTO mode with a rebalancer reference to FullAutoRebalancer */ public FullAutoRebalancerContext() { + super(RebalanceMode.FULL_AUTO); + setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class)); + } + + /** + * Builder for a full auto rebalancer context. By default, it corresponds to + * {@link FullAutoRebalancer} + */ + public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder { + /** + * Instantiate with a resource + * @param resourceId resource id + */ + public Builder(ResourceId resourceId) { + super(resourceId); + super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class)); + } + + /** Returns this builder, the concrete type for the abstract builder's self() hook */ @Override + protected Builder self() { + return this; + } + + /** Creates a new FullAutoRebalancerContext, passes it to the inherited update(context), and returns it */ @Override + public FullAutoRebalancerContext build() { + FullAutoRebalancerContext context = new FullAutoRebalancerContext(); + super.update(context); + return context; + } + } +}