helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-109] Review Helix model package, more changes
Date Sat, 31 Aug 2013 21:13:48 GMT
Updated Branches:
  refs/heads/helix-logical-model 02559174f -> c3c13a62e


[HELIX-109] Review Helix model package, more changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/c3c13a62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/c3c13a62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/c3c13a62

Branch: refs/heads/helix-logical-model
Commit: c3c13a62e86730860a1502587c9450e28eae4830
Parents: 0255917
Author: zzhang <zzhang5@uci.edu>
Authored: Sat Aug 31 14:13:39 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Sat Aug 31 14:13:39 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  30 +-
 .../java/org/apache/helix/api/Participant.java  |  13 +-
 .../java/org/apache/helix/api/Partition.java    |   5 +
 .../org/apache/helix/api/RebalancerConfig.java  |  46 +-
 .../java/org/apache/helix/api/Resource.java     |  33 +-
 .../main/java/org/apache/helix/api/State.java   |  14 +
 .../rebalancer/NewAutoRebalancer.java           | 158 +++++
 .../rebalancer/NewCustomRebalancer.java         | 125 ++++
 .../controller/rebalancer/NewRebalancer.java    |  43 ++
 .../rebalancer/NewSemiAutoRebalancer.java       |  74 +++
 .../util/NewConstraintBasedAssignment.java      | 219 +++++++
 .../org/apache/helix/model/CurrentState.java    |   7 +
 .../java/org/apache/helix/model/IdealState.java |  29 +-
 .../apache/helix/model/ResourceAssignment.java  |  42 ++
 .../helix/model/StateModelDefinition.java       |   9 +
 .../builder/ResourceAssignmentBuilder.java      |  94 +++
 .../strategy/TestNewAutoRebalanceStrategy.java  | 597 +++++++++++++++++++
 17 files changed, 1507 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 07deca6..193b238 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -20,15 +20,8 @@ package org.apache.helix.api;
  */
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Message;
-
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -48,6 +41,11 @@ public class Cluster {
   private final Map<ParticipantId, Participant> _participantMap;
 
   /**
+   * map of participant-id to live participant
+   */
+  private final Map<ParticipantId, Participant> _liveParticipantMap;
+
+  /**
    * map of controller-id to controller
    */
   private final Map<ControllerId, Controller> _controllerMap;
@@ -79,6 +77,16 @@ public class Cluster {
 
     _participantMap = ImmutableMap.copyOf(participantMap);
 
+    // Build the subset of participants that is live
+    ImmutableMap.Builder<ParticipantId, Participant> liveParticipantBuilder =
+        new ImmutableMap.Builder<ParticipantId, Participant>();
+    for (Participant participant : participantMap.values()) {
+      if (participant.isAlive()) {
+        liveParticipantBuilder.put(participant.getId(), participant);
+      }
+    }
+    _liveParticipantMap = liveParticipantBuilder.build();
+
     _leaderId = leaderId;
 
     // TODO impl this when we persist controllers and spectators on zookeeper
@@ -120,6 +128,14 @@ public class Cluster {
   }
 
   /**
+   * Get live participants of the cluster
+   * @return a map of participant id to participant, or empty map if none is live
+   */
+  public Map<ParticipantId, Participant> getLiveParticipantMap() {
+    return _liveParticipantMap;
+  }
+
+  /**
    * Get controllers of the cluster
    * @return a map of controller id to controller, or empty map if none
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index f7a9ed0..10ceebd 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -19,16 +19,12 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 
 import com.google.common.collect.ImmutableMap;
@@ -136,6 +132,15 @@ public class Participant {
   }
 
   /**
+   * Check if participant has a tag
+   * @param tag tag to check
+   * @return true if tagged, false otherwise
+   */
+  public boolean hasTag(String tag) {
+    return _tags.contains(tag);
+  }
+
+  /**
    * Get message map
    * @return message map
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/api/Partition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Partition.java b/helix-core/src/main/java/org/apache/helix/api/Partition.java
index c903493..6586105 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Partition.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Partition.java
@@ -41,4 +41,9 @@ public class Partition {
   public PartitionId getId() {
     return _id;
   }
+
+  @Override
+  public String toString() {
+    return _id.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 34498cd..e2e33eb 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,6 +19,10 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 
@@ -26,8 +30,11 @@ public class RebalancerConfig {
   private final RebalanceMode _rebalancerMode;
   private final RebalancerRef _rebalancerRef;
   private final StateModelDefId _stateModelDefId;
-
+  private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
   private final ResourceAssignment _resourceAssignment;
+  private final int _replicaCount;
+  private final String _participantGroupTag;
+  private final int _maxPartitionsPerParticipant;
 
   public RebalancerConfig(RebalanceMode mode, RebalancerRef rebalancerRef,
       StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment) {
@@ -35,6 +42,10 @@ public class RebalancerConfig {
     _rebalancerRef = rebalancerRef;
     _stateModelDefId = stateModelDefId;
     _resourceAssignment = resourceAssignment;
+    _preferenceLists = Collections.emptyMap(); // TODO: stub
+    _replicaCount = 0; // TODO: stub
+    _participantGroupTag = null; // TODO: stub
+    _maxPartitionsPerParticipant = Integer.MAX_VALUE; // TODO: stub
   }
 
   /**
@@ -70,6 +81,39 @@ public class RebalancerConfig {
   }
 
   /**
+   * Get the preference list of participants for a given partition
+   * @param partitionId the partition to look up
+   * @return the ordered preference list (early entries are more preferred)
+   */
+  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+    return _preferenceLists.get(partitionId);
+  }
+
+  /**
+   * Get the number of replicas each partition should have
+   * @return replica count
+   */
+  public int getReplicaCount() {
+    return _replicaCount;
+  }
+
+  /**
+   * Get the number of partitions of this resource that a given participant can accept
+   * @return maximum number of partitions
+   */
+  public int getMaxPartitionsPerParticipant() {
+    return _maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Get the tag, if any, which must be present on assignable instances
+   * @return group tag
+   */
+  public String getParticipantGroupTag() {
+    return _participantGroupTag;
+  }
+
+  /**
    * Assembles a RebalancerConfig
    */
   public static class Builder {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 2c9ba8a..a916025 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -38,6 +38,7 @@ public class Resource {
   private final Set<Partition> _partitionSet;
 
   private final ExternalView _externalView;
+  private final ExternalView _pendingExternalView;
 
   // TODO move construct logic to ResourceAccessor
   /**
@@ -53,9 +54,8 @@ public class Resource {
     _rebalancerConfig = null;
 
     Set<Partition> partitionSet = new HashSet<Partition>();
-    for (String partitionId : idealState.getPartitionStringSet()) {
-      partitionSet
-          .add(new Partition(new PartitionId(id, PartitionId.stripResourceId(partitionId))));
+    for (PartitionId partitionId : idealState.getPartitionSet()) {
+      partitionSet.add(new Partition(partitionId));
     }
     _partitionSet = ImmutableSet.copyOf(partitionSet);
 
@@ -63,6 +63,7 @@ public class Resource {
     // _resourceAssignment = null;
 
     _externalView = null;
+    _pendingExternalView = null; // TODO: stub
   }
 
   /**
@@ -70,13 +71,15 @@ public class Resource {
    * @param id resource identifier
    * @param partitionSet disjoint partitions of the resource
    * @param externalView external view of the resource
+   * @param pendingExternalView pending external view based on unprocessed messages
    * @param rebalancerConfig configuration properties for rebalancing this resource
    */
   public Resource(ResourceId id, Set<Partition> partitionSet, ExternalView externalView,
-      RebalancerConfig rebalancerConfig) {
+      ExternalView pendingExternalView, RebalancerConfig rebalancerConfig) {
     _id = id;
     _partitionSet = ImmutableSet.copyOf(partitionSet);
     _externalView = externalView;
+    _pendingExternalView = pendingExternalView;
     _rebalancerConfig = rebalancerConfig;
   }
 
@@ -96,6 +99,14 @@ public class Resource {
     return _externalView;
   }
 
+  /**
+   * Get the pending external view of the resource based on unprocessed messages
+   * @return the pending external view of the resource
+   */
+  public ExternalView getPendingExternalView() {
+    return _pendingExternalView;
+  }
+
   public RebalancerConfig getRebalancerConfig() {
     return _rebalancerConfig;
   }
@@ -111,6 +122,7 @@ public class Resource {
     private final ResourceId _id;
     private final Set<Partition> _partitionSet;
     private ExternalView _externalView;
+    private ExternalView _pendingExternalView;
     private RebalancerConfig _rebalancerConfig;
 
     /**
@@ -143,6 +155,16 @@ public class Resource {
     }
 
     /**
+     * Set the pending external view of this resource
+     * @param extView replica placements as a result of pending messages
+     * @return Builder
+     */
+    public Builder pendingExternalView(ExternalView pendingExtView) {
+      _pendingExternalView = pendingExtView;
+      return this;
+    }
+
+    /**
      * Set the rebalancer configuration
      * @param rebalancerConfig properties of interest for rebalancing
      * @return Builder
@@ -157,7 +179,8 @@ public class Resource {
      * @return instantiated Resource
      */
     public Resource build() {
-      return new Resource(_id, _partitionSet, _externalView, _rebalancerConfig);
+      return new Resource(_id, _partitionSet, _externalView, _pendingExternalView,
+          _rebalancerConfig);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index 2a1f993..b8c38ea 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -1,5 +1,7 @@
 package org.apache.helix.api;
 
+import org.apache.helix.HelixDefinedState;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -60,4 +62,16 @@ public class State {
     }
     return new State(state);
   }
+
+  /**
+   * Get a State from a HelixDefinedState
+   * @param state HelixDefinedState
+   * @return State
+   */
+  public static State from(HelixDefinedState state) {
+    if (state == null) {
+      return null;
+    }
+    return new State(state.toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
new file mode 100644
index 0000000..563b7e2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -0,0 +1,158 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
+
+/**
+ * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
+ * state of a resource, fully adapting to the addition or removal of instances. This includes
+ * computation of a new preference list and a partition to instance and state mapping based on the
+ * computed instance preferences.
+ * The input is the current assignment of partitions to instances, as well as existing instance
+ * preferences, if any.
+ * The output is a preference list and a mapping based on that preference list, i.e. partition p
+ * has a replica on node k with state s.
+ */
+public class NewAutoRebalancer implements NewRebalancer {
+  // These should be final, but are initialized in init rather than a constructor
+  private AutoRebalanceStrategy _algorithm;
+
+  private static final Logger LOG = Logger.getLogger(NewAutoRebalancer.class);
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
+      StateModelDefinition stateModelDef) {
+    // Compute a preference list based on the current ideal state
+    List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
+    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+    int replicas = config.getReplicaCount();
+
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+    stateCountMap =
+        ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
+    List<ParticipantId> liveParticipantList =
+        new ArrayList<ParticipantId>(liveParticipants.keySet());
+    List<ParticipantId> allParticipantList =
+        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+    List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = currentMapping(resource);
+
+    // If there are nodes tagged with resource, use only those nodes
+    Set<String> taggedNodes = new HashSet<String>();
+    if (config.getParticipantGroupTag() != null) {
+      for (ParticipantId participantId : liveParticipantList) {
+        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
+          taggedNodes.add(participantId.stringify());
+        }
+      }
+    }
+    if (taggedNodes.size() > 0) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("found the following instances with tag " + resource.getId() + " " + taggedNodes);
+      }
+      liveNodes = new ArrayList<String>(taggedNodes);
+    }
+
+    List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
+    int maxPartition = config.getMaxPartitionsPerParticipant();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveNodes);
+      LOG.info("allNodes: " + allNodes);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    _algorithm =
+        new AutoRebalanceStrategy(resource.getId().toString(), partitionNames, stateCountMap,
+            maxPartition, placementScheme);
+    ZNRecord newMapping =
+        _algorithm.computePartitionAssignment(liveNodes,
+            ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    // compute a full partition mapping for the resource
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
+    for (Partition partition : partitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition.getId());
+      List<ParticipantId> preferenceList =
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+      Map<ParticipantId, State> bestStateForPartition =
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
+              stateModelDef, preferenceList,
+              resource.getExternalView().getStateMap(partition.getId()),
+              disabledParticipantsForPartition);
+      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(Resource resource) {
+    Map<PartitionId, Map<ParticipantId, State>> map =
+        new HashMap<PartitionId, Map<ParticipantId, State>>();
+
+    for (Partition partition : resource.getPartitionSet()) {
+      Map<ParticipantId, State> stateMap = new HashMap<ParticipantId, State>();
+      stateMap.putAll(resource.getExternalView().getStateMap(partition.getId()));
+      stateMap.putAll(resource.getPendingExternalView().getStateMap(partition.getId()));
+    }
+    return map;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
new file mode 100644
index 0000000..600d848
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -0,0 +1,125 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
+ * partitions against the set of live instances to mark assignment states as dropped or erroneous
+ * as necessary.
+ * The input is the required current assignment of partitions to instances, as well as the required
+ * existing instance preferences.
+ * The output is a verified mapping based on that preference list, i.e. partition p has a replica
+ * on node k with state s, where s may be a dropped or error state if necessary.
+ */
+public class NewCustomRebalancer implements NewRebalancer {
+
+  private static final Logger LOG = Logger.getLogger(NewCustomRebalancer.class);
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
+      StateModelDefinition stateModelDef) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    for (Partition partition : resource.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          resource.getExternalView().getStateMap(partition.getId());
+      Set<ParticipantId> disabledInstancesForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition.getId());
+      Map<ParticipantId, State> bestStateForPartition =
+          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
+              config.getResourceAssignment().getReplicaMap(partition.getId()), currentStateMap,
+              disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  /**
+   * compute best state for resource in CUSTOMIZED rebalancer mode
+   * @param liveParticipantMap map of participant id to live participant
+   * @param stateModelDef
+   * @param idealStateMap
+   * @param currentStateMap
+   * @param disabledParticipantsForPartition
+   * @return map of participant id to its computed best state
+   */
+  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+      Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
+      Map<ParticipantId, State> idealStateMap, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((idealStateMap == null || !idealStateMap.containsKey(participantId))
+            && !disabledParticipantsForPartition.contains(participantId)) {
+          // if dropped and not disabled, transit to DROPPED
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          participantStateMap.put(participantId, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (idealStateMap == null) {
+      return participantStateMap;
+    }
+
+    for (ParticipantId participantId : idealStateMap.keySet()) {
+      boolean notInErrorState =
+          currentStateMap == null || currentStateMap.get(participantId) == null
+              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
+
+      if (liveParticipantMap.containsKey(participantId) && notInErrorState
+          && !disabledParticipantsForPartition.contains(participantId)) {
+        participantStateMap.put(participantId, idealStateMap.get(participantId));
+      }
+    }
+
+    return participantStateMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
new file mode 100644
index 0000000..253723f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
@@ -0,0 +1,43 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * Allows one to come up with custom implementation of a rebalancer.<br/>
+ * This will be invoked on all changes that happen in the cluster.<br/>
+ * Simply return the new ResourceAssignment for a resource in this method.<br/>
+ */
+public interface NewRebalancer {
+
+  /**
+   * Given a resource, existing mapping, and liveness of participants, compute a new mapping of
+   * resources.
+   * @param resource the resource for which a mapping will be computed
+   * @param cluster a snapshot of the entire cluster state
+   * @param stateModelDef the state model for which to rebalance the resource
+   */
+  ResourceAssignment computeResourceMapping(final Resource resource, final Cluster cluster,
+      final StateModelDefinition stateModelDef);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
new file mode 100644
index 0000000..472e7d3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -0,0 +1,74 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
+ * state of a resource based on a predefined preference list of instances willing to accept
+ * replicas.
+ * The input is the optional current assignment of partitions to instances, as well as the required
+ * existing instance preferences.
+ * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
+ * with state s.
+ */
+public class NewSemiAutoRebalancer implements NewRebalancer {
+
+  private static final Logger LOG = Logger.getLogger(NewSemiAutoRebalancer.class);
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
+      StateModelDefinition stateModelDef) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    for (Partition partition : resource.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          resource.getExternalView().getStateMap(partition.getId());
+      Set<ParticipantId> disabledInstancesForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition.getId());
+      List<ParticipantId> preferenceList =
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+      Map<ParticipantId, State> bestStateForPartition =
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(
+              cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
+              disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
new file mode 100644
index 0000000..feb3214
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -0,0 +1,219 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.State;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+
+/**
+ * Collection of functions that will compute the best possible states given the live instances and
+ * an ideal state.
+ */
+public class NewConstraintBasedAssignment {
+  private static Logger logger = Logger.getLogger(NewConstraintBasedAssignment.class);
+
+  /**
+   * Get a set of disabled participants for a partition
+   * @param participantMap map of all participants
+   * @param partitionId the partition to check
+   * @return a set of all participants that are disabled for the partition
+   */
+  public static Set<ParticipantId> getDisabledParticipants(
+      final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+    Set<ParticipantId> disabledParticipantsForPartition =
+        Sets.filter(participantMap.keySet(), new Predicate<ParticipantId>() {
+          @Override
+          public boolean apply(ParticipantId participantId) {
+            return participantMap.get(participantId).getDisablePartitionIds().contains(partitionId);
+          }
+        });
+    return disabledParticipantsForPartition;
+  }
+
+  /**
+   * Get an ordered list of participants that can serve a partition
+   * @param cluster cluster snapshot
+   * @param partitionId the partition to look up
+   * @param config rebalancing constraints
+   * @return list with most preferred participants first
+   */
+  public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
+      RebalancerConfig config) {
+    List<ParticipantId> prefList = config.getPreferenceList(partitionId);
+
+    if (prefList != null && prefList.size() == 1
+        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
+      prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
+      Collections.sort(prefList);
+    }
+    return prefList;
+  }
+
+  /**
+   * compute best state for resource in AUTO ideal state mode
+   * @param liveParticipantMap map of id to live participants
+   * @param stateModelDef
+   * @param participantPreferenceList
+   * @param currentStateMap
+   *          : participant->state for each partition
+   * @param disabledParticipantsForPartition
+   * @return
+   */
+  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+      Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
+      List<ParticipantId> participantPreferenceList, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((participantPreferenceList == null || !participantPreferenceList
+            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
+          // if dropped and not disabled, transit to DROPPED
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          participantStateMap.put(participantId, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (participantPreferenceList == null) {
+      return participantStateMap;
+    }
+
+    List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
+    boolean assigned[] = new boolean[participantPreferenceList.size()];
+
+    for (State state : statesPriorityList) {
+      String num = stateModelDef.getNumParticipantsPerState(state);
+      int stateCount = -1;
+      if ("N".equals(num)) {
+        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantMap.keySet());
+        liveAndEnabled.removeAll(disabledParticipantsForPartition);
+        stateCount = liveAndEnabled.size();
+      } else if ("R".equals(num)) {
+        stateCount = participantPreferenceList.size();
+      } else {
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (Exception e) {
+          logger.error("Invalid count for state:" + state + " ,count=" + num);
+        }
+      }
+      if (stateCount > -1) {
+        int count = 0;
+        for (int i = 0; i < participantPreferenceList.size(); i++) {
+          ParticipantId participantId = participantPreferenceList.get(i);
+
+          boolean notInErrorState =
+              currentStateMap == null
+                  || currentStateMap.get(participantId) == null
+                  || !currentStateMap.get(participantId)
+                      .equals(State.from(HelixDefinedState.ERROR));
+
+          if (liveParticipantMap.containsKey(participantId) && !assigned[i] && notInErrorState
+              && !disabledParticipantsForPartition.contains(participantId)) {
+            participantStateMap.put(participantId, state);
+            count = count + 1;
+            assigned[i] = true;
+            if (count == stateCount) {
+              break;
+            }
+          }
+        }
+      }
+    }
+    return participantStateMap;
+  }
+
+  /**
+   * Get the number of replicas that should be in each state for a partition
+   * @param stateModelDef StateModelDefinition object
+   * @param liveNodesNb number of live nodes
+   * @param total number of replicas
+   * @return state count map: state->count
+   */
+  public static LinkedHashMap<State, Integer> stateCount(StateModelDefinition stateModelDef,
+      int liveNodesNb, int totalReplicas) {
+    LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
+    List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
+
+    int replicas = totalReplicas;
+    for (State state : statesPriorityList) {
+      String num = stateModelDef.getNumParticipantsPerState(state);
+      if ("N".equals(num)) {
+        stateCountMap.put(state, liveNodesNb);
+      } else if ("R".equals(num)) {
+        // wait until we get the counts for all other states
+        continue;
+      } else {
+        int stateCount = -1;
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (Exception e) {
+          // LOG.error("Invalid count for state: " + state + ", count: " + num +
+          // ", use -1 instead");
+        }
+
+        if (stateCount > 0) {
+          stateCountMap.put(state, stateCount);
+          replicas -= stateCount;
+        }
+      }
+    }
+
+    // get state count for R
+    for (State state : statesPriorityList) {
+      String num = stateModelDef.getNumParticipantsPerState(state);
+      if ("R".equals(num)) {
+        stateCountMap.put(state, replicas);
+        // should have at most one state using R
+        break;
+      }
+    }
+    return stateCountMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
index c4893d4..240939e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -19,6 +19,7 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
@@ -273,6 +274,9 @@ public class CurrentState extends HelixProperty {
    * @return map of partition id to state
    */
   public static Map<PartitionId, State> partitionStateMapFromStringMap(Map<String, String> rawMap) {
+    if (rawMap == null) {
+      return Collections.emptyMap();
+    }
     Map<PartitionId, State> partitionStateMap = new HashMap<PartitionId, State>();
     for (String partitionId : rawMap.keySet()) {
       partitionStateMap.put(Id.partition(partitionId), State.from(rawMap.get(partitionId)));
@@ -287,6 +291,9 @@ public class CurrentState extends HelixProperty {
    */
   public static Map<String, String> stringMapFromPartitionStateMap(
       Map<PartitionId, State> partitionStateMap) {
+    if (partitionStateMap == null) {
+      return Collections.emptyMap();
+    }
     Map<String, String> rawMap = new HashMap<String, String>();
     for (PartitionId partitionId : partitionStateMap.keySet()) {
       rawMap.put(partitionId.stringify(), partitionStateMap.get(partitionId).toString());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ded32c7..ffff483 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -671,6 +671,9 @@ public class IdealState extends HelixProperty {
    * @return converted list
    */
   public static List<ParticipantId> preferenceListFromStringList(List<String> rawPreferenceList) {
+    if (rawPreferenceList == null) {
+      return Collections.emptyList();
+    }
     return Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
       @Override
       public ParticipantId apply(String participantName) {
@@ -686,6 +689,9 @@ public class IdealState extends HelixProperty {
    */
   public static Map<PartitionId, List<ParticipantId>> preferenceListsFromStringLists(
       Map<String, List<String>> rawPreferenceLists) {
+    if (rawPreferenceLists == null) {
+      return Collections.emptyMap();
+    }
     Map<PartitionId, List<ParticipantId>> preferenceLists =
         new HashMap<PartitionId, List<ParticipantId>>();
     for (String partitionId : rawPreferenceLists.keySet()) {
@@ -701,6 +707,9 @@ public class IdealState extends HelixProperty {
    * @return converted list
    */
   public static List<String> stringListFromPreferenceList(List<ParticipantId> preferenceList) {
+    if (preferenceList == null) {
+      return Collections.emptyList();
+    }
     return Lists.transform(preferenceList, new Function<ParticipantId, String>() {
       @Override
       public String apply(ParticipantId participantId) {
@@ -716,6 +725,9 @@ public class IdealState extends HelixProperty {
    */
   public static Map<String, List<String>> stringListsFromPreferenceLists(
       Map<PartitionId, List<ParticipantId>> preferenceLists) {
+    if (preferenceLists == null) {
+      return Collections.emptyMap();
+    }
     Map<String, List<String>> rawPreferenceLists = new HashMap<String, List<String>>();
     for (PartitionId partitionId : preferenceLists.keySet()) {
       rawPreferenceLists.put(partitionId.stringify(),
@@ -736,18 +748,12 @@ public class IdealState extends HelixProperty {
 
   /**
    * Convert a full state mapping as strings into participant state maps
-   * @param rawMap the map of partition name to participant name and state
+   * @param rawMaps the map of partition name to participant name and state
    * @return converted maps
    */
   public static Map<PartitionId, Map<ParticipantId, State>> participantStateMapsFromStringMaps(
       Map<String, Map<String, String>> rawMaps) {
-    Map<PartitionId, Map<ParticipantId, State>> participantStateMaps =
-        new HashMap<PartitionId, Map<ParticipantId, State>>();
-    for (String partitionId : rawMaps.keySet()) {
-      participantStateMaps.put(Id.partition(partitionId),
-          participantStateMapFromStringMap(rawMaps.get(partitionId)));
-    }
-    return participantStateMaps;
+    return ResourceAssignment.replicaMapsFromStringMaps(rawMaps);
   }
 
   /**
@@ -767,11 +773,6 @@ public class IdealState extends HelixProperty {
    */
   public static Map<String, Map<String, String>> stringMapsFromParticipantStateMaps(
       Map<PartitionId, Map<ParticipantId, State>> participantStateMaps) {
-    Map<String, Map<String, String>> rawMaps = new HashMap<String, Map<String, String>>();
-    for (PartitionId partitionId : participantStateMaps.keySet()) {
-      rawMaps.put(partitionId.stringify(),
-          stringMapFromParticipantStateMap(participantStateMaps.get(partitionId)));
-    }
-    return rawMaps;
+    return ResourceAssignment.stringMapsFromReplicaMaps(participantStateMaps);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index bbc2baa..8577578 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -121,6 +121,9 @@ public class ResourceAssignment extends HelixProperty {
    * @return map of participant id to state
    */
   public static Map<ParticipantId, State> replicaMapFromStringMap(Map<String, String> rawMap) {
+    if (rawMap == null) {
+      return Collections.emptyMap();
+    }
     Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
     for (String participantName : rawMap.keySet()) {
       replicaMap.put(Id.participant(participantName), State.from(rawMap.get(participantName)));
@@ -129,15 +132,54 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
+   * Convert a full replica mapping as strings into participant state maps
+   * @param rawMaps the map of partition name to participant name and state
+   * @return converted maps
+   */
+  public static Map<PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
+      Map<String, Map<String, String>> rawMaps) {
+    if (rawMaps == null) {
+      return Collections.emptyMap();
+    }
+    Map<PartitionId, Map<ParticipantId, State>> participantStateMaps =
+        new HashMap<PartitionId, Map<ParticipantId, State>>();
+    for (String partitionId : rawMaps.keySet()) {
+      participantStateMaps.put(Id.partition(partitionId),
+          replicaMapFromStringMap(rawMaps.get(partitionId)));
+    }
+    return participantStateMaps;
+  }
+
+  /**
    * Helper for converting a replica map to a map of strings
    * @param replicaMap map of participant id to state
    * @return map of participant name to state name
    */
   public static Map<String, String> stringMapFromReplicaMap(Map<ParticipantId, State> replicaMap) {
+    if (replicaMap == null) {
+      return Collections.emptyMap();
+    }
     Map<String, String> rawMap = new HashMap<String, String>();
     for (ParticipantId participantId : replicaMap.keySet()) {
       rawMap.put(participantId.stringify(), replicaMap.get(participantId).toString());
     }
     return rawMap;
   }
+
+  /**
+   * Convert a full state mapping into a mapping of string names
+   * @param replicaMaps the map of partition id to participant id and state
+   * @return converted maps
+   */
+  public static Map<String, Map<String, String>> stringMapsFromReplicaMaps(
+      Map<PartitionId, Map<ParticipantId, State>> replicaMaps) {
+    if (replicaMaps == null) {
+      return Collections.emptyMap();
+    }
+    Map<String, Map<String, String>> rawMaps = new HashMap<String, Map<String, String>>();
+    for (PartitionId partitionId : replicaMaps.keySet()) {
+      rawMaps.put(partitionId.stringify(), stringMapFromReplicaMap(replicaMaps.get(partitionId)));
+    }
+    return rawMaps;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index 3ca9776..f66616c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -237,6 +237,15 @@ public class StateModelDefinition extends HelixProperty {
     return _statesCountMap.get(state);
   }
 
  /**
   * Number of participants that can be in each state
   * @param state the state
   * @return maximum instance count per state as a string; besides a plain number this may be
   *         the wildcard "N" (one replica per live participant) or "R" (one replica per
   *         preferred participant); null if the state is not in the count map
   */
  public String getNumParticipantsPerState(State state) {
    return _statesCountMap.get(state.toString());
  }
+
   @Override
   public boolean isValid() {
     if (getInitialStateString() == null) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
new file mode 100644
index 0000000..735dbae
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
@@ -0,0 +1,94 @@
+package org.apache.helix.model.builder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Build an ideal assignment of resources
+ */
+public class ResourceAssignmentBuilder {
+  private final ResourceId _resourceId;
+  private final Map<PartitionId, Map<ParticipantId, State>> _mapping;
+
+  /**
+   * Create an assignment for a given resource
+   * @param resourceId resource id
+   */
+  public ResourceAssignmentBuilder(ResourceId resourceId) {
+    _resourceId = resourceId;
+    _mapping = new HashMap<PartitionId, Map<ParticipantId, State>>();
+  }
+
+  /**
+   * Add multiple assignments of partition replicas
+   * @param partitionId the partition to assign
+   * @param replicaMap participant-state map of assignments
+   * @return ResourceAssignmentBuilder
+   */
+  public ResourceAssignmentBuilder addAssignments(PartitionId partitionId,
+      Map<ParticipantId, State> replicaMap) {
+    if (_mapping.containsKey(partitionId)) {
+      _mapping.get(partitionId).putAll(replicaMap);
+    } else {
+      _mapping.put(partitionId, replicaMap);
+    }
+    return this;
+  }
+
+  /**
+   * Add a single replica assignment
+   * @param partitonId the partition to assign
+   * @param participantId participant of assignment
+   * @param state replica state
+   * @return ResourceAssignmentBuilder
+   */
+  public ResourceAssignmentBuilder addAssignment(PartitionId partitonId,
+      ParticipantId participantId, State state) {
+    Map<ParticipantId, State> replicaMap;
+    if (!_mapping.containsKey(partitonId)) {
+      replicaMap = new HashMap<ParticipantId, State>();
+      _mapping.put(partitonId, replicaMap);
+    } else {
+      replicaMap = _mapping.get(partitonId);
+    }
+    replicaMap.put(participantId, state);
+    return this;
+  }
+
+  /**
+   * Get a complete resource assignment
+   * @return ResourceAssignment
+   */
+  public ResourceAssignment build() {
+    ResourceAssignment assignment = new ResourceAssignment(_resourceId);
+    for (PartitionId partitionId : _mapping.keySet()) {
+      assignment.addReplicaMap(partitionId, _mapping.get(partitionId));
+    }
+    return assignment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c3c13a62/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
new file mode 100644
index 0000000..d220db8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -0,0 +1,597 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+public class TestNewAutoRebalanceStrategy {
+  private static Logger logger = Logger.getLogger(TestNewAutoRebalanceStrategy.class);
+
+  /**
+   * Sanity test for a basic Master-Slave model
+   */
+  @Test
+  public void simpleMasterSlaveTest() {
+    final int NUM_ITERATIONS = 10;
+    final int NUM_PARTITIONS = 10;
+    final int NUM_LIVE_NODES = 12;
+    final int NUM_TOTAL_NODES = 20;
+    final int MAX_PER_NODE = 5;
+
+    final String[] STATE_NAMES = {
+        "MASTER", "SLAVE"
+    };
+    final int[] STATE_COUNTS = {
+        1, 2
+    };
+
+    runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
+        MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
+  }
+
+  /**
+   * Run a test for an arbitrary state model.
+   * @param name Name of the test state model
+   * @param numIterations Number of rebalance tasks to run
+   * @param numPartitions Number of partitions for the resource
+   * @param numLiveNodes Number of live nodes in the cluster
+   * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
+   *          numLiveNodes
+   * @param maxPerNode Maximum number of replicas a node can serve
+   * @param stateNames States ordered by preference
+   * @param stateCounts Number of replicas that should be in each state
+   */
+  private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
+      int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
+    List<String> partitions = new ArrayList<String>();
+    for (int i = 0; i < numPartitions; i++) {
+      partitions.add("p_" + i);
+    }
+
+    List<String> liveNodes = new ArrayList<String>();
+    List<String> allNodes = new ArrayList<String>();
+    for (int i = 0; i < numTotalNodes; i++) {
+      allNodes.add("n_" + i);
+      if (i < numLiveNodes) {
+        liveNodes.add("n_" + i);
+      }
+    }
+
+    Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
+
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
+      states.put(stateNames[i], stateCounts[i]);
+    }
+
+    StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
+
+    new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
+        stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme())
+        .runRepeatedly(numIterations);
+  }
+
+  /**
+   * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
+   * into account when computing mappings, so this is acceptable.
+   * @param modelName name to give the model
+   * @param initialState initial state for all nodes
+   * @param states ordered map of state to count
+   * @return incomplete StateModelDefinition for rebalancing
+   */
+  private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
+      LinkedHashMap<String, Integer> states) {
+    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
+    builder.initialState(initialState);
+    int i = states.size();
+    for (String state : states.keySet()) {
+      builder.addState(state, i);
+      builder.upperBound(state, states.get(state));
+      i--;
+    }
+    return builder.build();
+  }
+
+  class AutoRebalanceTester {
    // Per-iteration probabilities of each random cluster mutation; they sum to 1.0.
    private static final double P_KILL = 0.45;
    private static final double P_ADD = 0.1;
    private static final double P_RESURRECT = 0.45;
    private static final String RESOURCE_NAME = "resource";

    private List<String> _partitions; // partition names for the test resource
    private LinkedHashMap<String, Integer> _states; // state -> replica count, in preference order
    private List<String> _liveNodes; // currently live nodes; mutated by add/remove/resurrect
    private Set<String> _liveSet; // set view of _liveNodes for fast membership checks
    private Set<String> _removedSet; // nodes that were live at some point but were killed
    private Set<String> _nonLiveSet; // nodes that have never been live
    private Map<String, Map<String, String>> _currentMapping; // partition -> (node -> state)
    private List<String> _allNodes; // every node in the cluster, live or not
    private int _maxPerNode; // maximum number of replicas a single node may host
    private StateModelDefinition _stateModelDef;
    private ReplicaPlacementScheme _placementScheme;
    private Random _random; // randomness for choosing mutations and nodes
+
+    public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
+        List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
+        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef,
+        ReplicaPlacementScheme placementScheme) {
+      _partitions = partitions;
+      _states = states;
+      _liveNodes = liveNodes;
+      _liveSet = new TreeSet<String>();
+      for (String node : _liveNodes) {
+        _liveSet.add(node);
+      }
+      _removedSet = new TreeSet<String>();
+      _nonLiveSet = new TreeSet<String>();
+      _currentMapping = currentMapping;
+      _allNodes = allNodes;
+      for (String node : allNodes) {
+        if (!_liveSet.contains(node)) {
+          _nonLiveSet.add(node);
+        }
+      }
+      _maxPerNode = maxPerNode;
+      _stateModelDef = stateModelDef;
+      _placementScheme = placementScheme;
+      _random = new Random();
+    }
+
+    /**
+     * Repeatedly randomly select a task to run and report the result
+     * @param numIterations
+     *          Number of random tasks to run in sequence
+     */
+    public void runRepeatedly(int numIterations) {
+      logger.info("~~~~ Initial State ~~~~~");
+      ZNRecord initialResult =
+          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
+              _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      _currentMapping = getMapping(initialResult.getListFields());
+      logger.info(_currentMapping);
+      getRunResult(_currentMapping, initialResult.getListFields());
+      for (int i = 0; i < numIterations; i++) {
+        logger.info("~~~~ Iteration " + i + " ~~~~~");
+        ZNRecord znRecord = runOnceRandomly();
+        if (znRecord != null) {
+          final Map<String, List<String>> listResult = znRecord.getListFields();
+          final Map<String, Map<String, String>> mapResult = getMapping(listResult);
+          logger.info(mapResult);
+          logger.info(listResult);
+          getRunResult(mapResult, listResult);
+          _currentMapping = mapResult;
+        }
+      }
+    }
+
+    private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
+      final Map<PartitionId, Map<ParticipantId, State>> mapResult =
+          new HashMap<PartitionId, Map<ParticipantId, State>>();
+      for (String partition : _partitions) {
+        PartitionId partitionId = Id.partition(partition);
+        Set<ParticipantId> disabledParticipantsForPartition = Collections.emptySet();
+        Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
+        Set<String> tags = Collections.emptySet();
+        Map<MessageId, Message> messageMap = Collections.emptyMap();
+        Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+        Map<ParticipantId, Participant> liveParticipantMap =
+            new HashMap<ParticipantId, Participant>();
+        // set up some participants
+        for (String nodeName : _liveNodes) {
+          ParticipantId participantId = Id.participant(nodeName);
+          Participant participant =
+              new Participant(participantId, "hostname", 0, true, disabledPartitionIdSet, tags,
+                  null, currentStateMap, messageMap);
+          liveParticipantMap.put(participantId, participant);
+        }
+        List<ParticipantId> participantPreferenceList =
+            Lists.transform(listResult.get(partition), new Function<String, ParticipantId>() {
+              @Override
+              public ParticipantId apply(String participantId) {
+                return Id.participant(participantId);
+              }
+            });
+        // compute the mapping
+        Map<ParticipantId, State> replicaMap =
+            ResourceAssignment.replicaMapFromStringMap(_currentMapping.get(partition));
+        Map<ParticipantId, State> assignment =
+            NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipantMap,
+                _stateModelDef, participantPreferenceList, replicaMap,
+                disabledParticipantsForPartition);
+        mapResult.put(partitionId, assignment);
+      }
+
+      return ResourceAssignment.stringMapsFromReplicaMaps(mapResult);
+    }
+
    /**
     * Output various statistics and correctness check results
     * @param mapFields
     *          The map-map assignment generated by the rebalancer
     * @param listFields
     *          The map-list assignment generated by the rebalancer
     */
    public void getRunResult(final Map<String, Map<String, String>> mapFields,
        final Map<String, List<String>> listFields) {
      logger.info("***** Statistics *****");
      // Log distribution statistics first, then run the assertion-backed checks.
      dumpStatistics(mapFields);
      verifyCorrectness(mapFields, listFields);
    }
+
+    /**
+     * Output statistics about the assignment
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     */
+    public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
+      Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      int nodeCount = _liveNodes.size();
+      logger.info("Total number of nodes: " + nodeCount);
+      logger.info("Nodes: " + _liveNodes);
+      int sumPartitions = getSum(partitionsPerNode.values());
+      logger.info("Total number of partitions: " + sumPartitions);
+      double averagePartitions = getAverage(partitionsPerNode.values());
+      logger.info("Average number of partitions per node: " + averagePartitions);
+      double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
+      logger.info("Standard deviation of partitions: " + stdevPartitions);
+
+      // Statistics about each state
+      Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
+      for (String state : _states.keySet()) {
+        Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
+        for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
+          Map<String, Integer> stateCounts = nodeStates.getValue();
+          if (stateCounts.containsKey(state)) {
+            nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
+          } else {
+            nodeStateCounts.put(nodeStates.getKey(), 0);
+          }
+        }
+        int sumStates = getSum(nodeStateCounts.values());
+        logger.info("Total number of state " + state + ": " + sumStates);
+        double averageStates = getAverage(nodeStateCounts.values());
+        logger.info("Average number of state " + state + " per node: " + averageStates);
+        double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
+        logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
+      }
+    }
+
+    /**
+     * Run a set of correctness tests, reporting success or failure
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
+      final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
+      assert maxConstraintMet : "Max per node constraint: FAIL";
+      logger.info("Max per node constraint: PASS");
+
+      boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
+      assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
+      logger.info("Only live nodes have partitions constraint: PASS");
+
+      boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
+      assert stateAssignmentPossible : "State replica constraint: FAIL";
+      logger.info("State replica constraint: PASS");
+
+      boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
+      assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
+      logger.info("Node uniqueness per partition constraint: PASS");
+    }
+
+    private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
+      for (String node : partitionsPerNode.keySet()) {
+        Integer value = partitionsPerNode.get(node);
+        if (value > _maxPerNode) {
+          logger.error("ERROR: Node " + node + " has " + value
+              + " partitions despite a maximum of " + _maxPerNode);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
+      for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
+        boolean isLive = _liveSet.contains(nodeState.getKey());
+        boolean isEmpty = nodeState.getValue() == 0;
+        if (!isLive && !isEmpty) {
+          logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
+              + nodeState.getValue() + " replicas!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
+      for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
+        for (String state : nodeMap.values()) {
+          if (!stateCounts.containsKey(state)) {
+            stateCounts.put(state, 1);
+          } else {
+            stateCounts.put(state, stateCounts.get(state) + 1);
+          }
+        }
+        for (String state : stateCounts.keySet()) {
+          if (state.equals(HelixDefinedState.DROPPED.toString())) {
+            continue;
+          }
+          int count = stateCounts.get(state);
+          int maximumCount = _states.get(state);
+          if (count > maximumCount) {
+            logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
+                + " has " + count + " replicas when " + maximumCount + " is allowed!");
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
+      for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
+        Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
+        int numUniques = nodeSet.size();
+        int total = partitionEntry.getValue().size();
+        if (numUniques < total) {
+          logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
+              + " nodes, but only " + numUniques + " are unique!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private double getAverage(final Collection<Integer> values) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      if (values.size() != 0) {
+        return sum / values.size();
+      } else {
+        return -1.0;
+      }
+    }
+
+    private int getSum(final Collection<Integer> values) {
+      int sum = 0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      return sum;
+    }
+
+    private double getStdev(final Collection<Integer> values, double mean) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        double deviation = mean - value;
+        sum += Math.pow(deviation, 2.0);
+      }
+      if (values.size() != 0) {
+        sum /= values.size();
+        return Math.pow(sum, 0.5);
+      } else {
+        return -1.0;
+      }
+    }
+
+    private Map<String, Integer> getPartitionBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
+      for (String node : _liveNodes) {
+        partitionsPerNode.put(node, 0);
+      }
+      for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        for (String node : nodeMap.keySet()) {
+          String state = nodeMap.get(node);
+          if (state.equals(HelixDefinedState.DROPPED.toString())) {
+            continue;
+          }
+          // add 1 for every occurrence of a node
+          if (!partitionsPerNode.containsKey(node)) {
+            partitionsPerNode.put(node, 1);
+          } else {
+            partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
+          }
+        }
+      }
+      return partitionsPerNode;
+    }
+
+    private Map<String, Map<String, Integer>> getStateBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
+      for (String n : _liveNodes) {
+        result.put(n, new TreeMap<String, Integer>());
+      }
+      for (Map<String, String> nodeStateMap : assignment.values()) {
+        for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
+          if (!result.containsKey(nodeState.getKey())) {
+            result.put(nodeState.getKey(), new TreeMap<String, Integer>());
+          }
+          Map<String, Integer> stateMap = result.get(nodeState.getKey());
+          if (!stateMap.containsKey(nodeState.getValue())) {
+            stateMap.put(nodeState.getValue(), 1);
+          } else {
+            stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Randomly choose between killing, adding, or resurrecting a single node
+     * @return (Partition -> (Node -> State)) ZNRecord
+     */
+    public ZNRecord runOnceRandomly() {
+      double choose = _random.nextDouble();
+      ZNRecord result = null;
+      if (choose < P_KILL) {
+        result = removeSingleNode(null);
+      } else if (choose < P_KILL + P_ADD) {
+        result = addSingleNode(null);
+      } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
+        result = resurrectSingleNode(null);
+      }
+      return result;
+    }
+
+    /**
+     * Run rebalancer trying to add a never-live node
+     * @param node
+     *          Optional String to add
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord addSingleNode(String node) {
+      logger.info("=================== add node =================");
+      if (_nonLiveSet.size() == 0) {
+        logger.warn("Cannot add node because there are no nodes left to add.");
+        return null;
+      }
+
+      // Get a random never-live node
+      if (node == null || !_nonLiveSet.contains(node)) {
+        node = getRandomSetElement(_nonLiveSet);
+      }
+      logger.info("Adding " + node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+      _nonLiveSet.remove(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
+          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+    }
+
+    /**
+     * Run rebalancer trying to remove a live node
+     * @param node
+     *          Optional String to remove
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord removeSingleNode(String node) {
+      logger.info("=================== remove node =================");
+      if (_liveSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to remove.");
+        return null;
+      }
+
+      // Get a random never-live node
+      if (node == null || !_liveSet.contains(node)) {
+        node = getRandomSetElement(_liveSet);
+      }
+      logger.info("Removing " + node);
+      _removedSet.add(node);
+      _liveNodes.remove(node);
+      _liveSet.remove(node);
+
+      // the rebalancer expects that the current mapping doesn't contain deleted
+      // nodes
+      for (Map<String, String> nodeMap : _currentMapping.values()) {
+        if (nodeMap.containsKey(node)) {
+          nodeMap.remove(node);
+        }
+      }
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
+          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+    }
+
+    /**
+     * Run rebalancer trying to add back a removed node
+     * @param node
+     *          Optional String to resurrect
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord resurrectSingleNode(String node) {
+      logger.info("=================== resurrect node =================");
+      if (_removedSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to resurrect.");
+        return null;
+      }
+
+      // Get a random never-live node
+      if (node == null || !_removedSet.contains(node)) {
+        node = getRandomSetElement(_removedSet);
+      }
+      logger.info("Resurrecting " + node);
+      _removedSet.remove(node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
+          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+    }
+
+    private <T> T getRandomSetElement(Set<T> source) {
+      int element = _random.nextInt(source.size());
+      int i = 0;
+      for (T node : source) {
+        if (i == element) {
+          return node;
+        }
+        i++;
+      }
+      return null;
+    }
+  }
+}


Mime
View raw message