helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [34/53] [abbrv] git commit: [HELIX-18] Added more functionality to the new cluster setup
Date Thu, 07 Nov 2013 01:19:42 GMT
[HELIX-18] Added more functionality to the new cluster setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e23a3088
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e23a3088
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e23a3088

Branch: refs/heads/master
Commit: e23a3088d46941d884e25ba25fcf8c70f9f82198
Parents: dc94c8c
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Fri Oct 4 17:17:48 2013 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Nov 6 13:17:36 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  27 ++-
 .../helix/api/accessor/ClusterAccessor.java     | 208 +++++++++++++++--
 .../helix/api/accessor/ParticipantAccessor.java | 234 ++++++++++++++++++-
 .../helix/api/accessor/ResourceAccessor.java    |  91 ++++++++
 .../apache/helix/api/config/ClusterConfig.java  | 187 ++++++++++++++-
 .../rebalancer/context/CustomRebalancer.java    |  26 +--
 .../context/CustomRebalancerContext.java        |  49 +++-
 .../rebalancer/context/FullAutoRebalancer.java  |  41 ++--
 .../context/PartitionedRebalancerContext.java   |  13 ++
 .../rebalancer/context/RebalancerConfig.java    |   9 +
 .../rebalancer/context/SemiAutoRebalancer.java  |  13 +-
 .../context/SemiAutoRebalancerContext.java      |  59 +++++
 .../util/NewConstraintBasedAssignment.java      |  50 ++--
 .../stages/NewBestPossibleStateCalcStage.java   |   9 +-
 .../strategy/AutoRebalanceStrategy.java         |  56 ++++-
 .../org/apache/helix/tools/NewClusterSetup.java | 164 ++++++++++++-
 .../strategy/TestAutoRebalanceStrategy.java     |  37 +--
 .../strategy/TestNewAutoRebalanceStrategy.java  |   9 +-
 18 files changed, 1146 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 04c87d4..fdeb879 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -32,8 +32,10 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 
@@ -84,6 +86,8 @@ public class Cluster {
    * @param leaderId
    * @param constraintMap
    * @param stateModelMap
+   * @param stats
+   * @param alerts
    * @param userConfig
    * @param isPaused
    * @param autoJoinAllowed
@@ -91,8 +95,8 @@ public class Cluster {
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
-      boolean isPaused, boolean autoJoinAllowed) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
+      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -114,7 +118,8 @@ public class Cluster {
         new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
             .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
             .addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
-            .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
+            .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
+            .build();
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -219,6 +224,22 @@ public class Cluster {
   }
 
   /**
+   * Get all the persisted stats for the cluster
+   * @return PersistentStats instance
+   */
+  public PersistentStats getStats() {
+    return _config.getStats();
+  }
+
+  /**
+   * Get all the persisted alerts for the cluster
+   * @return Alerts instance
+   */
+  public Alerts getAlerts() {
+    return _config.getAlerts();
+  }
+
+  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 5c7df85..8780115 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -25,10 +25,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Controller;
 import org.apache.helix.api.Participant;
@@ -39,6 +44,7 @@ import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
@@ -47,6 +53,7 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -57,6 +64,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
@@ -110,6 +118,9 @@ public class ClusterAccessor {
     if (cluster.autoJoinAllowed()) {
       clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
     }
+    if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
+      _accessor.createProperty(_keyBuilder.persistantStat(), cluster.getStats());
+    }
     _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig);
     if (cluster.isPaused()) {
       pauseCluster();
@@ -150,6 +161,16 @@ public class ClusterAccessor {
       ClusterConstraints constraint = constraints.get(type);
       _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
     }
+    if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
+      _accessor.removeProperty(_keyBuilder.persistantStat());
+    } else {
+      _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
+    }
+    if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
+      _accessor.removeProperty(_keyBuilder.alerts());
+    } else {
+      _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
+    }
     return true;
   }
 
@@ -228,9 +249,15 @@ public class ClusterAccessor {
     // read the state model definitions
     Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
 
+    // read the stats
+    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
+
+    // read the alerts
+    Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
   }
 
   /**
@@ -351,6 +378,31 @@ public class ClusterAccessor {
   }
 
   /**
+   * Get cluster constraints of a given type
+   * @param type ConstraintType value
+   * @return ClusterConstraints, or null if none present
+   */
+  public ClusterConstraints readConstraints(ConstraintType type) {
+    return _accessor.getProperty(_keyBuilder.constraint(type.toString()));
+  }
+
+  /**
+   * Remove a constraint from the cluster
+   * @param type the constraint type
+   * @param constraintId the constraint id
+   * @return true if removed, false otherwise
+   */
+  public boolean removeConstraint(ConstraintType type, ConstraintId constraintId) {
+    ClusterConstraints constraints = _accessor.getProperty(_keyBuilder.constraint(type.toString()));
+    if (constraints == null || constraints.getConstraintItem(constraintId) == null) {
+      LOG.error("Constraint with id " + constraintId + " not present");
+      return false;
+    }
+    constraints.removeConstraintItem(constraintId);
+    return _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraints);
+  }
+
+  /**
    * Read the user config of the cluster
    * @return UserConfig, or null
    */
@@ -378,6 +430,140 @@ public class ClusterAccessor {
   }
 
   /**
+   * Get the stats persisted on this cluster
+   * @return PersistentStats, or null if none persisted
+   */
+  public PersistentStats readStats() {
+    return _accessor.getProperty(_keyBuilder.persistantStat());
+  }
+
+  /**
+   * Add a statistic specification to the cluster. Existing stat specifications will not be
+   * overwritten
+   * @param statName string representing a stat specification
+   * @return true if the stat spec was added, false otherwise
+   */
+  public boolean addStat(final String statName) {
+    if (!isClusterStructureValid()) {
+      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+    }
+
+    String persistentStatsPath = _keyBuilder.persistantStat().getPath();
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord statsRec) {
+        if (statsRec == null) {
+          statsRec = new ZNRecord(PersistentStats.nodeName);
+        }
+        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
+        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
+        for (String newStat : newStatMap.keySet()) {
+          if (!currStatMap.containsKey(newStat)) {
+            currStatMap.put(newStat, newStatMap.get(newStat));
+          }
+        }
+        statsRec.setMapFields(currStatMap);
+        return statsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Remove a statistic specification from the cluster
+   * @param statName string representing a statistic specification
+   * @return true if stats removed, false otherwise
+   */
+  public boolean dropStat(final String statName) {
+    if (!isClusterStructureValid()) {
+      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+    }
+
+    String persistentStatsPath = _keyBuilder.persistantStat().getPath();
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord statsRec) {
+        if (statsRec == null) {
+          throw new HelixException("No stats record in ZK, nothing to drop");
+        }
+        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
+        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
+        // delete each stat from stat map
+        for (String newStat : newStatMap.keySet()) {
+          if (currStatMap.containsKey(newStat)) {
+            currStatMap.remove(newStat);
+          }
+        }
+        statsRec.setMapFields(currStatMap);
+        return statsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Add an alert specification to the cluster
+   * @param alertName string representing the alert spec
+   * @return true if added, false otherwise
+   */
+  public boolean addAlert(final String alertName) {
+    if (!isClusterStructureValid()) {
+      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+    }
+
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    String alertsPath = _keyBuilder.alerts().getPath();
+    return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord alertsRec) {
+        if (alertsRec == null) {
+          alertsRec = new ZNRecord(Alerts.nodeName);
+        }
+        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
+        StringBuilder newStatName = new StringBuilder();
+        Map<String, String> newAlertMap = new HashMap<String, String>();
+
+        // use AlertsHolder to get map of new stats and map for this alert
+        AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
+
+        // add stat
+        addStat(newStatName.toString());
+
+        // add alert
+        currAlertMap.put(alertName, newAlertMap);
+        alertsRec.setMapFields(currAlertMap);
+        return alertsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Remove an alert specification from the cluster
+   * @param alertName string representing an alert specification
+   * @return true if removed, false otherwise
+   */
+  public boolean dropAlert(final String alertName) {
+    if (!isClusterStructureValid()) {
+      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+    }
+
+    String alertsPath = _keyBuilder.alerts().getPath();
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord alertsRec) {
+        if (alertsRec == null) {
+          throw new HelixException("No alerts record persisted, nothing to drop");
+        }
+        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
+        currAlertMap.remove(alertName);
+        alertsRec.setMapFields(currAlertMap);
+        return alertsRec;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
    * Add user configuration to the existing cluster user configuration. Overwrites properties with
    * the same key
    * @param userConfig the user config key-value pairs to add
@@ -600,24 +786,8 @@ public class ClusterAccessor {
    * @return true if participant dropped, false if there was an error
    */
   public boolean dropParticipantFromCluster(ParticipantId participantId) {
-    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
-      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
-          + _clusterId);
-      return false;
-    }
-
-    if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
-      LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
-          + _clusterId);
-      return false;
-    }
-
-    // delete participant config path
-    _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
-
-    // delete participant path
-    _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
-    return true;
+    ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+    return accessor.dropParticipant(participantId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index e139c2e..c53bcd8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -19,6 +19,8 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,17 +28,21 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.RunningInstance;
 import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
 import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.MessageId;
@@ -44,15 +50,27 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 public class ParticipantAccessor {
   private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
 
@@ -248,14 +266,131 @@ public class ParticipantAccessor {
   }
 
   /**
+   * Reset partitions assigned to a set of participants
+   * @param resetParticipantIdSet the participants to reset
+   * @return true if reset, false otherwise
+   */
+  public boolean resetParticipants(Set<ParticipantId> resetParticipantIdSet) {
+    List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+    for (ParticipantId participantId : resetParticipantIdSet) {
+      for (ExternalView extView : extViews) {
+        Set<PartitionId> resetPartitionIdSet = Sets.newHashSet();
+        for (PartitionId partitionId : extView.getPartitionSet()) {
+          Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+          if (stateMap.containsKey(participantId)
+              && stateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR))) {
+            resetPartitionIdSet.add(partitionId);
+          }
+        }
+        resetPartitionsForParticipant(participantId, extView.getResourceId(), resetPartitionIdSet);
+      }
+    }
+    return true;
+  }
+
+  /**
    * reset partitions on a participant
    * @param participantId
    * @param resourceId
    * @param resetPartitionIdSet
+   * @return true if partitions reset, false otherwise
    */
-  public void resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+  public boolean resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
       Set<PartitionId> resetPartitionIdSet) {
-    // TODO impl this
+    // make sure the participant is running
+    Participant participant = readParticipant(participantId);
+    if (!participant.isAlive()) {
+      LOG.error("Cannot reset partitions because the participant is not running");
+      return false;
+    }
+    RunningInstance runningInstance = participant.getRunningInstance();
+
+    // check that the resource exists
+    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    Resource resource = resourceAccessor.readResource(resourceId);
+    if (resource == null || resource.getRebalancerConfig() == null) {
+      LOG.error("Cannot reset partitions because the resource is not present");
+      return false;
+    }
+
+    // need the rebalancer context for the resource
+    RebalancerContext context =
+        resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+    if (context == null) {
+      LOG.error("Rebalancer context for resource does not exist");
+      return false;
+    }
+
+    // ensure that all partitions to reset exist
+    Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+    if (!partitionSet.containsAll(resetPartitionIdSet)) {
+      LOG.error("Not all of the specified partitions to reset exist for the resource");
+      return false;
+    }
+
+    // check for a valid current state that has all specified partitions in ERROR state
+    CurrentState currentState = participant.getCurrentStateMap().get(resourceId);
+    if (currentState == null) {
+      LOG.error("The participant does not have a current state for the resource");
+      return false;
+    }
+    for (PartitionId partitionId : resetPartitionIdSet) {
+      if (!currentState.getState(partitionId).equals(State.from(HelixDefinedState.ERROR))) {
+        LOG.error("Partition " + partitionId + " is not in error state, aborting reset");
+        return false;
+      }
+    }
+
+    // make sure that there are no pending transition messages
+    for (Message message : participant.getMessageMap().values()) {
+      if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
+          || !runningInstance.getSessionId().equals(message.getTgtSessionId())
+          || !resourceId.equals(message.getResourceId())
+          || !resetPartitionIdSet.contains(message.getPartitionId())) {
+        continue;
+      }
+      LOG.error("Cannot reset partitions because of the following pending message: " + message);
+      return false;
+    }
+
+    // set up the source id
+    String adminName = null;
+    try {
+      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+    } catch (UnknownHostException e) {
+      // can ignore it
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+      }
+      adminName = "UNKNOWN";
+    }
+
+    // build messages to signal the transition
+    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    StateModelDefinition stateModelDef =
+        _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+    Map<MessageId, Message> messageMap = Maps.newHashMap();
+    for (PartitionId partitionId : resetPartitionIdSet) {
+      // send ERROR to initialState message
+      MessageId msgId = MessageId.from(UUID.randomUUID().toString());
+      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+      message.setSrcName(adminName);
+      message.setTgtName(participantId.stringify());
+      message.setMsgState(MessageState.NEW);
+      message.setPartitionId(partitionId);
+      message.setResourceId(resourceId);
+      message.setTgtSessionId(runningInstance.getSessionId());
+      message.setStateModelDef(stateModelDefId);
+      message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
+      message.setToState(stateModelDef.getInitialState());
+      message.setStateModelFactoryId(context.getStateModelFactoryId());
+
+      messageMap.put(message.getMsgId(), message);
+    }
+
+    // send the messages
+    insertMessagesToParticipant(participantId, messageMap);
+    return true;
   }
 
   /**
@@ -474,4 +609,99 @@ public class ParticipantAccessor {
     _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
         sessionId.stringify(), resourceId.stringify()));
   }
+
+  /**
+   * drop a participant from cluster
+   * @param participantId
+   * @return true if participant dropped, false if there was an error
+   */
+  boolean dropParticipant(ParticipantId participantId) {
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+      return false;
+    }
+
+    if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+      LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
+      return false;
+    }
+
+    // delete participant config path
+    _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+    // delete participant path
+    _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+    return true;
+  }
+
+  /**
+   * Let a new participant take the place of an existing participant
+   * @param oldParticipantId the participant to drop
+   * @param newParticipantId the participant that takes its place
+   * @return true if swap successful, false otherwise
+   */
+  public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+    Participant oldParticipant = readParticipant(oldParticipantId);
+    if (oldParticipant == null) {
+      LOG.error("Could not swap participants because the old participant does not exist");
+      return false;
+    }
+    if (oldParticipant.isEnabled()) {
+      LOG.error("Could not swap participants because the old participant is still enabled");
+      return false;
+    }
+    if (oldParticipant.isAlive()) {
+      LOG.error("Could not swap participants because the old participant is still live");
+      return false;
+    }
+    Participant newParticipant = readParticipant(newParticipantId);
+    if (newParticipant == null) {
+      LOG.error("Could not swap participants because the new participant does not exist");
+      return false;
+    }
+    dropParticipant(oldParticipantId);
+    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+    for (String resourceName : idealStateMap.keySet()) {
+      IdealState idealState = idealStateMap.get(resourceName);
+      swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
+      PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
+      resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+      _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
+    }
+    return true;
+  }
+
+  /**
+   * Replace occurrences of participants in preference lists and maps
+   * @param idealState the current ideal state
+   * @param oldParticipantId the participant to drop
+   * @param newParticipantId the participant that replaces it
+   */
+  private void swapParticipantsInIdealState(IdealState idealState, ParticipantId oldParticipantId,
+      ParticipantId newParticipantId) {
+    for (PartitionId partitionId : idealState.getPartitionSet()) {
+      List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
+      if (oldPreferenceList != null) {
+        List<ParticipantId> newPreferenceList = Lists.newArrayList();
+        for (ParticipantId participantId : oldPreferenceList) {
+          if (participantId.equals(oldParticipantId)) {
+            newPreferenceList.add(newParticipantId);
+          } else if (!participantId.equals(newParticipantId)) {
+            newPreferenceList.add(participantId);
+          }
+        }
+        idealState.setPreferenceList(partitionId, newPreferenceList);
+      }
+      Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+      if (preferenceMap != null) {
+        if (preferenceMap.containsKey(oldParticipantId)) {
+          State state = preferenceMap.get(oldParticipantId);
+          preferenceMap.remove(oldParticipantId);
+          preferenceMap.put(newParticipantId, state);
+        }
+        idealState.setParticipantStateMap(partitionId, preferenceMap);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index e5c9443..c65cb44 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -20,11 +20,14 @@ package org.apache.helix.api.accessor;
  */
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.Scope;
@@ -43,10 +46,15 @@ import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 public class ResourceAccessor {
   private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
   private final HelixDataAccessor _accessor;
@@ -144,6 +152,19 @@ public class ResourceAccessor {
     RebalancerConfig config = new RebalancerConfig(context);
     ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
     resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+
+    // update the ideal state if applicable
+    IdealState oldIdealState =
+        _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
+    if (oldIdealState != null) {
+      IdealState idealState =
+          rebalancerConfigToIdealState(config, oldIdealState.getBucketSize(),
+              oldIdealState.getBatchMessageMode());
+      if (idealState != null) {
+        _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+      }
+    }
+
     return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
         resourceConfig);
   }
@@ -252,6 +273,76 @@ public class ResourceAccessor {
   }
 
   /**
+   * reset resources for all participants
+   * @param resetResourceIdSet the resources to reset
+   * @return true if they were reset, false otherwise
+   */
+  public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
+    ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+    List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+    for (ExternalView extView : extViews) {
+      if (!resetResourceIdSet.contains(extView.getResourceId())) {
+        continue;
+      }
+
+      Map<ParticipantId, Set<PartitionId>> resetPartitionIds = Maps.newHashMap();
+      for (PartitionId partitionId : extView.getPartitionSet()) {
+        Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+        for (ParticipantId participantId : stateMap.keySet()) {
+          State state = stateMap.get(participantId);
+          if (state.equals(State.from(HelixDefinedState.ERROR))) {
+            if (!resetPartitionIds.containsKey(participantId)) {
+              resetPartitionIds.put(participantId, new HashSet<PartitionId>());
+            }
+            resetPartitionIds.get(participantId).add(partitionId);
+          }
+        }
+      }
+      for (ParticipantId participantId : resetPartitionIds.keySet()) {
+        accessor.resetPartitionsForParticipant(participantId, extView.getResourceId(),
+            resetPartitionIds.get(participantId));
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Generate a default assignment for partitioned resources
+   * @param resourceId the resource to update
+   * @param replicaCount the new replica count (or -1 to use the existing one)
+   * @param participantGroupTag the new participant group tag (or null to use the existing one)
+   * @return true if assignment successful, false otherwise
+   */
+  public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+      String participantGroupTag) {
+    Resource resource = readResource(resourceId);
+    RebalancerConfig config = resource.getRebalancerConfig();
+    PartitionedRebalancerContext context =
+        config.getRebalancerContext(PartitionedRebalancerContext.class);
+    if (context == null) {
+      LOG.error("Only partitioned resource types are supported");
+      return false;
+    }
+    if (replicaCount != -1) {
+      context.setReplicaCount(replicaCount);
+    }
+    if (participantGroupTag != null) {
+      context.setParticipantGroupTag(participantGroupTag);
+    }
+    StateModelDefinition stateModelDef =
+        _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+    List<InstanceConfig> participantConfigs =
+        _accessor.getChildValues(_keyBuilder.instanceConfigs());
+    Set<ParticipantId> participantSet = Sets.newHashSet();
+    for (InstanceConfig participantConfig : participantConfigs) {
+      participantSet.add(participantConfig.getParticipantId());
+    }
+    context.generateDefaultConfiguration(stateModelDef, participantSet);
+    setRebalancerContext(resourceId, context);
+    return true;
+  }
+
+  /**
    * Get an ideal state from a rebalancer config if the resource is partitioned
    * @param config RebalancerConfig instance
    * @param bucketSize bucket size to use

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 1cc09e3..ed9750a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,6 +5,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ClusterId;
@@ -12,12 +14,14 @@ import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -57,6 +61,8 @@ public class ClusterConfig {
   private final Map<ParticipantId, ParticipantConfig> _participantMap;
   private final Map<ConstraintType, ClusterConstraints> _constraintMap;
   private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
+  private final PersistentStats _stats;
+  private final Alerts _alerts;
   private final UserConfig _userConfig;
   private final boolean _isPaused;
   private final boolean _autoJoin;
@@ -68,6 +74,8 @@ public class ClusterConfig {
    * @param participantMap map of participant id to participant config
    * @param constraintMap map of constraint type to all constraints of that type
    * @param stateModelMap map of state model id to state model definition
+   * @param stats statistics to watch on the cluster
+   * @param alerts alerts that the cluster can trigger
    * @param userConfig user-defined cluster properties
    * @param isPaused true if paused, false if active
    * @param allowAutoJoin true if participants can join automatically, false otherwise
@@ -75,13 +83,15 @@ public class ClusterConfig {
   private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
       Map<ParticipantId, ParticipantConfig> participantMap,
       Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
-      boolean isPaused, boolean allowAutoJoin) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
+      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
     _id = id;
     _resourceMap = ImmutableMap.copyOf(resourceMap);
     _participantMap = ImmutableMap.copyOf(participantMap);
     _constraintMap = ImmutableMap.copyOf(constraintMap);
     _stateModelMap = ImmutableMap.copyOf(stateModelMap);
+    _stats = stats;
+    _alerts = alerts;
     _userConfig = userConfig;
     _isPaused = isPaused;
     _autoJoin = allowAutoJoin;
@@ -227,6 +237,22 @@ public class ClusterConfig {
   }
 
   /**
+   * Get all the statistics persisted on the cluster
+   * @return PersistentStats instance
+   */
+  public PersistentStats getStats() {
+    return _stats;
+  }
+
+  /**
+   * Get all the alerts persisted on the cluster
+   * @return Alerts instance
+   */
+  public Alerts getAlerts() {
+    return _alerts;
+  }
+
+  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */
@@ -261,6 +287,8 @@ public class ClusterConfig {
 
     private Set<Fields> _updateFields;
     private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
+    private PersistentStats _removedStats;
+    private Alerts _removedAlerts;
     private Builder _builder;
 
     /**
@@ -274,6 +302,8 @@ public class ClusterConfig {
         Set<ConstraintId> constraints = Sets.newHashSet();
         _removedConstraints.put(type, constraints);
       }
+      _removedStats = new PersistentStats(PersistentStats.nodeName);
+      _removedAlerts = new Alerts(Alerts.nodeName);
       _builder = new Builder(clusterId);
     }
 
@@ -401,6 +431,57 @@ public class ClusterConfig {
     }
 
     /**
+     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
+     * @param stat string specifying the stat specification
+     * @return Delta
+     */
+    public Delta addStat(String stat) {
+      _builder.addStat(stat);
+      return this;
+    }
+
+    /**
+     * Add an alert specification for the cluster. Existing specifications will not be overwritten
+     * @param alert string specifying the alert specification
+     * @return Delta
+     */
+    public Delta addAlert(String alert) {
+      _builder.addAlert(alert);
+      return this;
+    }
+
+    /**
+     * Remove a statistic specification from the cluster
+     * @param stat statistic specification
+     * @return Delta
+     */
+    public Delta removeStat(String stat) {
+      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
+      Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
+      for (String statName : parsedStat.keySet()) {
+        currentStats.put(statName, parsedStat.get(statName));
+      }
+      return this;
+    }
+
+    /**
+     * Remove an alert specification for the cluster
+     * @param alert alert specification
+     * @return Delta
+     */
+    public Delta removeAlert(String alert) {
+      Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
+      if (!currAlertMap.containsKey(alert)) {
+        Map<String, String> parsedAlert = Maps.newHashMap();
+        StringBuilder statsName = new StringBuilder();
+        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
+        removeStat(statsName.toString());
+        currAlertMap.put(alert, parsedAlert);
+      }
+      return this;
+    }
+
+    /**
      * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
      * @param orig the original ClusterConfig
      * @return updated ClusterConfig
@@ -413,7 +494,8 @@ public class ClusterConfig {
               .addParticipants(orig.getParticipantMap().values())
               .addStateModelDefinitions(orig.getStateModelMap().values())
               .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
-              .autoJoin(orig.autoJoinAllowed());
+              .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
+              .addAlerts(orig.getAlerts());
       for (Fields field : _updateFields) {
         switch (field) {
         case USER_CONFIG:
@@ -446,7 +528,30 @@ public class ClusterConfig {
         }
         builder.addConstraint(constraints);
       }
-      return builder.build();
+
+      // add stats and alerts
+      builder.addStats(deltaConfig.getStats());
+      builder.addAlerts(deltaConfig.getAlerts());
+
+      // get the result
+      ClusterConfig result = builder.build();
+
+      // remove stats
+      PersistentStats stats = result.getStats();
+      for (String removedStat : _removedStats.getMapFields().keySet()) {
+        if (stats.getMapFields().containsKey(removedStat)) {
+          stats.getMapFields().remove(removedStat);
+        }
+      }
+
+      // remove alerts
+      Alerts alerts = result.getAlerts();
+      for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
+        if (alerts.getMapFields().containsKey(removedAlert)) {
+          alerts.getMapFields().remove(removedAlert);
+        }
+      }
+      return result;
     }
   }
 
@@ -460,6 +565,8 @@ public class ClusterConfig {
     private final Map<ConstraintType, ClusterConstraints> _constraintMap;
     private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
     private UserConfig _userConfig;
+    private PersistentStats _stats;
+    private Alerts _alerts;
     private boolean _isPaused;
     private boolean _autoJoin;
 
@@ -476,6 +583,8 @@ public class ClusterConfig {
       _isPaused = false;
       _autoJoin = false;
       _userConfig = new UserConfig(Scope.cluster(id));
+      _stats = new PersistentStats(PersistentStats.nodeName);
+      _alerts = new Alerts(Alerts.nodeName);
     }
 
     /**
@@ -680,6 +789,74 @@ public class ClusterConfig {
     }
 
     /**
+     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
+     * @param stat String specifying the stat specification
+     * @return Builder
+     */
+    public Builder addStat(String stat) {
+      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
+      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
+      for (String statName : parsedStat.keySet()) {
+        if (!currentStats.containsKey(statName)) {
+          currentStats.put(statName, parsedStat.get(statName));
+        }
+      }
+      return this;
+    }
+
+    /**
+     * Add statistic specifications to the cluster. Existing specifications will not be overwritten
+     * @param stats PersistentStats specifying the stat specification
+     * @return Builder
+     */
+    public Builder addStats(PersistentStats stats) {
+      if (stats == null) {
+        return this;
+      }
+      Map<String, Map<String, String>> parsedStat = stats.getMapFields();
+      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
+      for (String statName : parsedStat.keySet()) {
+        if (!currentStats.containsKey(statName)) {
+          currentStats.put(statName, parsedStat.get(statName));
+        }
+      }
+      return this;
+    }
+
+    /**
+     * Add alert specifications to the cluster. Existing specifications will not be overwritten
+     * @param alert string representing alert specifications
+     * @return Builder
+     */
+    public Builder addAlert(String alert) {
+      Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
+      if (!currAlertMap.containsKey(alert)) {
+        Map<String, String> parsedAlert = Maps.newHashMap();
+        StringBuilder statsName = new StringBuilder();
+        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
+        addStat(statsName.toString());
+        currAlertMap.put(alert, parsedAlert);
+      }
+      return this;
+    }
+
+    /**
+     * Add alert specifications to the cluster. Existing specifications will not be overwritten
+     * @param alerts Alerts instance
+     * @return Builder
+     */
+    public Builder addAlerts(Alerts alerts) {
+      if (alerts == null) {
+        return this;
+      }
+      Map<String, Map<String, String>> alertMap = alerts.getMapFields();
+      for (String alert : alertMap.keySet()) {
+        addAlert(alert);
+      }
+      return this;
+    }
+
+    /**
      * Set the paused status of the cluster
      * @param isPaused true if paused, false otherwise
      * @return Builder
@@ -715,7 +892,7 @@ public class ClusterConfig {
      */
     public ClusterConfig build() {
       return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
-          _userConfig, _isPaused, _autoJoin);
+          _stats, _alerts, _userConfig, _isPaused, _autoJoin);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
index fb0c512..d245fae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
@@ -7,7 +7,6 @@ import java.util.Set;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
@@ -46,8 +45,8 @@ public class CustomRebalancer implements Rebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
-      ResourceCurrentState currentState) {
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
     CustomRebalancerContext config =
         rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
     StateModelDefinition stateModelDef =
@@ -63,8 +62,9 @@ public class CustomRebalancer implements Rebalancer {
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partition);
       Map<ParticipantId, State> bestStateForPartition =
-          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
-              config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
+              stateModelDef, config.getPreferenceMap(partition), currentStateMap,
+              disabledInstancesForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -74,14 +74,14 @@ public class CustomRebalancer implements Rebalancer {
    * compute best state for resource in CUSTOMIZED rebalancer mode
    * @param liveParticipantMap
    * @param stateModelDef
-   * @param idealStateMap
+   * @param preferenceMap
    * @param currentStateMap
    * @param disabledParticipantsForPartition
    * @return
    */
   private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
-      Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
-      Map<ParticipantId, State> idealStateMap, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
       Set<ParticipantId> disabledParticipantsForPartition) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
@@ -89,7 +89,7 @@ public class CustomRebalancer implements Rebalancer {
     // we should drop all resources.
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((idealStateMap == null || !idealStateMap.containsKey(participantId))
+        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
             && !disabledParticipantsForPartition.contains(participantId)) {
           // if dropped and not disabled, transit to DROPPED
           participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
@@ -103,18 +103,18 @@ public class CustomRebalancer implements Rebalancer {
     }
 
     // ideal state is deleted
-    if (idealStateMap == null) {
+    if (preferenceMap == null) {
       return participantStateMap;
     }
 
-    for (ParticipantId participantId : idealStateMap.keySet()) {
+    for (ParticipantId participantId : preferenceMap.keySet()) {
       boolean notInErrorState =
           currentStateMap == null || currentStateMap.get(participantId) == null
               || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
 
-      if (liveParticipantMap.containsKey(participantId) && notInErrorState
+      if (liveParticipantSet.contains(participantId) && notInErrorState
           && !disabledParticipantsForPartition.contains(participantId)) {
-        participantStateMap.put(participantId, idealStateMap.get(participantId));
+        participantStateMap.put(participantId, preferenceMap.get(participantId));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
index 3ccce3d..904907e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -1,14 +1,25 @@
 package org.apache.helix.controller.rebalancer.context;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.annotate.JsonIgnore;
-import org.testng.collections.Maps;
+
+import com.google.common.collect.Maps;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -72,6 +83,42 @@ public class CustomRebalancerContext extends PartitionedRebalancerContext {
   }
 
   /**
+   * Generate preference maps based on a default cluster setup
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+    // determine the preference maps
+    LinkedHashMap<State, Integer> stateCounts =
+        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, Map<String, String>> rawPreferenceMaps =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getMapFields();
+    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+    setPreferenceMaps(preferenceMaps);
+  }
+
+  /**
    * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
    */
   public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
index 189df64..521af5c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
@@ -26,7 +26,6 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.collect.Lists;
 
 /*
@@ -60,15 +59,14 @@ public class FullAutoRebalancer implements Rebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
-      ResourceCurrentState currentState) {
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
     FullAutoRebalancerContext config =
         rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     // Compute a preference list based on the current ideal state
     List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
-    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
     Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
     int replicas = -1;
@@ -79,31 +77,29 @@ public class FullAutoRebalancer implements Rebalancer {
     }
 
     // count how many replicas should be in each state
+    Map<State, String> upperBounds =
+        NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+            cluster.getConfig());
     LinkedHashMap<State, Integer> stateCountMap =
-        NewConstraintBasedAssignment.stateCount(cluster.getConfig(), config.getResourceId(),
-            stateModelDef, liveParticipants.size(), replicas);
-    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
-    for (State state : stateCountMap.keySet()) {
-      rawStateCountMap.put(state.toString(), stateCountMap.get(state));
-    }
+        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
+            liveParticipants.size(), replicas);
 
     // get the participant lists
     List<ParticipantId> liveParticipantList =
         new ArrayList<ParticipantId>(liveParticipants.keySet());
     List<ParticipantId> allParticipantList =
         new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
-    List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
 
     // compute the current mapping from the current state
     Map<PartitionId, Map<ParticipantId, State>> currentMapping =
         currentMapping(config, currentState, stateCountMap);
 
     // If there are nodes tagged with resource, use only those nodes
-    Set<String> taggedNodes = new HashSet<String>();
+    Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
     if (config.getParticipantGroupTag() != null) {
       for (ParticipantId participantId : liveParticipantList) {
         if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
-          taggedNodes.add(participantId.stringify());
+          taggedNodes.add(participantId);
         }
       }
     }
@@ -112,26 +108,25 @@ public class FullAutoRebalancer implements Rebalancer {
         LOG.info("found the following instances with tag " + config.getResourceId() + " "
             + taggedNodes);
       }
-      liveNodes = new ArrayList<String>(taggedNodes);
+      liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
     }
 
     // determine which nodes the replicas should live on
-    List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
     int maxPartition = config.getMaxPartitionsPerParticipant();
     if (LOG.isInfoEnabled()) {
       LOG.info("currentMapping: " + currentMapping);
       LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveNodes);
-      LOG.info("allNodes: " + allNodes);
+      LOG.info("liveNodes: " + liveParticipantList);
+      LOG.info("allNodes: " + allParticipantList);
       LOG.info("maxPartition: " + maxPartition);
     }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     _algorithm =
-        new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
-            rawStateCountMap, maxPartition, placementScheme);
+        new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+            placementScheme);
     ZNRecord newMapping =
-        _algorithm.computePartitionAssignment(liveNodes,
-            ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
+        _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+            allParticipantList);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("newMapping: " + newMapping);
@@ -159,8 +154,8 @@ public class FullAutoRebalancer implements Rebalancer {
       preferenceList =
           NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
       Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
-              config.getResourceId(), liveParticipants, stateModelDef, preferenceList,
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+              liveParticipants.keySet(), stateModelDef, preferenceList,
               currentState.getCurrentStateMap(config.getResourceId(), partition),
               disabledParticipantsForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
index d202e82..decc78d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -7,10 +7,12 @@ import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
@@ -151,6 +153,17 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
   }
 
   /**
+   * Generate a default configuration given the state model and a participant.
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // the base context does not understand enough to know do to anything
+  }
+
+  /**
    * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
    * @param idealState populated IdealState
    * @return PartitionedRebalancerContext

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index c64941c..924b8a1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -165,4 +165,13 @@ public final class RebalancerConfig {
   public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
     return new RebalancerConfig(resourceConfiguration);
   }
+
+  /**
+   * Get a RebalancerConfig from a RebalancerContext
+   * @param context instantiated RebalancerContext
+   * @return RebalancerConfig
+   */
+  public static RebalancerConfig from(RebalancerContext context) {
+    return new RebalancerConfig(context);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
index c112fcf..3f0dd13 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
@@ -47,8 +47,8 @@ public class SemiAutoRebalancer implements Rebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
-      ResourceCurrentState currentState) {
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
     SemiAutoRebalancerContext config =
         rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
     StateModelDefinition stateModelDef =
@@ -66,10 +66,13 @@ public class SemiAutoRebalancer implements Rebalancer {
       List<ParticipantId> preferenceList =
           NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
               config.getPreferenceList(partition));
+      Map<State, String> upperBounds =
+          NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+              cluster.getConfig());
       Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
-              config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+              .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
+              disabledInstancesForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
index d6d163c..71b5076 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -1,12 +1,23 @@
 package org.apache.helix.controller.rebalancer.context;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
 import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonProperty;
 
@@ -75,6 +86,54 @@ public final class SemiAutoRebalancerContext extends PartitionedRebalancerContex
   }
 
   /**
+   * Generate preference lists based on a default cluster setup
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    for (PartitionId partitionId : getPartitionSet()) {
+      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+      if (preferenceList != null && !preferenceList.isEmpty()) {
+        Set<ParticipantId> disabledParticipants = Collections.emptySet();
+        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+        Map<ParticipantId, State> initialMap =
+            NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+                participantSet, stateModelDef, preferenceList, emptyCurrentState,
+                disabledParticipants);
+        currentMapping.put(partitionId, initialMap);
+      }
+    }
+
+    // determine the preference
+    LinkedHashMap<State, Integer> stateCounts =
+        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, List<String>> rawPreferenceLists =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getListFields();
+    Map<PartitionId, List<ParticipantId>> preferenceLists =
+        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+    setPreferenceLists(preferenceLists);
+  }
+
+  /**
    * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
    */
   public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 7bc2769..d5531b1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -90,8 +91,28 @@ public class NewConstraintBasedAssignment {
   }
 
   /**
+   * Get a map of state to upper bound constraint given a cluster
+   * @param stateModelDef the state model definition to check
+   * @param resourceId the resource that is constraint
+   * @param cluster the cluster the resource belongs to
+   * @return map of state to upper bound
+   */
+  public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
+      ResourceId resourceId, ClusterConfig cluster) {
+    Map<State, String> stateMap = Maps.newHashMap();
+    for (State state : stateModelDef.getStatesPriorityList()) {
+      String num =
+          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+              stateModelDef.getStateModelDefId(), state);
+      stateMap.put(state, num);
+    }
+    return stateMap;
+  }
+
+  /**
    * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
-   * @param liveParticipantMap map of id to live participants
+   * @param upperBounds map of state to upper bound
+   * @param liveParticipantSet set of live participant ids
    * @param stateModelDef
    * @param participantPreferenceList
    * @param currentStateMap
@@ -99,8 +120,8 @@ public class NewConstraintBasedAssignment {
    * @param disabledParticipantsForPartition
    * @return
    */
-  public static Map<ParticipantId, State> computeAutoBestStateForPartition(ClusterConfig cluster,
-      ResourceId resourceId, Map<ParticipantId, Participant> liveParticipantMap,
+  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
       StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
       Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
@@ -131,12 +152,10 @@ public class NewConstraintBasedAssignment {
     boolean assigned[] = new boolean[participantPreferenceList.size()];
 
     for (State state : statesPriorityList) {
-      String num =
-          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
-              stateModelDef.getStateModelDefId(), state);
+      String num = upperBounds.get(state);
       int stateCount = -1;
       if ("N".equals(num)) {
-        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantMap.keySet());
+        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
         liveAndEnabled.removeAll(disabledParticipantsForPartition);
         stateCount = liveAndEnabled.size();
       } else if ("R".equals(num)) {
@@ -159,7 +178,7 @@ public class NewConstraintBasedAssignment {
                   || !currentStateMap.get(participantId)
                       .equals(State.from(HelixDefinedState.ERROR));
 
-          if (liveParticipantMap.containsKey(participantId) && !assigned[i] && notInErrorState
+          if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
               && !disabledParticipantsForPartition.contains(participantId)) {
             participantStateMap.put(participantId, state);
             count = count + 1;
@@ -176,23 +195,20 @@ public class NewConstraintBasedAssignment {
 
   /**
    * Get the number of replicas that should be in each state for a partition
-   * @param cluster cluster configuration
-   * @param resourceId the resource for which to get the state count
+   * @param upperBounds map of state to upper bound
    * @param stateModelDef StateModelDefinition object
    * @param liveNodesNb number of live nodes
    * @param total number of replicas
    * @return state count map: state->count
    */
-  public static LinkedHashMap<State, Integer> stateCount(ClusterConfig cluster,
-      ResourceId resourceId, StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
+  public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
+      StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
     LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
     List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
 
     int replicas = totalReplicas;
     for (State state : statesPriorityList) {
-      String num =
-          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
-              stateModelDef.getStateModelDefId(), state);
+      String num = upperBounds.get(state);
       if ("N".equals(num)) {
         stateCountMap.put(state, liveNodesNb);
       } else if ("R".equals(num)) {
@@ -216,9 +232,7 @@ public class NewConstraintBasedAssignment {
 
     // get state count for R
     for (State state : statesPriorityList) {
-      String num =
-          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
-              stateModelDef.getStateModelDefId(), state);
+      String num = upperBounds.get(state);
       if ("R".equals(num)) {
         stateCountMap.put(state, replicas);
         // should have at most one state using R

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 11d7969..8b56bec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
@@ -94,10 +95,12 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
       Set<ParticipantId> disabledParticipantsForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partitionId);
+      Map<State, String> upperBounds =
+          NewConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
+              cluster.getConfig());
       partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
-          .computeAutoBestStateForPartition(cluster.getConfig(), resourceId,
-              cluster.getLiveParticipantMap(), stateModelDef, null,
-              currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+          .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+              stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
               disabledParticipantsForPartition));
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e23a3088/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 85741ed..d3f89ef 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -34,8 +34,16 @@ import java.util.TreeSet;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
+
 public class AutoRebalanceStrategy {
 
   private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
@@ -79,7 +87,7 @@ public class AutoRebalanceStrategy {
   }
 
   /**
-   * Initialize the strategy with a default placement scheme and no
+   * Initialize the strategy with a default placement scheme
    * @see #AutoRebalanceStrategy(String, List, LinkedHashMap, int, ReplicaPlacementScheme)
    */
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
@@ -88,6 +96,52 @@ public class AutoRebalanceStrategy {
   }
 
   /**
+   * Constructor to support logically-typed Helix components
+   * @param resourceId the resource for which to compute an assignment
+   * @param partitions the partitions of the resource
+   * @param states the states and counts for each state
+   * @param maximumPerNode the maximum number of replicas per node
+   * @param placementScheme the scheme to use for preferred replica locations. If null, this is
+   *          {@link DefaultPlacementScheme}
+   */
+  public AutoRebalanceStrategy(ResourceId resourceId, final List<PartitionId> partitions,
+      final LinkedHashMap<State, Integer> states, int maximumPerNode,
+      ReplicaPlacementScheme placementScheme) {
+    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
+    for (State state : states.keySet()) {
+      rawStateCountMap.put(state.toString(), states.get(state));
+    }
+    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
+    _resourceName = resourceId.stringify();
+    _partitions = partitionNames;
+    _states = rawStateCountMap;
+    _maximumPerNode = maximumPerNode;
+    if (placementScheme != null) {
+      _placementScheme = placementScheme;
+    } else {
+      _placementScheme = new DefaultPlacementScheme();
+    }
+  }
+
+  /**
+   * Wrap {@link #computePartitionAssignment(List, Map, List)} with a function that takes concrete
+   * types
+   * @param liveNodes list of live participant ids
+   * @param currentMapping map of partition id to map of participant id to state
+   * @param allNodes list of all participant ids
+   * @return the preference list and replica mapping
+   */
+  public ZNRecord typedComputePartitionAssignment(final List<ParticipantId> liveNodes,
+      final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
+      final List<ParticipantId> allNodes) {
+    final List<String> rawLiveNodes = Lists.transform(liveNodes, Functions.toStringFunction());
+    final List<String> rawAllNodes = Lists.transform(allNodes, Functions.toStringFunction());
+    final Map<String, Map<String, String>> rawCurrentMapping =
+        ResourceAssignment.stringMapsFromReplicaMaps(currentMapping);
+    return computePartitionAssignment(rawLiveNodes, rawCurrentMapping, rawAllNodes);
+  }
+
+  /**
    * Determine a preference list and mapping of partitions to nodes for all replicas
    * @param liveNodes the current list of live participants
    * @param currentMapping the current assignment of replicas to nodes


Mime
View raw message