helix-commits mailing list archives

From zzh...@apache.org
Subject [1/2] helix rebalancer refactor using logical models
Date Thu, 05 Sep 2013 20:11:52 GMT
Updated Branches:
  refs/heads/helix-logical-model 9c7de4c33 -> 5d0e048e1


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 4038c69..9745c64 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -27,10 +27,14 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
@@ -75,15 +79,22 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     public int getUpperBound() {
       return upper;
     }
+
+    @Override
+    public String toString() {
+      return String.format("%d-%d", lower, upper);
+    }
   }
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
     Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    MessageGenerationOutput messageGenOutput =
+    NewMessageOutput messageGenOutput =
         event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cluster == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null) {
@@ -91,29 +102,28 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
           + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
     }
 
-    MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+    NewMessageOutput output = new NewMessageOutput();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceId);
-      // TODO fix it
-      StateModelDefinition stateModelDef = null;
-      // cache.getStateModelDef(resource.getStateModelDefRef());
+      StateModelDefinition stateModelDef =
+          stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
+      // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
-      // IdealState idealState = cache.getIdealState(resourceName);
-      Map<String, Bounds> stateConstraints =
+      Map<State, Bounds> stateConstraints =
           computeStateConstraints(stateModelDef, resource.getRebalancerConfig(), cluster);
 
       // TODO fix it
-      // for (Partition partition : resource.getPartitions()) {
-      // List<Message> messages = messageGenOutput.getMessages(resourceId.stringify(), partition);
-      // List<Message> selectedMessages =
-      // selectMessages(cache.getLiveInstances(),
-      // currentStateOutput.getCurrentStateMap(resourceName, partition),
-      // currentStateOutput.getPendingStateMap(resourceName, partition), messages,
-      // stateConstraints, stateTransitionPriorities, stateModelDef.getInitialStateString());
-      // output.addMessages(resourceId.stringify(), partition, selectedMessages);
-      // }
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
+        List<Message> selectedMessages =
+            selectMessages(cluster.getLiveParticipantMap(),
+            currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+            currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+                stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
+        output.setMessages(resourceId, partitionId, selectedMessages);
+      }
     }
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
   }
@@ -137,22 +147,22 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
    *          : FROME_STATE-TO_STATE -> priority
    * @return: selected messages
    */
-  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
-      Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
-      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
-      String initialState) {
+  List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
+      Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
+      List<Message> messages, Map<State, Bounds> stateConstraints,
+      final Map<String, Integer> stateTransitionPriorities, State initialState) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
 
     List<Message> selectedMessages = new ArrayList<Message>();
-    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+    Map<State, Bounds> bounds = new HashMap<State, Bounds>();
 
     // count currentState, if no currentState, count as in initialState
-    for (String instance : liveInstances.keySet()) {
-      String state = initialState;
-      if (currentStates.containsKey(instance)) {
-        state = currentStates.get(instance);
+    for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
+      State state = initialState;
+      if (currentStates.containsKey(liveParticipantId)) {
+        state = currentStates.get(liveParticipantId);
       }
 
       if (!bounds.containsKey(state)) {
@@ -163,8 +173,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     }
 
     // count pendingStates
-    for (String instance : pendingStates.keySet()) {
-      String state = pendingStates.get(instance);
+    for (ParticipantId participantId : pendingStates.keySet()) {
+      State state = pendingStates.get(participantId);
       if (!bounds.containsKey(state)) {
         bounds.put(state, new Bounds(0, 0));
       }
@@ -178,7 +188,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     for (Message message : messages) {
       State fromState = message.getFromState();
       State toState = message.getToState();
-      String transition = fromState + "-" + toState;
+      String transition = fromState.toString() + "-" + toState.toString();
       int priority = Integer.MAX_VALUE;
 
       if (stateTransitionPriorities.containsKey(transition)) {
@@ -203,7 +213,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
         }
 
         if (!bounds.containsKey(toState)) {
-          bounds.put(toState.toString(), new Bounds(0, 0));
+          bounds.put(toState, new Bounds(0, 0));
         }
 
         // check lower bound of fromState
@@ -243,13 +253,13 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
    * beginning and compute the stateConstraint instance once and re use at other places.
    * Each IdealState must have a constraint object associated with it
    */
-  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+  private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
       RebalancerConfig rebalancerConfig, Cluster cluster) {
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
 
-    List<String> statePriorityList = stateModelDefinition.getStatesPriorityStringList();
-    for (String state : statePriorityList) {
-      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+    List<State> statePriorityList = stateModelDefinition.getStatesPriorityList();
+    for (State state : statePriorityList) {
+      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state.toString());
       int max = -1;
       if ("N".equals(numInstancesPerState)) {
         max = cluster.getLiveParticipantMap().size();
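
For context, a minimal standalone sketch of the counting scheme that selectMessages() relies on
(not part of the patch; plain String states and a simple integer bound stand in for the Helix
State and Bounds types): replicas are tallied per state, a live replica with no current state
counts as the initial state, and a transition is only admitted while the target state stays
under its upper bound.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class StateCountSketch {
  // Count replicas per state; live replicas without a current state count as initialState.
  static Map<String, Integer> countStates(Set<String> liveReplicas,
      Map<String, String> currentStates, String initialState) {
    Map<String, Integer> counts = new HashMap<String, Integer>();
    for (String replica : liveReplicas) {
      String state =
          currentStates.containsKey(replica) ? currentStates.get(replica) : initialState;
      Integer old = counts.get(state);
      counts.put(state, old == null ? 1 : old + 1);
    }
    return counts;
  }

  // A transition into toState is admissible only while toState stays under its upper bound.
  static boolean withinUpperBound(Map<String, Integer> counts, String toState, int upperBound) {
    Integer current = counts.get(toState);
    return (current == null ? 0 : current) < upperBound;
  }
}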

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index e45cd38..5bea5b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -27,6 +27,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -115,7 +118,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
-    MessageSelectionStageOutput msgSelectionOutput =
+    NewMessageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
 
@@ -124,34 +127,33 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
           + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
     }
 
-    MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+    NewMessageOutput output = new NewMessageOutput();
 
     // TODO fix it
-    ClusterConstraints constraint = null;
-    // cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+    ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
     Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
 
-    // TODO fix it
-    // if (constraint != null) {
-    // // go through all pending messages, they should be counted but not throttled
-    // for (String instance : cache.getLiveInstances().keySet()) {
-    // throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
-    // .values()), false);
-    // }
-    // }
+    if (constraint != null) {
+      // go through all pending messages, they should be counted but not throttled
+      for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
+        Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
+        throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
+            .getMessageMap().values()), false);
+      }
+    }
 
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
     for (ResourceId resourceId : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceId);
       // TODO fix it
-      // for (Partition partition : resource.getPartitions()) {
-      // List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
-      // if (constraint != null && messages != null && messages.size() > 0) {
-      // messages = throttle(throttleCounterMap, constraint, messages, true);
-      // }
-      // output.addMessages(resourceName, partition, messages);
-      // }
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
+        if (constraint != null && messages != null && messages.size() > 0) {
+          messages = throttle(throttleCounterMap, constraint, messages, true);
+        }
+        output.setMessages(resourceId, partitionId, messages);
+      }
     }
 
     event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
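
A standalone sketch of the throttle bookkeeping used above (not part of the patch; constraint
keys and quotas are simplified to plain strings and ints): pending messages only consume quota,
while new messages past the remaining quota are dropped for this pipeline run.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ThrottleSketch {
  static List<String> throttle(Map<String, Integer> counters, Map<String, Integer> quotas,
      List<String> messages, boolean dropExcess) {
    List<String> kept = new ArrayList<String>();
    for (String msg : messages) {
      // Hypothetical key; the real stage derives constraint keys from ClusterConstraints.
      String key = "MESSAGE_CONSTRAINT";
      int used = counters.containsKey(key) ? counters.get(key) : 0;
      int quota = quotas.containsKey(key) ? quotas.get(key) : Integer.MAX_VALUE;
      if (!dropExcess || used < quota) {
        counters.put(key, used + 1); // pending messages (dropExcess == false) always count
        kept.add(msg);
      }
      // otherwise a new message over quota is dropped for this pipeline run
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, Integer> counters = new HashMap<String, Integer>();
    Map<String, Integer> quotas = new HashMap<String, Integer>();
    quotas.put("MESSAGE_CONSTRAINT", 2);
    // one pending message is counted but never dropped
    throttle(counters, quotas, Arrays.asList("pending-1"), false);
    // of two new messages, only one fits under the remaining quota
    List<String> sent = throttle(counters, quotas, Arrays.asList("new-1", "new-2"), true);
    System.out.println(sent); // [new-1]
  }
}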

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
new file mode 100644
index 0000000..ed487a1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
@@ -0,0 +1,85 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelDefinitionAccessor;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+public class NewReadClusterDataStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    long startTime = System.currentTimeMillis();
+    LOG.info("START ReadClusterDataStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    if (manager == null) {
+      throw new StageException("HelixManager attribute value is null");
+    }
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ClusterId clusterId = Id.cluster(manager.getClusterName());
+    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+    StateModelDefinitionAccessor stateModelDefAccessor =
+        new StateModelDefinitionAccessor(clusterId, accessor);
+
+    Cluster cluster = clusterAccessor.readCluster();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        stateModelDefAccessor.readStateModelDefinitions();
+
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (clusterStatusMonitor != null) {
+      // TODO fix it
+      // int disabledInstances = 0;
+      // int disabledPartitions = 0;
+      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+      // if (config.getInstanceEnabled() == false) {
+      // disabledInstances++;
+      // }
+      // if (config.getDisabledPartitions() != null) {
+      // disabledPartitions += config.getDisabledPartitions().size();
+      // }
+      // }
+      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+    }
+
+    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefMap);
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+  }
+}
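
A rough sketch of how a downstream stage picks up what this stage publishes (not part of the
patch; the hypothetical ExampleConsumerStage only mirrors the attribute lookups that
NewMessageSelectionStage performs earlier in this commit):

import java.util.Map;

import org.apache.helix.api.Cluster;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.model.StateModelDefinition;

public class ExampleConsumerStage extends AbstractBaseStage {
  @Override
  public void process(ClusterEvent event) throws Exception {
    // Read the typed snapshot published by NewReadClusterDataStage instead of
    // going back to ZooKeeper.
    Cluster cluster = event.getAttribute("ClusterDataCache");
    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
    if (cluster == null || stateModelDefMap == null) {
      throw new StageException("Missing attributes in event " + event);
    }
    // ... stage logic would go here ...
  }
}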

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
deleted file mode 100644
index 3359b50..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Check and invoke custom implementation idealstate rebalancers.<br/>
- * If the resourceConfig has specified className of the customized rebalancer, <br/>
- * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- */
-public class NewRebalanceIdealStateStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewRebalanceIdealStateStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    NewCurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
-    // Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
-    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
-      // IdealState currentIdealState = idealStateMap.get(resourceName);
-      Resource resource = cluster.getResource(resourceId);
-      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
-      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
-          && rebalancerConfig.getRebalancerClassName() != null) {
-        String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
-        LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
-        try {
-          NewRebalancer balancer =
-              (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-
-          // TODO add state model def
-          ResourceAssignment resourceAssignment =
-              balancer.computeResourceMapping(resource, cluster, null);
-
-          // TODO impl this
-          // currentIdealState.updateFromAssignment(resourceAssignment);
-          // updatedIdealStates.put(resourceName, currentIdealState);
-        } catch (Exception e) {
-          LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
-        }
-      }
-    }
-
-    // TODO
-    // if (updatedIdealStates.size() > 0) {
-      // cache.getIdealStates().putAll(updatedIdealStates);
-    // }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index b8c1ecf..af23eb2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.stages;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
@@ -67,7 +66,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
 
     // include all partitions from CurrentState as well since idealState might be removed
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
-      for ( ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
         CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
 
         if (currentState.getStateModelDefRef() == null) {
@@ -80,13 +79,15 @@ public class NewResourceComputationStage extends AbstractBaseStage {
 
         // don't overwrite ideal state configs
         if (!resourceBuilderMap.containsKey(resourceId)) {
-          RebalancerConfig.Builder rebalancerConfigBuilder = new RebalancerConfig.Builder();
+          RebalancerConfig.Builder rebalancerConfigBuilder =
+              new RebalancerConfig.Builder(resourceId);
           rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
-          rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState.getStateModelFactoryName()));
+          rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
+              .getStateModelFactoryName()));
           rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
           rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
 
-          org.apache.helix.api.Resource.Builder resourceBuilder = new org.apache.helix.api.Resource.Builder(resourceId);
+          Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
           resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
           resourceBuilderMap.put(resourceId, resourceBuilder);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 95862ae..2b8a0c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -31,8 +31,10 @@ import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -52,7 +54,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
 
     HelixManager manager = event.getAttribute("helixmanager");
     Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    MessageThrottleStageOutput messageOutput =
+    NewMessageOutput messageOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
@@ -67,11 +69,10 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     List<Message> messagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceId);
-      // TODO fix it
-      // for (Partition partition : resource.getPartitions()) {
-      // List<Message> messages = messageOutput.getMessages(resourceName, partition);
-      // messagesToSend.addAll(messages);
-      // }
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
+        messagesToSend.addAll(messages);
+      }
     }
 
     List<Message> outputMessages =
@@ -95,9 +96,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
-      Resource resource = resourceMap.get(resourceId.stringify());
+      Resource resource = resourceMap.get(resourceId);
 
-      String participantId = message.getTgtName();
+      ParticipantId participantId = Id.participant(message.getTgtName());
       Participant liveParticipant = liveParticipantMap.get(participantId);
       String participantVersion = null;
       if (liveParticipant != null) {
@@ -141,10 +142,10 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
           + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
           + message.getFromState() + " to:" + message.getToState());
 
-      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
-      // message.getTgtName()
-      // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-      // + " from: " + message.getFromState() + " to: " + message.getToState());
+      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+      // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+      // + message.getPartitionId() + " from: " + message.getFromState() + " to: "
+      // + message.getToState());
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index f16bb39..b6facea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -29,6 +29,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
@@ -66,7 +67,9 @@ public class RebalanceIdealStateStage extends AbstractBaseStage {
           ResourceAssignment resourceAssignment =
               balancer.computeResourceMapping(resource, currentIdealState, currentStateOutput,
                   cache);
-          currentIdealState.updateFromAssignment(resourceAssignment);
+          StateModelDefinition stateModelDef =
+              cache.getStateModelDef(currentIdealState.getStateModelDefRef());
+          currentIdealState.updateFromAssignment(resourceAssignment, stateModelDef);
           updatedIdealStates.put(resourceName, currentIdealState);
         } catch (Exception e) {
           LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 087d2fb..835af6e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -410,6 +410,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
       switch (type) {
       case EXTERNALVIEW:
         if (value.getBucketSize() == 0) {
+          System.out.println("set: " + value.getRecord());
           records.add(value.getRecord());
         } else {
           _baseDataAccessor.remove(path, options);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0f690db..7fb641f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -712,6 +712,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     for (int i = 0; i < paths.size(); i++) {
       success[i] = (results.get(i)._retCode == RetCode.OK);
     }
+
+    for (int i = 0; i < paths.size(); i++) {
+      String path = paths.get(i);
+      T record = records.get(i);
+      if (path.indexOf("EXTERNALVIEW") != -1) {
+        System.out.println("path: " + path + ", record: " + record + ", success: " + success[i]);
+      }
+    }
     return success;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ffff483..24ec7c9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
@@ -38,14 +39,18 @@ import org.apache.helix.api.RebalancerRef;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
 
 /**
  * The ideal states of all partitions in a resource
@@ -459,6 +464,14 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the state model associated with this resource
+   * @param stateModel state model identifier
+   */
+  public void setStateModelDefId(StateModelDefId stateModelDefId) {
+    setStateModelDefRef(stateModelDefId.stringify());
+  }
+
+  /**
    * Set the number of partitions of this resource
    * @param numPartitions the number of partitions
    */
@@ -540,6 +553,14 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the state model factory associated with this resource
+   * @param name state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+    setStateModelFactoryName(stateModelFactoryId.stringify());
+  }
+
+  /**
    * Get the state model factory associated with this resource
    * @return state model factory name
    */
@@ -549,6 +570,14 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Get the state model factory associated with this resource
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId() {
+    return Id.stateModelFactory(getStateModelFactoryName());
+  }
+
+  /**
    * Set the frequency with which to rebalance
    * @return the rebalancing timer period
    */
@@ -613,13 +642,39 @@ public class IdealState extends HelixProperty {
     return _record.getSimpleField(IdealStateProperty.INSTANCE_GROUP_TAG.toString());
   }
 
-  public void updateFromAssignment(ResourceAssignment assignment) {
+  /**
+   * Update the ideal state from a ResourceAssignment computed during a rebalance
+   * @param assignment the new resource assignment
+   * @param stateModelDef state model of the resource
+   */
+  public void updateFromAssignment(ResourceAssignment assignment, StateModelDefinition stateModelDef) {
+    // clear all preference lists and maps
     _record.getMapFields().clear();
     _record.getListFields().clear();
+
+    // assign a partition at a time
     for (PartitionId partition : assignment.getMappedPartitions()) {
+      List<ParticipantId> preferenceList = new ArrayList<ParticipantId>();
+      Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+      // invert the map to get in state order
       Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partition);
-      setParticipantStateMap(partition, replicaMap);
-      setPreferenceList(partition, new ArrayList<ParticipantId>(replicaMap.keySet()));
+      ListMultimap<State, ParticipantId> inverseMap = ArrayListMultimap.create();
+      Multimaps.invertFrom(Multimaps.forMap(replicaMap), inverseMap);
+
+      // update the ideal state in order of state priorities
+      for (State state : stateModelDef.getStatesPriorityList()) {
+        if (!state.equals(State.from(HelixDefinedState.DROPPED))
+            && !state.equals(State.from(HelixDefinedState.ERROR))) {
+          List<ParticipantId> stateParticipants = inverseMap.get(state);
+          for (ParticipantId participant : stateParticipants) {
+            preferenceList.add(participant);
+            participantStateMap.put(participant, state);
+          }
+        }
+      }
+      setPreferenceList(partition, preferenceList);
+      setParticipantStateMap(partition, participantStateMap);
     }
   }
 
@@ -674,12 +729,13 @@ public class IdealState extends HelixProperty {
     if (rawPreferenceList == null) {
       return Collections.emptyList();
     }
-    return Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
-      @Override
-      public ParticipantId apply(String participantName) {
-        return Id.participant(participantName);
-      }
-    });
+    return Lists.transform(new ArrayList<String>(rawPreferenceList),
+        new Function<String, ParticipantId>() {
+          @Override
+          public ParticipantId apply(String participantName) {
+            return Id.participant(participantName);
+          }
+        });
   }
 
   /**
@@ -710,12 +766,13 @@ public class IdealState extends HelixProperty {
     if (preferenceList == null) {
       return Collections.emptyList();
     }
-    return Lists.transform(preferenceList, new Function<ParticipantId, String>() {
-      @Override
-      public String apply(ParticipantId participantId) {
-        return participantId.stringify();
-      }
-    });
+    return Lists.transform(new ArrayList<ParticipantId>(preferenceList),
+        new Function<ParticipantId, String>() {
+          @Override
+          public String apply(ParticipantId participantId) {
+            return participantId.stringify();
+          }
+        });
   }
 
   /**
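
The Guava inversion idiom introduced in updateFromAssignment() above, shown as a standalone
sketch (not part of the patch; plain strings stand in for ParticipantId and State, and the
MASTER/SLAVE priority order is hard-coded for illustration):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;

public class InvertReplicaMapSketch {
  public static void main(String[] args) {
    Map<String, String> replicaMap = new LinkedHashMap<String, String>();
    replicaMap.put("node_1", "SLAVE");
    replicaMap.put("node_2", "MASTER");
    replicaMap.put("node_3", "SLAVE");

    // invert participant -> state into state -> participants holding that state
    ListMultimap<String, String> byState = ArrayListMultimap.create();
    Multimaps.invertFrom(Multimaps.forMap(replicaMap), byState);

    // walk states in priority order so the preference list puts MASTER replicas first
    List<String> preferenceList = new ArrayList<String>();
    for (String state : new String[] { "MASTER", "SLAVE" }) {
      preferenceList.addAll(byState.get(state));
    }
    System.out.println(preferenceList); // [node_2, node_1, node_3]
  }
}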

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/test/java/org/apache/helix/api/TestId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestId.java b/helix-core/src/test/java/org/apache/helix/api/TestId.java
index 05da8a3..57c01e7 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestId.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestId.java
@@ -41,6 +41,7 @@ public class TestId {
     final String sessionName = "Session";
     final String processName = "Process";
     final String stateModelName = "StateModel";
+    final String stateModelFactoryName = "StateModelFactory";
     final String messageName = "Message";
     Assert.assertEquals(Id.resource(resourceName).stringify(), resourceName);
     Assert.assertEquals(Id.cluster(clusterName).stringify(), clusterName);
@@ -48,6 +49,8 @@ public class TestId {
     Assert.assertEquals(Id.session(sessionName).stringify(), sessionName);
     Assert.assertEquals(Id.process(processName).stringify(), processName);
     Assert.assertEquals(Id.stateModelDef(stateModelName).stringify(), stateModelName);
+    Assert.assertEquals(Id.stateModelFactory(stateModelFactoryName).stringify(),
+        stateModelFactoryName);
     Assert.assertEquals(Id.message(messageName).stringify(), messageName);
   }
 
@@ -72,6 +75,7 @@ public class TestId {
     Assert.assertNull(Id.session(null));
     Assert.assertNull(Id.process(null));
     Assert.assertNull(Id.stateModelDef(null));
+    Assert.assertNull(Id.stateModelFactory(null));
     Assert.assertNull(Id.message(null));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 98ae60b..cc26596 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -20,20 +20,33 @@ package org.apache.helix.api;
  */
 
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -42,6 +55,7 @@ import org.testng.annotations.Test;
 public class TestNewStages extends ZkUnitTestBase {
   final int n = 2;
   final int p = 8;
+  final int r = 2;
   MockParticipant[] _participants = new MockParticipant[n];
   ClusterController _controller;
 
@@ -88,6 +102,146 @@ public class TestNewStages extends ZkUnitTestBase {
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testBasicBestPossibleStateCalcStage() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up the event
+    ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+    Cluster cluster = clusterAccessor.readCluster();
+    ClusterEvent event = new ClusterEvent(testName);
+    event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
+    event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+    event.addAttribute("ClusterDataCache", cluster);
+    Map<StateModelDefId, StateModelDefinition> stateModelMap =
+        new HashMap<StateModelDefId, StateModelDefinition>();
+    stateModelMap.put(Id.stateModelDef("MasterSlave"), new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelMap);
+
+    // Run the stage
+    try {
+      new NewBestPossibleStateCalcStage().process(event);
+    } catch (Exception e) {
+      Assert.fail(e.toString());
+    }
+
+    // Verify the result
+    NewBestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    Assert.assertNotNull(bestPossibleStateOutput);
+    ResourceId resourceId = new ResourceId("TestDB0");
+    ResourceAssignment assignment = bestPossibleStateOutput.getResourceAssignment(resourceId);
+    Assert.assertNotNull(assignment);
+    Resource resource = cluster.getResource(resourceId);
+    verifySemiAutoRebalance(resource, assignment);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testClusterRebalancers() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+    Cluster cluster = clusterAccessor.readCluster();
+
+    ResourceId resourceId = new ResourceId("TestDB0");
+    Resource resource = cluster.getResource(resourceId);
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+    ResourceAssignment fullAutoResult =
+        new NewAutoRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+            currentStateOutput);
+    verifyFullAutoRebalance(resource, fullAutoResult);
+    ResourceAssignment semiAutoResult =
+        new NewSemiAutoRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+            currentStateOutput);
+    verifySemiAutoRebalance(resource, semiAutoResult);
+    ResourceAssignment customResult =
+        new NewCustomRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+            currentStateOutput);
+    verifyCustomRebalance(resource, customResult);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Check that a full auto rebalance is run, and at least one replica per partition is mapped
+   * @param resource the resource to verify
+   * @param assignment the assignment to verify
+   */
+  private void verifyFullAutoRebalance(Resource resource, ResourceAssignment assignment) {
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+    for (PartitionId partitionId : assignment.getMappedPartitions()) {
+      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+      Assert.assertTrue(replicaMap.size() <= r);
+      Assert.assertTrue(replicaMap.size() > 0);
+      boolean hasMaster = false;
+      for (State state : replicaMap.values()) {
+        if (state.equals(State.from("MASTER"))) {
+          Assert.assertFalse(hasMaster);
+          hasMaster = true;
+        }
+      }
+      Assert.assertTrue(hasMaster);
+    }
+  }
+
+  /**
+   * Check that a semi auto rebalance is run, and all partitions are mapped by preference list
+   * @param resource the resource to verify
+   * @param assignment the assignment to verify
+   */
+  private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    for (PartitionId partitionId : assignment.getMappedPartitions()) {
+      List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+      Assert.assertEquals(replicaMap.size(), preferenceList.size());
+      Assert.assertEquals(replicaMap.size(), r);
+      boolean hasMaster = false;
+      for (ParticipantId participant : preferenceList) {
+        Assert.assertTrue(replicaMap.containsKey(participant));
+        State state = replicaMap.get(participant);
+        if (state.equals(State.from("MASTER"))) {
+          Assert.assertFalse(hasMaster);
+          hasMaster = true;
+        }
+      }
+      Assert.assertEquals(replicaMap.get(preferenceList.get(0)), State.from("MASTER"));
+    }
+  }
+
+  /**
+   * For vanilla customized rebalancing, the resource assignment should match the preference map
+   * @param resource the resource to verify
+   * @param assignment the assignment to verify
+   */
+  private void verifyCustomRebalance(Resource resource, ResourceAssignment assignment) {
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    for (PartitionId partitionId : assignment.getMappedPartitions()) {
+      Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
+      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+      Assert.assertEquals(replicaMap.size(), preferenceMap.size());
+      Assert.assertEquals(replicaMap.size(), r);
+      for (ParticipantId participant : preferenceMap.keySet()) {
+        Assert.assertTrue(replicaMap.containsKey(participant));
+        Assert.assertEquals(replicaMap.get(participant), preferenceMap.get(participant));
+      }
+    }
+  }
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -106,7 +260,7 @@ public class TestNewStages extends ZkUnitTestBase {
         1, // resources
         p, // partitions per resource
         n, // number of nodes
-        2, // replicas
+        r, // replicas
         "MasterSlave", true); // do rebalance
 
     _controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);

