helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [2/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981
Date Wed, 04 Dec 2013 01:21:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
deleted file mode 100644
index aa872c4..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ /dev/null
@@ -1,182 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
- * information specific to each rebalancer.
- */
-public final class RebalancerConfig {
-  private enum Fields {
-    SERIALIZER_CLASS,
-    REBALANCER_CONTEXT,
-    REBALANCER_CONTEXT_CLASS
-  }
-
-  private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
-  private ContextSerializer _serializer;
-  private HelixRebalancer _rebalancer;
-  private final RebalancerContext _context;
-  private final NamespacedConfig _config;
-
-  /**
-   * Instantiate a RebalancerConfig
-   * @param context rebalancer context
-   * @param rebalancerRef reference to the rebalancer class that will be used
-   */
-  public RebalancerConfig(RebalancerContext context) {
-    _config =
-        new NamespacedConfig(Scope.resource(context.getResourceId()),
-            RebalancerConfig.class.getSimpleName());
-    _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
-        .getName());
-    _config
-        .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
-    _context = context;
-    try {
-      _serializer = context.getSerializerClass().newInstance();
-      _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
-    } catch (InstantiationException e) {
-      LOG.error("Error initializing the configuration", e);
-    } catch (IllegalAccessException e) {
-      LOG.error("Error initializing the configuration", e);
-    }
-  }
-
-  /**
-   * Instantiate from a physical ResourceConfiguration
-   * @param resourceConfiguration populated ResourceConfiguration
-   */
-  public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
-    _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
-    _serializer = getSerializer();
-    _context = getContext();
-  }
-
-  /**
-   * Get the class that can serialize and deserialize the rebalancer context
-   * @return ContextSerializer
-   */
-  private ContextSerializer getSerializer() {
-    String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
-    if (serializerClassName != null) {
-      try {
-        return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
-            .newInstance();
-      } catch (InstantiationException e) {
-        LOG.error("Error getting the serializer", e);
-      } catch (IllegalAccessException e) {
-        LOG.error("Error getting the serializer", e);
-      } catch (ClassNotFoundException e) {
-        LOG.error("Error getting the serializer", e);
-      }
-    }
-    return null;
-  }
-
-  private RebalancerContext getContext() {
-    String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
-    if (className != null) {
-      try {
-        Class<? extends RebalancerContext> contextClass =
-            HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
-        String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
-        return _serializer.deserialize(contextClass, serialized);
-      } catch (ClassNotFoundException e) {
-        LOG.error(className + " is not a valid class");
-      } catch (ClassCastException e) {
-        LOG.error(className + " does not implement RebalancerContext");
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get a rebalancer class instance
-   * @return Rebalancer
-   */
-  public HelixRebalancer getRebalancer() {
-    // cache the rebalancer to avoid loading and instantiating it excessively
-    if (_rebalancer == null) {
-      if (_context == null || _context.getRebalancerRef() == null) {
-        return null;
-      }
-      _rebalancer = _context.getRebalancerRef().getRebalancer();
-    }
-    return _rebalancer;
-  }
-
-  /**
-   * Get the instantiated RebalancerContext
-   * @param contextClass specific class of the RebalancerContext
-   * @return RebalancerContext subclass instance, or null if conversion is not possible
-   */
-  public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
-    if (_context != null) {
-      try {
-        return contextClass.cast(_context);
-      } catch (ClassCastException e) {
-        LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get the rebalancer context serialized as a string
-   * @return string representing the context
-   */
-  public String getSerializedContext() {
-    return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
-  }
-
-  /**
-   * Convert this to a namespaced config
-   * @return NamespacedConfig
-   */
-  public NamespacedConfig toNamespacedConfig() {
-    return _config;
-  }
-
-  /**
-   * Get a RebalancerConfig from a physical resource config
-   * @param resourceConfiguration physical resource config
-   * @return RebalancerConfig
-   */
-  public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
-    return new RebalancerConfig(resourceConfiguration);
-  }
-
-  /**
-   * Get a RebalancerConfig from a RebalancerContext
-   * @param context instantiated RebalancerContext
-   * @return RebalancerConfig
-   */
-  public static RebalancerConfig from(RebalancerContext context) {
-    return new RebalancerConfig(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
deleted file mode 100644
index 981891b..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
- */
-public interface RebalancerContext {
-  /**
-   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
-   * resource, e.g. a subtask of a task
-   * @return map of (subunit id, subunit) pairs
-   */
-  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
-  /**
-   * Get the subunits of the resource (e.g. partitions)
-   * @return set of subunit ids
-   */
-  public Set<? extends PartitionId> getSubUnitIdSet();
-
-  /**
-   * Get a specific subunit
-   * @param subUnitId the id of the subunit
-   * @return SubUnit
-   */
-  public Partition getSubUnit(PartitionId subUnitId);
-
-  /**
-   * Get the resource to rebalance
-   * @return resource id
-   */
-  public ResourceId getResourceId();
-
-  /**
-   * Get the state model definition that the resource follows
-   * @return state model definition id
-   */
-  public StateModelDefId getStateModelDefId();
-
-  /**
-   * Get the state model factory of this resource
-   * @return state model factory id
-   */
-  public StateModelFactoryId getStateModelFactoryId();
-
-  /**
-   * Get the tag, if any, that participants must have in order to serve this resource
-   * @return participant group tag, or null
-   */
-  public String getParticipantGroupTag();
-
-  /**
-   * Get the serializer for this context
-   * @return ContextSerializer class object
-   */
-  public Class<? extends ContextSerializer> getSerializerClass();
-
-  /**
-   * Get a reference to the class used to rebalance this resource
-   * @return RebalancerRef
-   */
-  public RebalancerRef getRebalancerRef();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
deleted file mode 100644
index 525931d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerContext extends RebalancerContext {
-  /**
-   * Check if this resource should be assigned to any live participant
-   * @return true if any live participant expected, false otherwise
-   */
-  public boolean anyLiveParticipant();
-
-  /**
-   * Get the number of replicas that each resource subunit should have
-   * @return replica count
-   */
-  public int getReplicaCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
deleted file mode 100644
index afa81e2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
-  @JsonProperty("preferenceLists")
-  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-  /**
-   * Instantiate a SemiAutoRebalancerContext
-   */
-  public SemiAutoRebalancerContext() {
-    setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-    _preferenceLists = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference lists of all partitions of the resource
-   * @return map of partition id to list of participant ids
-   */
-  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
-    return _preferenceLists;
-  }
-
-  /**
-   * Set the preference lists of all partitions of the resource
-   * @param preferenceLists
-   */
-  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
-    _preferenceLists = preferenceLists;
-  }
-
-  /**
-   * Get the preference list of a partition
-   * @param partitionId the partition to look up
-   * @return list of participant ids
-   */
-  @JsonIgnore
-  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
-  }
-
-  /**
-   * Generate preference lists based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
-    for (PartitionId partitionId : getPartitionSet()) {
-      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
-      if (preferenceList != null && !preferenceList.isEmpty()) {
-        Set<ParticipantId> disabledParticipants = Collections.emptySet();
-        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
-        Map<ParticipantId, State> initialMap =
-            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
-                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
-        currentMapping.put(partitionId, initialMap);
-      }
-    }
-
-    // determine the preference
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, List<String>> rawPreferenceLists =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getListFields();
-    Map<PartitionId, List<ParticipantId>> preferenceLists =
-        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
-    setPreferenceLists(preferenceLists);
-  }
-
-  /**
-   * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-      super.rebalanceMode(RebalanceMode.SEMI_AUTO);
-      _preferenceLists = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference list for a partition
-     * @param partitionId partition to set
-     * @param preferenceList ordered list of participants who can serve the partition
-     * @return Builder
-     */
-    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
-      _preferenceLists.put(partitionId, preferenceList);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public SemiAutoRebalancerContext build() {
-      SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
-      super.update(context);
-      context.setPreferenceLists(_preferenceLists);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
new file mode 100644
index 0000000..d612451
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
@@ -0,0 +1,82 @@
+package org.apache.helix.controller.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Default serializer implementation for converting to/from strings. Uses the Jackson JSON library
+ * to do the conversion
+ */
+public class DefaultStringSerializer implements StringSerializer {
+
+  private static Logger logger = Logger.getLogger(DefaultStringSerializer.class);
+
+  @Override
+  public <T> String serialize(final T data) {
+    if (data == null) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    StringWriter sw = new StringWriter();
+    try {
+      mapper.writeValue(sw, data);
+    } catch (Exception e) {
+      logger.error("Exception during payload data serialization.", e);
+      throw new HelixException(e);
+    }
+    return sw.toString();
+  }
+
+  @Override
+  public <T> T deserialize(final Class<T> clazz, final String string) {
+    if (string == null || string.length() == 0) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    try {
+      // Parse the characters directly instead of going through string.getBytes(): that call
+      // encodes with the platform default charset, while Jackson decodes byte input as
+      // UTF-8/16/32, so non-ASCII payloads broke on hosts with a different default charset.
+      return mapper.readValue(string, clazz);
+    } catch (Exception e) {
+      logger.error("Exception during deserialization of payload bytes: " + string, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
new file mode 100644
index 0000000..9311191
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface StringSerializer {
+  /**
+   * Convert an object instance to a String
+   * @param data instance of an arbitrary type
+   * @return String representing the object
+   */
+  public <T> String serialize(final T data);
+
+  /**
+   * Convert a String to an instance of the given class
+   * @param clazz The class represented by the deserialized string
+   * @param string String representing the object
+   * @return instance of the generic type or null if the conversion failed
+   */
+  public <T> T deserialize(final Class<T> clazz, final String string);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index ae0278b..5cedd7c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,5 +26,6 @@ public enum AttributeName {
   MESSAGES_ALL,
   MESSAGES_SELECTED,
   MESSAGES_THROTTLE,
-  LOCAL_STATE
+  LOCAL_STATE,
+  CONTEXT_PROVIDER,
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 7b143bd..ec812b2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -25,18 +25,19 @@ import java.util.Set;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.FallbackRebalancer;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
@@ -173,18 +174,29 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       }
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
-      RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
-      StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+      StateModelDefinition stateModelDef =
+          stateModelDefs.get(rebalancerConfig.getStateModelDefId());
       ResourceAssignment resourceAssignment = null;
       if (rebalancerConfig != null) {
-        HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+        HelixRebalancer rebalancer = null;
+        if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
+          rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+        }
         HelixManager manager = event.getAttribute("helixmanager");
+        ControllerContextProvider provider =
+            event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
         if (rebalancer == null) {
           rebalancer = new FallbackRebalancer();
         }
-        rebalancer.init(manager);
+        rebalancer.init(manager, provider);
+        ResourceAssignment currentAssignment = null;
+        Resource resourceSnapshot = cluster.getResource(resourceId);
+        if (resourceSnapshot != null) {
+          currentAssignment = resourceSnapshot.getResourceAssignment();
+        }
         resourceAssignment =
-            rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
+            rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+                currentStateOutput);
       }
       if (resourceAssignment == null) {
         resourceAssignment =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index edceed6..7704378 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,8 +44,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -139,11 +138,8 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         // partitions are finished (COMPLETED or ERROR), update the status update of the original
         // scheduler
         // message, and then remove the partitions from the ideal state
-        RebalancerContext rebalancerContext =
-            (rebalancerConfig != null) ? rebalancerConfig
-                .getRebalancerContext(RebalancerContext.class) : null;
-        if (rebalancerContext != null
-            && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+        if (rebalancerConfig != null
+            && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
                 StateModelDefId.SchedulerTaskQueue)) {
           updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 08e6799..ef5a5fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -39,7 +39,7 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -76,9 +76,8 @@ public class MessageGenerationStage extends AbstractBaseStage {
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       int bucketSize = resourceConfig.getBucketSize();
 
-      RebalancerContext rebalancerCtx =
-          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
+      RebalancerConfig rebalancerCfg = resourceConfig.getRebalancerConfig();
+      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCfg.getStateModelDefId());
 
       ResourceAssignment resourceAssignment =
           bestPossibleStateOutput.getResourceAssignment(resourceId);
@@ -132,16 +131,15 @@ public class MessageGenerationStage extends AbstractBaseStage {
             SessionId sessionId =
                 cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
                     .getSessionId();
-            RebalancerContext rebalancerContext =
-                resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+            RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
             Message message =
                 createMessage(manager, resourceId, subUnitId, participantId, currentState,
                     nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
-                    rebalancerContext.getStateModelFactoryId(), bucketSize);
+                    rebalancerConfig.getStateModelFactoryId(), bucketSize);
 
             // TODO refactor get/set timeout/inner-message
-            if (rebalancerContext != null
-                && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+            if (rebalancerConfig != null
+                && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
                     StateModelDefId.SchedulerTaskQueue)) {
               if (resourceConfig.getSubUnitMap().size() > 0) {
                 // TODO refactor it -- we need a way to read in scheduler tasks a priori

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index bbbf5c6..9adc833 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -38,9 +38,9 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.ReplicatedRebalancerConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -109,8 +109,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resource = resourceMap.get(resourceId);
       StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resource.getRebalancerConfig()
-              .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
+          stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
       // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
@@ -266,9 +265,9 @@ public class MessageSelectionStage extends AbstractBaseStage {
    */
   private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
       RebalancerConfig rebalancerConfig, Cluster cluster) {
-    ReplicatedRebalancerContext context =
-        (rebalancerConfig != null) ? rebalancerConfig
-            .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
+    ReplicatedRebalancerConfig config =
+        (rebalancerConfig != null) ? BasicRebalancerConfig.convert(rebalancerConfig,
+            ReplicatedRebalancerConfig.class) : null;
     Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
 
     List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
@@ -282,11 +281,11 @@ public class MessageSelectionStage extends AbstractBaseStage {
       } else if ("R".equals(numInstancesPerState)) {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
-        if (context != null) {
-          if (context.anyLiveParticipant()) {
+        if (config != null) {
+          if (config.anyLiveParticipant()) {
             max = cluster.getLiveParticipantMap().size();
           } else {
-            max = context.getReplicaCount();
+            max = config.getReplicaCount();
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 31dbb08..45fc355 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -1,12 +1,5 @@
 package org.apache.helix.controller.stages;
 
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.accessor.ResourceAccessor;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.model.ResourceAssignment;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -26,6 +19,13 @@ import org.apache.helix.model.ResourceAssignment;
  * under the License.
  */
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.ResourceAssignment;
+
 /**
  * Persist the ResourceAssignment of each resource that went through rebalancing
  */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
new file mode 100644
index 0000000..e63041a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
@@ -0,0 +1,59 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Persist all dirty (pending) contexts set in the controller pipeline, skipping null contexts
+ */
+public class PersistContextStage extends AbstractBaseStage {
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager helixManager = event.getAttribute("helixmanager");
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ControllerContextProvider contextProvider =
+        event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+    Map<ContextId, ControllerContext> pendingContexts = contextProvider.getPendingContexts();
+    List<PropertyKey> keys = Lists.newArrayList(); // keys and properties stay index-aligned
+    List<ControllerContextHolder> properties = Lists.newArrayList();
+    for (Map.Entry<ContextId, ControllerContext> entry : pendingContexts.entrySet()) {
+      ControllerContext context = entry.getValue();
+      if (context != null) { // guard the context itself: a holder created by "new" is never null
+        keys.add(keyBuilder.controllerContext(entry.getKey().stringify()));
+        properties.add(new ControllerContextHolder(context));
+      }
+    }
+    accessor.setChildren(keys, properties); // all collected contexts go out in one setChildren call
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 44fddb6..fb016f1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Map;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+
 public class ReadClusterDataStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
 
@@ -67,6 +74,16 @@ public class ReadClusterDataStage extends AbstractBaseStage {
 
     event.addAttribute("ClusterDataCache", cluster);
 
+    // read contexts (if any)
+    Map<ContextId, ControllerContext> persistedContexts = null;
+    if (cluster != null) {
+      persistedContexts = cluster.getContextMap();
+    } else {
+      persistedContexts = Maps.newHashMap();
+    }
+    ControllerContextProvider contextProvider = new ControllerContextProvider(persistedContexts);
+    event.addAttribute(AttributeName.CONTEXT_PROVIDER.toString(), contextProvider);
+
     long endTime = System.currentTimeMillis();
     LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1fdd892..5b75535 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -32,9 +32,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.log4j.Logger;
 
@@ -72,7 +71,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
       resCfgBuilder.bucketSize(resource.getBucketSize());
       resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
       resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-      resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+      resCfgBuilder.rebalancerConfig(rebalancerCfg);
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 
@@ -89,8 +88,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
         new HashMap<ResourceId, ResourceConfig.Builder>();
 
-    Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
-        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+    Map<ResourceId, PartitionedRebalancerConfig.Builder> rebCtxBuilderMap =
+        new HashMap<ResourceId, PartitionedRebalancerConfig.Builder>();
 
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -98,15 +97,15 @@ public class ResourceComputationStage extends AbstractBaseStage {
 
         if (currentState.getStateModelDefRef() == null) {
           LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
-              + ", partitions: " + currentState.getPartitionStateMap().keySet()
-              + ", states: " + currentState.getPartitionStateMap().values());
+              + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+              + currentState.getPartitionStateMap().values());
           throw new StageException("State model def is null for resource:"
               + currentState.getResourceId());
         }
 
         if (!resCfgBuilderMap.containsKey(resourceId)) {
-          PartitionedRebalancerContext.Builder rebCtxBuilder =
-              new PartitionedRebalancerContext.Builder(resourceId);
+          PartitionedRebalancerConfig.Builder rebCtxBuilder =
+              new PartitionedRebalancerConfig.Builder(resourceId);
           rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
           rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
               .getStateModelFactoryName()));
@@ -118,7 +117,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
           resCfgBuilderMap.put(resourceId, resCfgBuilder);
         }
 
-        PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+        PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
         for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
           rebCtxBuilder.addPartition(new Partition(partitionId));
         }
@@ -128,8 +127,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
     for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
       ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
-      PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
-      resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+      PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+      resCfgBuilder.rebalancerConfig(rebCtxBuilder.build());
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index f32649f..8c5b863 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -6,8 +6,8 @@ import org.apache.helix.api.config.NamespacedConfig;
 import org.apache.helix.api.config.ResourceConfig.ResourceType;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
@@ -105,11 +105,11 @@ public class ResourceConfiguration extends HelixProperty {
   }
 
   /**
-   * Get a RebalancerContext if available
-   * @return RebalancerContext, or null
+   * Get a RebalancerConfig if available
+   * @return RebalancerConfig, or null
    */
-  public RebalancerContext getRebalancerContext(Class<? extends RebalancerContext> clazz) {
-    RebalancerConfig config = new RebalancerConfig(this);
-    return config.getRebalancerContext(clazz);
+  public RebalancerConfig getRebalancerConfig(Class<? extends RebalancerConfig> clazz) {
+    RebalancerConfigHolder config = new RebalancerConfigHolder(this);
+    return config.getRebalancerConfig(clazz);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 98f7cac..ba8958d 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -64,10 +64,12 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -300,9 +302,9 @@ public class NewClusterSetup {
     idealState.setMaxPartitionsPerInstance(maxPartitionsPerNode);
     idealState.setStateModelDefId(stateModelDefId);
 
-    RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+    RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState);
     ResourceConfig.Builder builder =
-        new ResourceConfig.Builder(resourceId).rebalancerContext(rebalancerCtx).bucketSize(
+        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerCtx).bucketSize(
             bucketSize);
 
     ClusterAccessor accessor = clusterAccessor(clusterName);
@@ -407,10 +409,10 @@ public class NewClusterSetup {
           new IdealState(
               (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateJsonFile))));
 
-      RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+      RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState);
       ResourceConfig.Builder builder =
-          new ResourceConfig.Builder(ResourceId.from(resourceName))
-              .rebalancerContext(rebalancerCtx).bucketSize(idealState.getBucketSize());
+          new ResourceConfig.Builder(ResourceId.from(resourceName)).rebalancerConfig(rebalancerCtx)
+              .bucketSize(idealState.getBucketSize());
 
       ClusterAccessor accessor = clusterAccessor(clusterName);
       accessor.addResourceToCluster(builder.build());
@@ -459,23 +461,25 @@ public class NewClusterSetup {
     StringBuilder sb = new StringBuilder();
     Map<ParticipantId, State> stateMap = resource.getExternalView().getStateMap(partitionId);
     sb.append(resourceName + "/" + partitionName + ", externalView: " + stateMap);
-    PartitionedRebalancerContext partitionedContext =
-        resource.getRebalancerConfig().getRebalancerContext(PartitionedRebalancerContext.class);
-    if (partitionedContext != null) {
+    PartitionedRebalancerConfig partitionedConfig =
+        PartitionedRebalancerConfig.from(resource.getRebalancerConfig());
+    if (partitionedConfig != null) {
       // for partitioned contexts, check the mode and apply mode-specific information if possible
-      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        SemiAutoRebalancerContext semiAutoContext =
-            resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
-        sb.append(", preferenceList: " + semiAutoContext.getPreferenceList(partitionId));
-      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        CustomRebalancerContext customContext =
-            resource.getRebalancerConfig().getRebalancerContext(CustomRebalancerContext.class);
-        sb.append(", preferenceMap: " + customContext.getPreferenceMap(partitionId));
+      if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerConfig semiAutoConfig =
+            BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+                SemiAutoRebalancerConfig.class);
+        sb.append(", preferenceList: " + semiAutoConfig.getPreferenceList(partitionId));
+      } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerConfig customConfig =
+            BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+                CustomRebalancerConfig.class);
+        sb.append(", preferenceMap: " + customConfig.getPreferenceMap(partitionId));
       }
-      if (partitionedContext.anyLiveParticipant()) {
-        sb.append(", anyLiveParticipant: " + partitionedContext.anyLiveParticipant());
+      if (partitionedConfig.anyLiveParticipant()) {
+        sb.append(", anyLiveParticipant: " + partitionedConfig.anyLiveParticipant());
       } else {
-        sb.append(", replicaCount: " + partitionedContext.getReplicaCount());
+        sb.append(", replicaCount: " + partitionedConfig.getReplicaCount());
       }
     }
 
@@ -750,12 +754,13 @@ public class NewClusterSetup {
     ResourceAccessor accessor = resourceAccessor(clusterName);
     ResourceId resourceId = ResourceId.from(resourceName);
     Resource resource = accessor.readResource(resourceId);
+    RebalancerConfigHolder holder = new RebalancerConfigHolder(resource.getRebalancerConfig());
     StringBuilder sb =
         new StringBuilder("Resource ").append(resourceName).append(" in cluster ")
             .append(clusterName).append(":\n").append("externalView: ")
             .append(resource.getExternalView()).append(", userConfig: ")
-            .append(resource.getUserConfig()).append(", rebalancerContext: ")
-            .append(resource.getRebalancerConfig().getSerializedContext());
+            .append(resource.getUserConfig()).append(", rebalancerConfig: ")
+            .append(holder.getSerializedConfig());
     System.out.println(sb.toString());
   }
 
@@ -956,17 +961,18 @@ public class NewClusterSetup {
   private void expandResource(ClusterId clusterId, ResourceId resourceId) {
     ResourceAccessor accessor = resourceAccessor(clusterId.stringify());
     Resource resource = accessor.readResource(resourceId);
-    SemiAutoRebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
-    if (context == null) {
+    SemiAutoRebalancerConfig config =
+        BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+            SemiAutoRebalancerConfig.class);
+    if (config == null) {
       LOG.info("Only SEMI_AUTO mode supported for resource expansion");
       return;
     }
-    if (context.anyLiveParticipant()) {
+    if (config.anyLiveParticipant()) {
       LOG.info("Resource uses ANY_LIVE_PARTICIPANT, skipping default assignment");
       return;
     }
-    if (context.getPreferenceLists().size() == 0) {
+    if (config.getPreferenceLists().size() == 0) {
       LOG.info("No preference lists have been set yet, skipping default assignment");
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 0d597f6..ab4ead0 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,11 +33,12 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -99,8 +100,8 @@ public class TestNewStages extends ZkUnitTestBase {
     ResourceId resourceId = ResourceId.from("TestDB0");
     Assert.assertTrue(resourceMap.containsKey(resourceId));
     Resource resource = resourceMap.get(resourceId);
-    Assert.assertNotNull(resource.getRebalancerConfig().getRebalancerContext(
-        SemiAutoRebalancerContext.class));
+    Assert.assertNotNull(BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+        SemiAutoRebalancerConfig.class));
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -163,8 +164,12 @@ public class TestNewStages extends ZkUnitTestBase {
     Resource resource = cluster.getResource(resourceId);
     ResourceCurrentState currentStateOutput = new ResourceCurrentState();
     ResourceAssignment semiAutoResult =
-        resource.getRebalancerConfig().getRebalancer()
-            .computeResourceMapping(resource.getRebalancerConfig(), cluster, currentStateOutput);
+        resource
+            .getRebalancerConfig()
+            .getRebalancerRef()
+            .getRebalancer()
+            .computeResourceMapping(resource.getRebalancerConfig(), null, cluster,
+                currentStateOutput);
     verifySemiAutoRebalance(resource, semiAutoResult);
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
@@ -177,8 +182,9 @@ public class TestNewStages extends ZkUnitTestBase {
    */
   private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
     Assert.assertEquals(assignment.getMappedPartitionIds().size(), resource.getSubUnitSet().size());
-    SemiAutoRebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+    SemiAutoRebalancerConfig context =
+        BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+            SemiAutoRebalancerConfig.class);
     for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
       List<ParticipantId> preferenceList = context.getPreferenceList(partitionId);
       Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 74781cd..3c8fb2c 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,9 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -91,29 +92,29 @@ public class TestUpdateConfig {
     // message mode
     UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
     userConfig.setSimpleField("key1", "value1");
-    SemiAutoRebalancerContext rebalancerContext =
-        new SemiAutoRebalancerContext.Builder(resourceId).build();
+    SemiAutoRebalancerConfig rebalancerContext =
+        new SemiAutoRebalancerConfig.Builder(resourceId).build();
     ResourceConfig config =
         new ResourceConfig.Builder(resourceId).userConfig(userConfig)
-            .rebalancerContext(rebalancerContext).bucketSize(OLD_BUCKET_SIZE)
-            .batchMessageMode(true).build();
+            .rebalancerConfig(rebalancerContext).bucketSize(OLD_BUCKET_SIZE).batchMessageMode(true)
+            .build();
 
     // update: overwrite user config, change to full auto rebalancer context, and change the bucket
     // size
     UserConfig newUserConfig = new UserConfig(Scope.resource(resourceId));
     newUserConfig.setSimpleField("key2", "value2");
-    FullAutoRebalancerContext newRebalancerContext =
-        new FullAutoRebalancerContext.Builder(resourceId).build();
+    FullAutoRebalancerConfig newRebalancerContext =
+        new FullAutoRebalancerConfig.Builder(resourceId).build();
     ResourceConfig updated =
         new ResourceConfig.Delta(resourceId).setBucketSize(NEW_BUCKET_SIZE)
-            .setUserConfig(newUserConfig).setRebalancerContext(newRebalancerContext)
+            .setUserConfig(newUserConfig).setRebalancerConfig(newRebalancerContext)
             .mergeInto(config);
     Assert.assertEquals(updated.getBucketSize(), NEW_BUCKET_SIZE);
     Assert.assertTrue(updated.getBatchMessageMode());
-    Assert.assertNull(updated.getRebalancerConfig().getRebalancerContext(
-        SemiAutoRebalancerContext.class));
-    Assert.assertNotNull(updated.getRebalancerConfig().getRebalancerContext(
-        FullAutoRebalancerContext.class));
+    Assert.assertNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
+        SemiAutoRebalancerConfig.class));
+    Assert.assertNotNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
+        FullAutoRebalancerConfig.class));
     Assert.assertNull(updated.getUserConfig().getSimpleField("key1"));
     Assert.assertEquals(updated.getUserConfig().getSimpleField("key2"), "value2");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 5bbe54f..94deaac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,6 +8,9 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.model.ResourceConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -41,7 +44,7 @@ public class TestSerializeRebalancerContext {
   @Test
   public void basicTest() {
     // populate a context
-    CustomRebalancerContext context = new CustomRebalancerContext();
+    CustomRebalancerConfig context = new CustomRebalancerConfig();
     context.setAnyLiveParticipant(false);
     context.setMaxPartitionsPerParticipant(Integer.MAX_VALUE);
     Map<PartitionId, Partition> partitionMap = Maps.newHashMap();
@@ -61,9 +64,8 @@ public class TestSerializeRebalancerContext {
     context.setResourceId(resourceId);
 
     // serialize and deserialize by wrapping in a config
-    RebalancerConfig config = new RebalancerConfig(context);
-    CustomRebalancerContext deserialized =
-        config.getRebalancerContext(CustomRebalancerContext.class);
+    RebalancerConfigHolder config = new RebalancerConfigHolder(context);
+    CustomRebalancerConfig deserialized = config.getRebalancerConfig(CustomRebalancerConfig.class);
 
     // check to make sure that the two objects contain the same data
     Assert.assertNotNull(deserialized);
@@ -79,9 +81,9 @@ public class TestSerializeRebalancerContext {
     // wrap in a physical config and then unwrap it
     ResourceConfiguration physicalConfig = new ResourceConfiguration(resourceId);
     physicalConfig.addNamespacedConfig(config.toNamespacedConfig());
-    RebalancerConfig extractedConfig = new RebalancerConfig(physicalConfig);
-    CustomRebalancerContext extractedContext =
-        extractedConfig.getRebalancerContext(CustomRebalancerContext.class);
+    RebalancerConfigHolder extractedConfig = new RebalancerConfigHolder(physicalConfig);
+    CustomRebalancerConfig extractedContext =
+        extractedConfig.getRebalancerConfig(CustomRebalancerConfig.class);
 
     // make sure the unwrapped data hasn't changed
     Assert.assertNotNull(extractedContext);
@@ -95,8 +97,8 @@ public class TestSerializeRebalancerContext {
     Assert.assertEquals(extractedContext.getResourceId(), context.getResourceId());
 
     // make sure that it's legal to use a base rebalancer context
-    RebalancerContext rebalancerContext =
-        extractedConfig.getRebalancerContext(RebalancerContext.class);
+    RebalancerConfig rebalancerContext =
+        extractedConfig.getRebalancerConfig(RebalancerConfig.class);
     Assert.assertNotNull(rebalancerContext);
     Assert.assertEquals(rebalancerContext.getResourceId(), context.getResourceId());
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index e53530c..22c4168 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -41,8 +41,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -169,7 +169,7 @@ public class BaseStageTest {
     Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
     for (IdealState idealState : idealStates) {
       ResourceId resourceId = idealState.getResourceId();
-      RebalancerContext context = PartitionedRebalancerContext.from(idealState);
+      RebalancerConfig context = PartitionedRebalancerConfig.from(idealState);
       Resource resource =
           new Resource(resourceId, ResourceType.DATA, idealState, null, null, context,
               new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index e320011..90ea393 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -33,7 +33,6 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
@@ -79,8 +78,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertEquals(resource.values().iterator().next().getId(),
         ResourceId.from(resourceName));
     AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
-        .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-        idealState.getStateModelDefId());
+        .getStateModelDefId(), idealState.getStateModelDefId());
     AssertJUnit
         .assertEquals(resource.values().iterator().next().getSubUnitSet().size(), partitions);
   }
@@ -107,8 +105,7 @@ public class TestResourceComputationStage extends BaseStageTest {
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
-          .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-          idealState.getStateModelDefId());
+          .getStateModelDefId(), idealState.getStateModelDefId());
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
           idealState.getNumPartitions());
     }
@@ -182,8 +179,7 @@ public class TestResourceComputationStage extends BaseStageTest {
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
-          .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-          idealState.getStateModelDefId());
+          .getStateModelDefId(), idealState.getStateModelDefId());
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
           idealState.getNumPartitions());
     }
@@ -192,8 +188,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
-        .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-        currentState.getStateModelDefId());
+        .getStateModelDefId(), currentState.getStateModelDefId());
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getSubUnitSet().size(), currentState
         .getTypedPartitionStateMap().size());
     AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index ce26e2e..11805fe 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -30,11 +30,15 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
+import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.context.BasicControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,22 +64,23 @@ public class TestCustomizedIdealStateRebalancer extends
 
   public static class TestRebalancer implements HelixRebalancer {
 
+    private ControllerContextProvider _contextProvider;
+
     /**
      * Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
      * which is in the highest-priority state.
      */
     @Override
-    public ResourceAssignment computeResourceMapping(RebalancerConfig config, Cluster cluster,
-        ResourceCurrentState currentState) {
-      PartitionedRebalancerContext context =
-          config.getRebalancerContext(PartitionedRebalancerContext.class);
+    public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+        ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+      PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
       StateModelDefinition stateModelDef =
-          cluster.getStateModelMap().get(context.getStateModelDefId());
+          cluster.getStateModelMap().get(config.getStateModelDefId());
       List<ParticipantId> liveParticipants =
           new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      ResourceAssignment resourceMapping = new ResourceAssignment(context.getResourceId());
+      ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
       int i = 0;
-      for (PartitionId partitionId : context.getPartitionSet()) {
+      for (PartitionId partitionId : config.getPartitionSet()) {
         int nodeIndex = i % liveParticipants.size();
         Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
         replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getTypedStatesPriorityList()
@@ -84,12 +89,17 @@ public class TestCustomizedIdealStateRebalancer extends
         i++;
       }
       testRebalancerInvoked = true;
+
+      // set some basic context
+      ContextId contextId = ContextId.from(config.getResourceId().stringify());
+      _contextProvider.putControllerContext(contextId, new BasicControllerContext(contextId));
       return resourceMapping;
     }
 
     @Override
-    public void init(HelixManager helixManager) {
+    public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
       testRebalancerCreated = true;
+      _contextProvider = contextProvider;
     }
   }
 
@@ -124,6 +134,11 @@ public class TestCustomizedIdealStateRebalancer extends
     }
     Assert.assertTrue(testRebalancerCreated);
     Assert.assertTrue(testRebalancerInvoked);
+
+    // check that context can be extracted
+    ControllerContextHolder holder = accessor.getProperty(keyBuilder.controllerContext(db2));
+    Assert.assertNotNull(holder);
+    Assert.assertNotNull(holder.getContext());
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 29e0a44..b415393 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -42,8 +42,8 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Message;
@@ -122,13 +122,13 @@ public class TestHelixConnection extends ZkUnitTestBase {
             .addTransition(slave, offline, 4).addTransition(slave, master, 2)
             .addTransition(master, slave, 1).addTransition(offline, dropped).initialState(offline)
             .upperBound(master, 1).dynamicUpperBound(slave, "R").build();
-    RebalancerContext rebalancerCtx =
-        new SemiAutoRebalancerContext.Builder(resourceId).addPartitions(1).replicaCount(1)
+    RebalancerConfig rebalancerCtx =
+        new SemiAutoRebalancerConfig.Builder(resourceId).addPartitions(1).replicaCount(1)
             .stateModelDefId(stateModelDefId)
             .preferenceList(PartitionId.from("testDB_0"), Arrays.asList(participantId)).build();
     clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
         stateModelDef).build());
-    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerContext(
+    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerConfig(
         rebalancerCtx).build());
     clusterAccessor.addParticipantToCluster(new ParticipantConfig.Builder(participantId).build());
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index c233417..880d31c 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -23,7 +23,7 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Message;
@@ -219,10 +219,10 @@ public class LogicalModelExample {
     Partition partition2 = new Partition(PartitionId.from(resourceId, "2"));
 
     // specify the rebalancer configuration
-    // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerContext
+    // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig
     // builder
-    FullAutoRebalancerContext.Builder rebalanceContextBuilder =
-        new FullAutoRebalancerContext.Builder(resourceId).replicaCount(1).addPartition(partition1)
+    FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
+        new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition1)
             .addPartition(partition2).stateModelDefId(stateModelDef.getStateModelDefId());
 
     // create (optional) user-defined configuration properties for the resource
@@ -231,7 +231,7 @@ public class LogicalModelExample {
 
     // create the configuration for a new resource
     ResourceConfig.Builder resourceBuilder =
-        new ResourceConfig.Builder(resourceId).rebalancerContext(rebalanceContextBuilder.build())
+        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build())
             .userConfig(userConfig);
     return resourceBuilder.build();
   }


Mime
View raw message