helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [3/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981
Date Wed, 04 Dec 2013 01:21:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
new file mode 100644
index 0000000..73c3ccc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -0,0 +1,164 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
+ * corresponds to {@link CustomRebalancer}
+ */
+public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
+  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+  /**
+   * Instantiate a CustomRebalancerConfig
+   */
+  public CustomRebalancerConfig() {
+    setRebalanceMode(RebalanceMode.CUSTOMIZED);
+    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+    _preferenceMaps = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference maps of the partitions and replicas of the resource
+   * @return map of partition to participant and state
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+    return _preferenceMaps;
+  }
+
+  /**
+   * Set the preference maps of the partitions and replicas of the resource
+   * @param preferenceMaps map of partition to participant and state
+   */
+  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+    _preferenceMaps = preferenceMaps;
+  }
+
+  /**
+   * Get the preference map of a partition
+   * @param partitionId the partition to look up
+   * @return map of participant to state
+   */
+  @JsonIgnore
+  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+    return _preferenceMaps.get(partitionId);
+  }
+
+  /**
+   * Generate preference maps based on a default cluster setup
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+    // determine the preference maps
+    LinkedHashMap<State, Integer> stateCounts =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, Map<String, String>> rawPreferenceMaps =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getMapFields();
+    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+    setPreferenceMaps(preferenceMaps);
+  }
+
+  /**
+   * Build a CustomRebalancerConfig. By default, it corresponds to {@link CustomRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+      super.rebalanceMode(RebalanceMode.CUSTOMIZED);
+      _preferenceMaps = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference map for a partition
+     * @param partitionId partition to set
+     * @param preferenceList map of participant id to state indicating where replicas are served
+     * @return Builder
+     */
+    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+      _preferenceMaps.put(partitionId, preferenceMap);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public CustomRebalancerConfig build() {
+      CustomRebalancerConfig config = new CustomRebalancerConfig();
+      super.update(config);
+      config.setPreferenceMaps(_preferenceMaps);
+      return config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
new file mode 100644
index 0000000..828d509
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -0,0 +1,64 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for FULL_AUTO rebalancing mode. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
+  public FullAutoRebalancerConfig() {
+    setRebalanceMode(RebalanceMode.FULL_AUTO);
+    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+  }
+
+  /**
+   * Builder for a full auto rebalancer config. By default, it corresponds to
+   * {@link FullAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+      super.rebalanceMode(RebalanceMode.FULL_AUTO);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public FullAutoRebalancerConfig build() {
+      FullAutoRebalancerConfig config = new FullAutoRebalancerConfig();
+      super.update(config);
+      return config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
new file mode 100644
index 0000000..2c9769d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -0,0 +1,405 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for a resource whose subunits are partitions. In addition, these partitions can
+ * be replicated.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionedRebalancerConfig extends BasicRebalancerConfig implements
+    ReplicatedRebalancerConfig {
+  private Map<PartitionId, Partition> _partitionMap;
+  private boolean _anyLiveParticipant;
+  private int _replicaCount;
+  private int _maxPartitionsPerParticipant;
+  private RebalanceMode _rebalanceMode;
+
+  /**
+   * Instantiate a PartitionedRebalancerConfig
+   */
+  public PartitionedRebalancerConfig() {
+    _partitionMap = Collections.emptyMap();
+    _replicaCount = 1;
+    _anyLiveParticipant = false;
+    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    _rebalanceMode = RebalanceMode.USER_DEFINED;
+  }
+
+  /**
+   * Get a map from partition id to partition
+   * @return partition map (mutable)
+   */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * Set a map of partition id to partition
+   * @param partitionMap partition map
+   */
+  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
+    _partitionMap = Maps.newHashMap(partitionMap);
+  }
+
+  /**
+   * Get the set of partitions for this resource
+   * @return set of partition ids
+   */
+  @JsonIgnore
+  public Set<PartitionId> getPartitionSet() {
+    return _partitionMap.keySet();
+  }
+
+  /**
+   * Get a partition
+   * @param partitionId id of the partition to get
+   * @return Partition object, or null if not present
+   */
+  @JsonIgnore
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  @Override
+  public boolean anyLiveParticipant() {
+    return _anyLiveParticipant;
+  }
+
+  /**
+   * Indicate if this resource should be assigned to any live participant
+   * @param anyLiveParticipant true if any live participant expected, false otherwise
+   */
+  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
+    _anyLiveParticipant = anyLiveParticipant;
+  }
+
+  @Override
+  public int getReplicaCount() {
+    return _replicaCount;
+  }
+
+  /**
+   * Set the number of replicas that each partition should have
+   * @param replicaCount
+   */
+  public void setReplicaCount(int replicaCount) {
+    _replicaCount = replicaCount;
+  }
+
+  /**
+   * Get the maximum number of partitions that a participant can serve
+   * @return maximum number of partitions per participant
+   */
+  public int getMaxPartitionsPerParticipant() {
+    return _maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Set the maximum number of partitions that a participant can serve
+   * @param maxPartitionsPerParticipant maximum number of partitions per participant
+   */
+  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Set the rebalancer mode of the partitioned resource
+   * @param rebalanceMode {@link RebalanceMode} enum value
+   */
+  public void setRebalanceMode(RebalanceMode rebalanceMode) {
+    _rebalanceMode = rebalanceMode;
+  }
+
+  /**
+   * Get the rebalancer mode of the resource
+   * @return RebalanceMode
+   */
+  public RebalanceMode getRebalanceMode() {
+    return _rebalanceMode;
+  }
+
+  @Override
+  @JsonIgnore
+  public Map<PartitionId, Partition> getSubUnitMap() {
+    return getPartitionMap();
+  }
+
+  /**
+   * Generate a default configuration given the state model and a participant.
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // the base config does not understand enough to know do to anything
+  }
+
+  /**
+   * Safely get a {@link PartitionedRebalancerConfig} from a {@link RebalancerConfig}
+   * @param config the base config
+   * @return a {@link PartitionedRebalancerConfig}, or null if the conversion is not possible
+   */
+  public static PartitionedRebalancerConfig from(RebalancerConfig config) {
+    try {
+      return PartitionedRebalancerConfig.class.cast(config);
+    } catch (ClassCastException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
+   * @param idealState populated IdealState
+   * @return PartitionedRebalancerConfig
+   */
+  public static PartitionedRebalancerConfig from(IdealState idealState) {
+    PartitionedRebalancerConfig config;
+    switch (idealState.getRebalanceMode()) {
+    case FULL_AUTO:
+      FullAutoRebalancerConfig.Builder fullAutoBuilder =
+          new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
+      populateConfig(fullAutoBuilder, idealState);
+      config = fullAutoBuilder.build();
+      break;
+    case SEMI_AUTO:
+      SemiAutoRebalancerConfig.Builder semiAutoBuilder =
+          new SemiAutoRebalancerConfig.Builder(idealState.getResourceId());
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
+      }
+      populateConfig(semiAutoBuilder, idealState);
+      config = semiAutoBuilder.build();
+      break;
+    case CUSTOMIZED:
+      CustomRebalancerConfig.Builder customBuilder =
+          new CustomRebalancerConfig.Builder(idealState.getResourceId());
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+      }
+      populateConfig(customBuilder, idealState);
+      config = customBuilder.build();
+      break;
+    default:
+      Builder baseBuilder = new Builder(idealState.getResourceId());
+      populateConfig(baseBuilder, idealState);
+      config = baseBuilder.build();
+      break;
+    }
+    return config;
+  }
+
+  /**
+   * Update a builder subclass with all the fields of the ideal state
+   * @param builder builder that extends AbstractBuilder
+   * @param idealState populated IdealState
+   */
+  private static <T extends AbstractBuilder<T>> void populateConfig(T builder, IdealState idealState) {
+    String replicas = idealState.getReplicas();
+    int replicaCount = 0;
+    boolean anyLiveParticipant = false;
+    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      anyLiveParticipant = true;
+    } else {
+      replicaCount = Integer.parseInt(replicas);
+    }
+    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
+      // backwards compatibility: partition sets were based on pref lists/maps previously
+      builder.addPartitions(idealState.getNumPartitions());
+    } else {
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        builder.addPartition(new Partition(partitionId));
+      }
+    }
+    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+        .participantGroupTag(idealState.getInstanceGroupTag())
+        .stateModelDefId(idealState.getStateModelDefId())
+        .stateModelFactoryId(idealState.getStateModelFactoryId());
+    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
+    if (rebalancerRef != null) {
+      builder.rebalancerRef(rebalancerRef);
+    }
+  }
+
+  /**
+   * Builder for a basic data rebalancer config
+   */
+  public static final class Builder extends AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public PartitionedRebalancerConfig build() {
+      PartitionedRebalancerConfig config = new PartitionedRebalancerConfig();
+      super.update(config);
+      return config;
+    }
+  }
+
+  /**
+   * Abstract builder for a generic partitioned resource rebalancer config
+   */
+  public static abstract class AbstractBuilder<T extends BasicRebalancerConfig.AbstractBuilder<T>>
+      extends BasicRebalancerConfig.AbstractBuilder<T> {
+    private final ResourceId _resourceId;
+    private final Map<PartitionId, Partition> _partitionMap;
+    private RebalanceMode _rebalanceMode;
+    private boolean _anyLiveParticipant;
+    private int _replicaCount;
+    private int _maxPartitionsPerParticipant;
+
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      super(resourceId);
+      _resourceId = resourceId;
+      _partitionMap = Maps.newHashMap();
+      _rebalanceMode = RebalanceMode.USER_DEFINED;
+      _anyLiveParticipant = false;
+      _replicaCount = 1;
+      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Set the rebalance mode for a partitioned rebalancer config
+     * @param rebalanceMode {@link RebalanceMode} enum value
+     * @return Builder
+     */
+    public T rebalanceMode(RebalanceMode rebalanceMode) {
+      _rebalanceMode = rebalanceMode;
+      return self();
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public T addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return self();
+    }
+
+    /**
+     * Add a collection of partitions
+     * @param partitions any collection of Partition objects
+     * @return Builder
+     */
+    public T addPartitions(Collection<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
+      return self();
+    }
+
+    /**
+     * Add a specified number of partitions with a default naming scheme, namely
+     * resourceId_partitionNumber where partitionNumber starts at 0
+     * @param partitionCount number of partitions to add
+     * @return Builder
+     */
+    public T addPartitions(int partitionCount) {
+      for (int i = 0; i < partitionCount; i++) {
+        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
+      }
+      return self();
+    }
+
+    /**
+     * Set whether any live participant should be used in rebalancing
+     * @param anyLiveParticipant true if any live participant can be used, false otherwise
+     * @return Builder
+     */
+    public T anyLiveParticipant(boolean anyLiveParticipant) {
+      _anyLiveParticipant = anyLiveParticipant;
+      return self();
+    }
+
+    /**
+     * Set the number of replicas
+     * @param replicaCount number of replicas
+     * @return Builder
+     */
+    public T replicaCount(int replicaCount) {
+      _replicaCount = replicaCount;
+      return self();
+    }
+
+    /**
+     * Set the maximum number of partitions to assign to any participant
+     * @param maxPartitionsPerParticipant the maximum
+     * @return Builder
+     */
+    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+      return self();
+    }
+
+    /**
+     * Update a PartitionedRebalancerConfig with fields from this builder level
+     * @param config PartitionedRebalancerConfig
+     */
+    protected final void update(PartitionedRebalancerConfig config) {
+      super.update(config);
+      // enforce at least one partition
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      config.setRebalanceMode(_rebalanceMode);
+      config.setPartitionMap(_partitionMap);
+      config.setAnyLiveParticipant(_anyLiveParticipant);
+      config.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+      config.setReplicaCount(_replicaCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
new file mode 100644
index 0000000..3f8c9d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerConfig} or a subclass and set up a resource with it. A rebalancer
+ * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the configuration should be serialized.
+ */
+public interface RebalancerConfig {
+  /**
+   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+   * resource, e.g. a subtask of a task
+   * @return map of (subunit id, subunit) pairs
+   */
+  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+  /**
+   * Get the subunits of the resource (e.g. partitions)
+   * @return set of subunit ids
+   */
+  public Set<? extends PartitionId> getSubUnitIdSet();
+
+  /**
+   * Get a specific subunit
+   * @param subUnitId the id of the subunit
+   * @return SubUnit
+   */
+  public Partition getSubUnit(PartitionId subUnitId);
+
+  /**
+   * Get the resource to rebalance
+   * @return resource id
+   */
+  public ResourceId getResourceId();
+
+  /**
+   * Get the state model definition that the resource follows
+   * @return state model definition id
+   */
+  public StateModelDefId getStateModelDefId();
+
+  /**
+   * Get the state model factory of this resource
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId();
+
+  /**
+   * Get the tag, if any, that participants must have in order to serve this resource
+   * @return participant group tag, or null
+   */
+  public String getParticipantGroupTag();
+
+  /**
+   * Get the serializer for this config
+   * @return StringSerializer class object
+   */
+  public Class<? extends StringSerializer> getSerializerClass();
+
+  /**
+   * Get a reference to the class used to rebalance this resource
+   * @return RebalancerRef
+   */
+  public RebalancerRef getRebalancerRef();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
new file mode 100644
index 0000000..8581732
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -0,0 +1,185 @@
+package org.apache.helix.controller.rebalancer.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for a resource rebalancer. This contains a RebalancerConfig, which contains
+ * information specific to each rebalancer.
+ */
+public final class RebalancerConfigHolder {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    REBALANCER_CONFIG,
+    REBALANCER_CONFIG_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(RebalancerConfigHolder.class);
+  private StringSerializer _serializer;
+  private HelixRebalancer _rebalancer;
+  private final RebalancerConfig _config;
+  private final NamespacedConfig _backingConfig;
+
+  /**
+   * Instantiate a RebalancerConfig
+   * @param config rebalancer config
+   * @param rebalancerRef reference to the rebalancer class that will be used
+   */
+  public RebalancerConfigHolder(RebalancerConfig config) {
+    _backingConfig =
+        new NamespacedConfig(Scope.resource(config.getResourceId()),
+            RebalancerConfigHolder.class.getSimpleName());
+    _backingConfig.setSimpleField(Fields.SERIALIZER_CLASS.toString(), config.getSerializerClass()
+        .getName());
+    _backingConfig.setSimpleField(Fields.REBALANCER_CONFIG_CLASS.toString(), config.getClass()
+        .getName());
+    _config = config;
+    try {
+      _serializer = config.getSerializerClass().newInstance();
+      _backingConfig.setSimpleField(Fields.REBALANCER_CONFIG.toString(),
+          _serializer.serialize(config));
+    } catch (InstantiationException e) {
+      LOG.error("Error initializing the configuration", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Error initializing the configuration", e);
+    }
+  }
+
+  /**
+   * Instantiate from a physical ResourceConfiguration
+   * @param resourceConfiguration populated ResourceConfiguration
+   */
+  public RebalancerConfigHolder(ResourceConfiguration resourceConfiguration) {
+    _backingConfig =
+        new NamespacedConfig(resourceConfiguration, RebalancerConfigHolder.class.getSimpleName());
+    _serializer = getSerializer();
+    _config = getConfig();
+  }
+
+  /**
+   * Get the class that can serialize and deserialize the rebalancer config
+   * @return StringSerializer
+   */
+  private StringSerializer getSerializer() {
+    String serializerClassName = _backingConfig.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+    if (serializerClassName != null) {
+      try {
+        return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  private RebalancerConfig getConfig() {
+    String className = _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG_CLASS.toString());
+    if (className != null) {
+      try {
+        Class<? extends RebalancerConfig> configClass =
+            HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
+        String serialized = _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG.toString());
+        return _serializer.deserialize(configClass, serialized);
+      } catch (ClassNotFoundException e) {
+        LOG.error(className + " is not a valid class");
+      } catch (ClassCastException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a rebalancer class instance
+   * @return Rebalancer
+   */
+  public HelixRebalancer getRebalancer() {
+    // cache the rebalancer to avoid loading and instantiating it excessively
+    if (_rebalancer == null) {
+      if (_config == null || _config.getRebalancerRef() == null) {
+        return null;
+      }
+      _rebalancer = _config.getRebalancerRef().getRebalancer();
+    }
+    return _rebalancer;
+  }
+
+  /**
+   * Get the instantiated RebalancerConfig
+   * @param configClass specific class of the RebalancerConfig
+   * @return RebalancerConfig subclass instance, or null if conversion is not possible
+   */
+  public <T extends RebalancerConfig> T getRebalancerConfig(Class<T> configClass) {
+    if (_config != null) {
+      try {
+        return configClass.cast(_config);
+      } catch (ClassCastException e) {
+        LOG.warn(configClass + " is incompatible with config class: " + _config.getClass());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the rebalancer config serialized as a string
+   * @return string representing the config
+   */
+  public String getSerializedConfig() {
+    return _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG.toString());
+  }
+
+  /**
+   * Convert this to a namespaced config
+   * @return NamespacedConfig
+   */
+  public NamespacedConfig toNamespacedConfig() {
+    return _backingConfig;
+  }
+
+  /**
+   * Get a RebalancerConfig from a physical resource config
+   * @param resourceConfiguration physical resource config
+   * @return RebalancerConfig
+   */
+  public static RebalancerConfigHolder from(ResourceConfiguration resourceConfiguration) {
+    return new RebalancerConfigHolder(resourceConfiguration);
+  }
+
+  /**
+   * Get a RebalancerConfigHolder from a RebalancerConfig
+   * @param config instantiated RebalancerConfig
+   * @return RebalancerConfigHolder
+   */
+  public static RebalancerConfigHolder from(RebalancerConfig config) {
+    return new RebalancerConfigHolder(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
new file mode 100644
index 0000000..3118b2a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer config that allows replicas. For instance, a rebalancer config
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerConfig extends RebalancerConfig {
+  /**
+   * Check if this resource should be assigned to any live participant
+   * @return true if any live participant expected, false otherwise
+   */
+  public boolean anyLiveParticipant();
+
+  /**
+   * Get the number of replicas that each resource subunit should have
+   * @return replica count
+   */
+  public int getReplicaCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
new file mode 100644
index 0000000..bfc3309
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig {
+  @JsonProperty("preferenceLists")
+  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+  /**
+   * Instantiate a SemiAutoRebalancerConfig
+   */
+  public SemiAutoRebalancerConfig() {
+    setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+    _preferenceLists = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference lists of all partitions of the resource
+   * @return map of partition id to list of participant ids
+   */
+  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+    return _preferenceLists;
+  }
+
+  /**
+   * Set the preference lists of all partitions of the resource
+   * @param preferenceLists
+   */
+  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+    _preferenceLists = preferenceLists;
+  }
+
+  /**
+   * Get the preference list of a partition
+   * @param partitionId the partition to look up
+   * @return list of participant ids
+   */
+  @JsonIgnore
+  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+    return _preferenceLists.get(partitionId);
+  }
+
+  /**
+   * Generate preference lists based on a default cluster setup
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    for (PartitionId partitionId : getPartitionSet()) {
+      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+      if (preferenceList != null && !preferenceList.isEmpty()) {
+        Set<ParticipantId> disabledParticipants = Collections.emptySet();
+        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+        Map<ParticipantId, State> initialMap =
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
+                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+        currentMapping.put(partitionId, initialMap);
+      }
+    }
+
+    // determine the preference
+    LinkedHashMap<State, Integer> stateCounts =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, List<String>> rawPreferenceLists =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getListFields();
+    Map<PartitionId, List<ParticipantId>> preferenceLists =
+        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+    setPreferenceLists(preferenceLists);
+  }
+
+  /**
+   * Build a SemiAutoRebalancerConfig. By default, it corresponds to {@link SemiAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+      super.rebalanceMode(RebalanceMode.SEMI_AUTO);
+      _preferenceLists = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference list for a partition
+     * @param partitionId partition to set
+     * @param preferenceList ordered list of participants who can serve the partition
+     * @return Builder
+     */
+    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+      _preferenceLists.put(partitionId, preferenceList);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public SemiAutoRebalancerConfig build() {
+      SemiAutoRebalancerConfig config = new SemiAutoRebalancerConfig();
+      super.update(config);
+      config.setPreferenceLists(_preferenceLists);
+      return config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
deleted file mode 100644
index ec765d7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerContext implements RebalancerContext {
-  private ResourceId _resourceId;
-  private StateModelDefId _stateModelDefId;
-  private StateModelFactoryId _stateModelFactoryId;
-  private String _participantGroupTag;
-  private Class<? extends ContextSerializer> _serializer;
-  private RebalancerRef _rebalancerRef;
-
-  /**
-   * Instantiate a basic rebalancer context
-   */
-  public BasicRebalancerContext() {
-    _serializer = DefaultContextSerializer.class;
-  }
-
-  @Override
-  public ResourceId getResourceId() {
-    return _resourceId;
-  }
-
-  /**
-   * Set the resource to rebalance
-   * @param resourceId resource id
-   */
-  public void setResourceId(ResourceId resourceId) {
-    _resourceId = resourceId;
-  }
-
-  @Override
-  public StateModelDefId getStateModelDefId() {
-    return _stateModelDefId;
-  }
-
-  /**
-   * Set the state model definition that the resource follows
-   * @param stateModelDefId state model definition id
-   */
-  public void setStateModelDefId(StateModelDefId stateModelDefId) {
-    _stateModelDefId = stateModelDefId;
-  }
-
-  @Override
-  public StateModelFactoryId getStateModelFactoryId() {
-    return _stateModelFactoryId;
-  }
-
-  /**
-   * Set the state model factory that the resource uses
-   * @param stateModelFactoryId state model factory id
-   */
-  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-    _stateModelFactoryId = stateModelFactoryId;
-  }
-
-  @Override
-  public String getParticipantGroupTag() {
-    return _participantGroupTag;
-  }
-
-  /**
-   * Set a tag that participants must have in order to serve this resource
-   * @param participantGroupTag string group tag
-   */
-  public void setParticipantGroupTag(String participantGroupTag) {
-    _participantGroupTag = participantGroupTag;
-  }
-
-  /**
-   * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
-   */
-  @Override
-  public Class<? extends ContextSerializer> getSerializerClass() {
-    return _serializer;
-  }
-
-  /**
-   * Set the class that can serialize this context
-   * @param serializer serializer class that implements ContextSerializer
-   */
-  public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
-    _serializer = serializer;
-  }
-
-  @Override
-  @JsonIgnore
-  public Set<? extends PartitionId> getSubUnitIdSet() {
-    return getSubUnitMap().keySet();
-  }
-
-  @Override
-  @JsonIgnore
-  public Partition getSubUnit(PartitionId subUnitId) {
-    return getSubUnitMap().get(subUnitId);
-  }
-
-  @Override
-  public RebalancerRef getRebalancerRef() {
-    return _rebalancerRef;
-  }
-
-  /**
-   * Set the reference to the class used to rebalance this resource
-   * @param rebalancerRef RebalancerRef instance
-   */
-  public void setRebalancerRef(RebalancerRef rebalancerRef) {
-    _rebalancerRef = rebalancerRef;
-  }
-
-  /**
-   * Abstract builder for the base rebalancer context
-   */
-  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
-    private final ResourceId _resourceId;
-    private StateModelDefId _stateModelDefId;
-    private StateModelFactoryId _stateModelFactoryId;
-    private String _participantGroupTag;
-    private Class<? extends ContextSerializer> _serializerClass;
-    private RebalancerRef _rebalancerRef;
-
-    /**
-     * Instantiate with a resource id
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      _resourceId = resourceId;
-      _serializerClass = DefaultContextSerializer.class;
-    }
-
-    /**
-     * Set the state model definition that the resource should follow
-     * @param stateModelDefId state model definition id
-     * @return Builder
-     */
-    public T stateModelDefId(StateModelDefId stateModelDefId) {
-      _stateModelDefId = stateModelDefId;
-      return self();
-    }
-
-    /**
-     * Set the state model factory that the resource should use
-     * @param stateModelFactoryId state model factory id
-     * @return Builder
-     */
-    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-      _stateModelFactoryId = stateModelFactoryId;
-      return self();
-    }
-
-    /**
-     * Set the tag that all participants require in order to serve this resource
-     * @param participantGroupTag the tag
-     * @return Builder
-     */
-    public T participantGroupTag(String participantGroupTag) {
-      _participantGroupTag = participantGroupTag;
-      return self();
-    }
-
-    /**
-     * Set the serializer class for this rebalancer context
-     * @param serializerClass class that implements ContextSerializer
-     * @return Builder
-     */
-    public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
-      _serializerClass = serializerClass;
-      return self();
-    }
-
-    /**
-     * Specify a custom class to use for rebalancing
-     * @param rebalancerRef RebalancerRef instance
-     * @return Builder
-     */
-    public T rebalancerRef(RebalancerRef rebalancerRef) {
-      _rebalancerRef = rebalancerRef;
-      return self();
-    }
-
-    /**
-     * Update an existing context with base fields
-     * @param context derived context
-     */
-    protected final void update(BasicRebalancerContext context) {
-      context.setResourceId(_resourceId);
-      context.setStateModelDefId(_stateModelDefId);
-      context.setStateModelFactoryId(_stateModelFactoryId);
-      context.setParticipantGroupTag(_participantGroupTag);
-      context.setSerializerClass(_serializerClass);
-      context.setRebalancerRef(_rebalancerRef);
-    }
-
-    /**
-     * Get a typed reference to "this" class. Final derived classes should simply return the this
-     * reference.
-     * @return this for the most specific type
-     */
-    protected abstract T self();
-
-    /**
-     * Get the rebalancer context from the built fields
-     * @return RebalancerContext
-     */
-    public abstract RebalancerContext build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
deleted file mode 100644
index ef12a09..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface ContextSerializer {
-  /**
-   * Convert a RebalancerContext object instance to a String
-   * @param data instance of the rebalancer context type
-   * @return String representing the object
-   */
-  public <T> String serialize(final T data);
-
-  /**
-   * Convert raw bytes to a generic object instance
-   * @param clazz The class represented by the deserialized string
-   * @param string String representing the object
-   * @return instance of the generic type or null if the conversion failed
-   */
-  public <T> T deserialize(final Class<T> clazz, final String string);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
deleted file mode 100644
index 0d2c1f2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerContext extends PartitionedRebalancerContext {
-  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-  /**
-   * Instantiate a CustomRebalancerContext
-   */
-  public CustomRebalancerContext() {
-    setRebalanceMode(RebalanceMode.CUSTOMIZED);
-    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-    _preferenceMaps = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference maps of the partitions and replicas of the resource
-   * @return map of partition to participant and state
-   */
-  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
-    return _preferenceMaps;
-  }
-
-  /**
-   * Set the preference maps of the partitions and replicas of the resource
-   * @param preferenceMaps map of partition to participant and state
-   */
-  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
-    _preferenceMaps = preferenceMaps;
-  }
-
-  /**
-   * Get the preference map of a partition
-   * @param partitionId the partition to look up
-   * @return map of participant to state
-   */
-  @JsonIgnore
-  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    return _preferenceMaps.get(partitionId);
-  }
-
-  /**
-   * Generate preference maps based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
-    // determine the preference maps
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, Map<String, String>> rawPreferenceMaps =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getMapFields();
-    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
-        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
-    setPreferenceMaps(preferenceMaps);
-  }
-
-  /**
-   * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-      super.rebalanceMode(RebalanceMode.CUSTOMIZED);
-      _preferenceMaps = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference map for a partition
-     * @param partitionId partition to set
-     * @param preferenceList map of participant id to state indicating where replicas are served
-     * @return Builder
-     */
-    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
-      _preferenceMaps.put(partitionId, preferenceMap);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public CustomRebalancerContext build() {
-      CustomRebalancerContext context = new CustomRebalancerContext();
-      super.update(context);
-      context.setPreferenceMaps(_preferenceMaps);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
deleted file mode 100644
index ecc93fb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
- * convert to and from strings
- */
-public class DefaultContextSerializer implements ContextSerializer {
-
-  private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
-
-  @Override
-  public <T> String serialize(final T data) {
-    if (data == null) {
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    StringWriter sw = new StringWriter();
-    try {
-      mapper.writeValue(sw, data);
-    } catch (Exception e) {
-      logger.error("Exception during payload data serialization.", e);
-      throw new HelixException(e);
-    }
-    return sw.toString();
-  }
-
-  @Override
-  public <T> T deserialize(final Class<T> clazz, final String string) {
-    if (string == null || string.length() == 0) {
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-    ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
-
-    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
-    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    try {
-      T payload = mapper.readValue(bais, clazz);
-      return payload;
-    } catch (Exception e) {
-      logger.error("Exception during deserialization of payload bytes: " + string, e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
deleted file mode 100644
index 2400707..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
-  public FullAutoRebalancerContext() {
-    setRebalanceMode(RebalanceMode.FULL_AUTO);
-    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-  }
-
-  /**
-   * Builder for a full auto rebalancer context. By default, it corresponds to
-   * {@link FullAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-      super.rebalanceMode(RebalanceMode.FULL_AUTO);
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public FullAutoRebalancerContext build() {
-      FullAutoRebalancerContext context = new FullAutoRebalancerContext();
-      super.update(context);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
deleted file mode 100644
index 15fcf9c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ /dev/null
@@ -1,393 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerContext extends BasicRebalancerContext implements
-    ReplicatedRebalancerContext {
-  private Map<PartitionId, Partition> _partitionMap;
-  private boolean _anyLiveParticipant;
-  private int _replicaCount;
-  private int _maxPartitionsPerParticipant;
-  private RebalanceMode _rebalanceMode;
-
-  /**
-   * Instantiate a DataRebalancerContext
-   */
-  public PartitionedRebalancerContext() {
-    _partitionMap = Collections.emptyMap();
-    _replicaCount = 1;
-    _anyLiveParticipant = false;
-    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    _rebalanceMode = RebalanceMode.USER_DEFINED;
-  }
-
-  /**
-   * Get a map from partition id to partition
-   * @return partition map (mutable)
-   */
-  public Map<PartitionId, Partition> getPartitionMap() {
-    return _partitionMap;
-  }
-
-  /**
-   * Set a map of partition id to partition
-   * @param partitionMap partition map
-   */
-  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
-    _partitionMap = Maps.newHashMap(partitionMap);
-  }
-
-  /**
-   * Get the set of partitions for this resource
-   * @return set of partition ids
-   */
-  @JsonIgnore
-  public Set<PartitionId> getPartitionSet() {
-    return _partitionMap.keySet();
-  }
-
-  /**
-   * Get a partition
-   * @param partitionId id of the partition to get
-   * @return Partition object, or null if not present
-   */
-  @JsonIgnore
-  public Partition getPartition(PartitionId partitionId) {
-    return _partitionMap.get(partitionId);
-  }
-
-  @Override
-  public boolean anyLiveParticipant() {
-    return _anyLiveParticipant;
-  }
-
-  /**
-   * Indicate if this resource should be assigned to any live participant
-   * @param anyLiveParticipant true if any live participant expected, false otherwise
-   */
-  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
-    _anyLiveParticipant = anyLiveParticipant;
-  }
-
-  @Override
-  public int getReplicaCount() {
-    return _replicaCount;
-  }
-
-  /**
-   * Set the number of replicas that each partition should have
-   * @param replicaCount
-   */
-  public void setReplicaCount(int replicaCount) {
-    _replicaCount = replicaCount;
-  }
-
-  /**
-   * Get the maximum number of partitions that a participant can serve
-   * @return maximum number of partitions per participant
-   */
-  public int getMaxPartitionsPerParticipant() {
-    return _maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the maximum number of partitions that a participant can serve
-   * @param maxPartitionsPerParticipant maximum number of partitions per participant
-   */
-  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the rebalancer mode of the partitioned resource
-   * @param rebalanceMode {@link RebalanceMode} enum value
-   */
-  public void setRebalanceMode(RebalanceMode rebalanceMode) {
-    _rebalanceMode = rebalanceMode;
-  }
-
-  /**
-   * Get the rebalancer mode of the resource
-   * @return RebalanceMode
-   */
-  public RebalanceMode getRebalanceMode() {
-    return _rebalanceMode;
-  }
-
-  @Override
-  @JsonIgnore
-  public Map<PartitionId, Partition> getSubUnitMap() {
-    return getPartitionMap();
-  }
-
-  /**
-   * Generate a default configuration given the state model and a participant.
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // the base context does not understand enough to know do to anything
-  }
-
-  /**
-   * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
-   * @param idealState populated IdealState
-   * @return PartitionedRebalancerContext
-   */
-  public static PartitionedRebalancerContext from(IdealState idealState) {
-    PartitionedRebalancerContext context;
-    switch (idealState.getRebalanceMode()) {
-    case FULL_AUTO:
-      FullAutoRebalancerContext.Builder fullAutoBuilder =
-          new FullAutoRebalancerContext.Builder(idealState.getResourceId());
-      populateContext(fullAutoBuilder, idealState);
-      context = fullAutoBuilder.build();
-      break;
-    case SEMI_AUTO:
-      SemiAutoRebalancerContext.Builder semiAutoBuilder =
-          new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
-      }
-      populateContext(semiAutoBuilder, idealState);
-      context = semiAutoBuilder.build();
-      break;
-    case CUSTOMIZED:
-      CustomRebalancerContext.Builder customBuilder =
-          new CustomRebalancerContext.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
-      }
-      populateContext(customBuilder, idealState);
-      context = customBuilder.build();
-      break;
-    default:
-      Builder baseBuilder = new Builder(idealState.getResourceId());
-      populateContext(baseBuilder, idealState);
-      context = baseBuilder.build();
-      break;
-    }
-    return context;
-  }
-
-  /**
-   * Update a builder subclass with all the fields of the ideal state
-   * @param builder builder that extends AbstractBuilder
-   * @param idealState populated IdealState
-   */
-  private static <T extends AbstractBuilder<T>> void populateContext(T builder,
-      IdealState idealState) {
-    String replicas = idealState.getReplicas();
-    int replicaCount = 0;
-    boolean anyLiveParticipant = false;
-    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
-      anyLiveParticipant = true;
-    } else {
-      replicaCount = Integer.parseInt(replicas);
-    }
-    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
-      // backwards compatibility: partition sets were based on pref lists/maps previously
-      builder.addPartitions(idealState.getNumPartitions());
-    } else {
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        builder.addPartition(new Partition(partitionId));
-      }
-    }
-    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
-        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
-        .participantGroupTag(idealState.getInstanceGroupTag())
-        .stateModelDefId(idealState.getStateModelDefId())
-        .stateModelFactoryId(idealState.getStateModelFactoryId());
-    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
-    if (rebalancerRef != null) {
-      builder.rebalancerRef(rebalancerRef);
-    }
-  }
-
-  /**
-   * Builder for a basic data rebalancer context
-   */
-  public static final class Builder extends AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public PartitionedRebalancerContext build() {
-      PartitionedRebalancerContext context = new PartitionedRebalancerContext();
-      super.update(context);
-      return context;
-    }
-  }
-
-  /**
-   * Abstract builder for a generic partitioned resource rebalancer context
-   */
-  public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
-      extends BasicRebalancerContext.AbstractBuilder<T> {
-    private final ResourceId _resourceId;
-    private final Map<PartitionId, Partition> _partitionMap;
-    private RebalanceMode _rebalanceMode;
-    private boolean _anyLiveParticipant;
-    private int _replicaCount;
-    private int _maxPartitionsPerParticipant;
-
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      super(resourceId);
-      _resourceId = resourceId;
-      _partitionMap = Maps.newHashMap();
-      _rebalanceMode = RebalanceMode.USER_DEFINED;
-      _anyLiveParticipant = false;
-      _replicaCount = 1;
-      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    }
-
-    /**
-     * Set the rebalance mode for a partitioned rebalancer context
-     * @param rebalanceMode {@link RebalanceMode} enum value
-     * @return Builder
-     */
-    public T rebalanceMode(RebalanceMode rebalanceMode) {
-      _rebalanceMode = rebalanceMode;
-      return self();
-    }
-
-    /**
-     * Add a partition that the resource serves
-     * @param partition fully-qualified partition
-     * @return Builder
-     */
-    public T addPartition(Partition partition) {
-      _partitionMap.put(partition.getId(), partition);
-      return self();
-    }
-
-    /**
-     * Add a collection of partitions
-     * @param partitions any collection of Partition objects
-     * @return Builder
-     */
-    public T addPartitions(Collection<Partition> partitions) {
-      for (Partition partition : partitions) {
-        addPartition(partition);
-      }
-      return self();
-    }
-
-    /**
-     * Add a specified number of partitions with a default naming scheme, namely
-     * resourceId_partitionNumber where partitionNumber starts at 0
-     * @param partitionCount number of partitions to add
-     * @return Builder
-     */
-    public T addPartitions(int partitionCount) {
-      for (int i = 0; i < partitionCount; i++) {
-        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
-      }
-      return self();
-    }
-
-    /**
-     * Set whether any live participant should be used in rebalancing
-     * @param anyLiveParticipant true if any live participant can be used, false otherwise
-     * @return Builder
-     */
-    public T anyLiveParticipant(boolean anyLiveParticipant) {
-      _anyLiveParticipant = anyLiveParticipant;
-      return self();
-    }
-
-    /**
-     * Set the number of replicas
-     * @param replicaCount number of replicas
-     * @return Builder
-     */
-    public T replicaCount(int replicaCount) {
-      _replicaCount = replicaCount;
-      return self();
-    }
-
-    /**
-     * Set the maximum number of partitions to assign to any participant
-     * @param maxPartitionsPerParticipant the maximum
-     * @return Builder
-     */
-    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-      return self();
-    }
-
-    /**
-     * Update a DataRebalancerContext with fields from this builder level
-     * @param context DataRebalancerContext
-     */
-    protected final void update(PartitionedRebalancerContext context) {
-      super.update(context);
-      // enforce at least one partition
-      if (_partitionMap.isEmpty()) {
-        addPartitions(1);
-      }
-      context.setRebalanceMode(_rebalanceMode);
-      context.setPartitionMap(_partitionMap);
-      context.setAnyLiveParticipant(_anyLiveParticipant);
-      context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
-      context.setReplicaCount(_replicaCount);
-    }
-  }
-}


Mime
View raw message