helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [41/51] [partial] [HELIX-198] Unify helix code style, rb=13710
Date Wed, 21 Aug 2013 20:43:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 32d9a8e..92964e9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -41,25 +41,21 @@ import org.apache.log4j.Logger;
 
 /**
  * Compares the currentState, pendingState with IdealState and generate messages
- * 
  */
-public class MessageGenerationPhase extends AbstractBaseStage
-{
+public class MessageGenerationPhase extends AbstractBaseStage {
   private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
-        .toString());
-    BestPossibleStateOutput bestPossibleStateOutput = event
-        .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
-        || bestPossibleStateOutput == null)
-    {
+        || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
     }
@@ -67,112 +63,99 @@ public class MessageGenerationPhase extends AbstractBaseStage
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
     Map<String, String> sessionIdMap = new HashMap<String, String>();
 
-    for (LiveInstance liveInstance : liveInstances.values())
-    {
+    for (LiveInstance liveInstance : liveInstances.values()) {
       sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
     }
     MessageGenerationOutput output = new MessageGenerationOutput();
 
-    for (String resourceName : resourceMap.keySet())
-    {
+    for (String resourceName : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceName);
       int bucketSize = resource.getBucketSize();
 
       StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
 
-      for (Partition partition : resource.getPartitions())
-      {
-        Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
-            resourceName, partition);
+      for (Partition partition : resource.getPartitions()) {
+        Map<String, String> instanceStateMap =
+            bestPossibleStateOutput.getInstanceStateMap(resourceName, partition);
 
         // we should generate message based on the desired-state priority
         // so keep generated messages in a temp map keyed by state
         // desired-state->list of generated-messages
         Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
-        
-        for (String instanceName : instanceStateMap.keySet())
-        {
+
+        for (String instanceName : instanceStateMap.keySet()) {
           String desiredState = instanceStateMap.get(instanceName);
 
-          String currentState = currentStateOutput.getCurrentState(resourceName, partition,
-              instanceName);
-          if (currentState == null)
-          {
+          String currentState =
+              currentStateOutput.getCurrentState(resourceName, partition, instanceName);
+          if (currentState == null) {
             currentState = stateModelDef.getInitialState();
           }
 
-          if (desiredState.equalsIgnoreCase(currentState))
-          {
+          if (desiredState.equalsIgnoreCase(currentState)) {
             continue;
           }
 
-          String pendingState = currentStateOutput.getPendingState(resourceName, partition,
-              instanceName);
+          String pendingState =
+              currentStateOutput.getPendingState(resourceName, partition, instanceName);
 
           String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-          if (nextState == null)
-          {
+          if (nextState == null) {
             logger.error("Unable to find a next state for partition: "
                 + partition.getPartitionName() + " from stateModelDefinition"
                 + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
             continue;
           }
 
-          if (pendingState != null)
-          {
-            if (nextState.equalsIgnoreCase(pendingState))
-            {
+          if (pendingState != null) {
+            if (nextState.equalsIgnoreCase(pendingState)) {
               logger.debug("Message already exists for " + instanceName + " to transit "
                   + partition.getPartitionName() + " from " + currentState + " to " + nextState);
-            } else if (currentState.equalsIgnoreCase(pendingState))
-            {
+            } else if (currentState.equalsIgnoreCase(pendingState)) {
               logger.info("Message hasn't been removed for " + instanceName + " to transit"
                   + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
                   + desiredState);
-            } else
-            {
+            } else {
               logger.info("IdealState changed before state transition completes for "
                   + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
                   + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
             }
-          } else
-          {
-            Message message = createMessage(manager, resourceName, partition.getPartitionName(),
-                instanceName, currentState, nextState, sessionIdMap.get(instanceName),
-                stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
+          } else {
+            Message message =
+                createMessage(manager, resourceName, partition.getPartitionName(), instanceName,
+                    currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
+                    resource.getStateModelFactoryname(), bucketSize);
             IdealState idealState = cache.getIdealState(resourceName);
-            if(idealState!= null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
-            {
-              if(idealState.getRecord().getMapField(partition.getPartitionName())!=null)
-              {
-                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
+            if (idealState != null
+                && idealState.getStateModelDefRef().equalsIgnoreCase(
+                    DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+              if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+                    idealState.getRecord().getMapField(partition.getPartitionName()));
               }
             }
             // Set timeout of needed
-            String stateTransition = currentState + "-" + nextState + "_"
-                + Message.Attributes.TIMEOUT;
-            if (idealState != null)
-            {
+            String stateTransition =
+                currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+            if (idealState != null) {
               String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
-              if(timeOutStr == null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
-              {
+              if (timeOutStr == null
+                  && idealState.getStateModelDefRef().equalsIgnoreCase(
+                      DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
                 // scheduled task queue
-                if(idealState.getRecord().getMapField(partition.getPartitionName()) != null)
-                {
-                  timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
+                if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+                  timeOutStr =
+                      idealState.getRecord().getMapField(partition.getPartitionName())
+                          .get(Message.Attributes.TIMEOUT.toString());
                 }
               }
-              if(timeOutStr !=null)
-              {
-                try
-                {
+              if (timeOutStr != null) {
+                try {
                   int timeout = Integer.parseInt(timeOutStr);
-                  if (timeout > 0)
-                  {
+                  if (timeout > 0) {
                     message.setExecutionTimeout(timeout);
                   }
-                } catch (Exception e)
-                {
+                } catch (Exception e) {
                   logger.error("", e);
                 }
               }
@@ -180,31 +163,30 @@ public class MessageGenerationPhase extends AbstractBaseStage
             message.getRecord().setSimpleField("ClusterEventName", event.getName());
             // output.addMessage(resourceName, partition, message);
             if (!messageMap.containsKey(desiredState)) {
-            	messageMap.put(desiredState, new ArrayList<Message>());
+              messageMap.put(desiredState, new ArrayList<Message>());
             }
             messageMap.get(desiredState).add(message);
           }
         }
-        
+
         // add generated messages to output according to state priority
         List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
         for (String state : statesPriorityList) {
-        	if (messageMap.containsKey(state)) {
-        		for (Message message : messageMap.get(state)) {
-        			output.addMessage(resourceName, partition, message);
-        		}
-        	}
+          if (messageMap.containsKey(state)) {
+            for (Message message : messageMap.get(state)) {
+              output.addMessage(resourceName, partition, message);
+            }
+          }
         }
-        
-      }	// end of for-each-partition
+
+      } // end of for-each-partition
     }
     event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
   }
 
   private Message createMessage(HelixManager manager, String resourceName, String partitionName,
       String instanceName, String currentState, String nextState, String sessionId,
-      String stateModelDefName, String stateModelFactoryName, int bucketSize)
-  {
+      String stateModelDefName, String stateModelFactoryName, int bucketSize) {
     String uuid = UUID.randomUUID().toString();
     Message message = new Message(MessageType.STATE_TRANSITION, uuid);
     message.setSrcName(manager.getInstanceName());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 00cbe17..9a420aa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -36,95 +36,75 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
-
-public class MessageSelectionStage extends AbstractBaseStage
-{
+public class MessageSelectionStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
 
-  public static class Bounds
-  {
+  public static class Bounds {
     private int upper;
     private int lower;
 
-    public Bounds(int lower, int upper)
-    {
+    public Bounds(int lower, int upper) {
       this.lower = lower;
       this.upper = upper;
     }
 
-    public void increaseUpperBound()
-    {
+    public void increaseUpperBound() {
       upper++;
     }
 
-    public void increaseLowerBound()
-    {
+    public void increaseLowerBound() {
       lower++;
     }
 
-    public void decreaseUpperBound()
-    {
+    public void decreaseUpperBound() {
       upper--;
     }
 
-    public void decreaseLowerBound()
-    {
+    public void decreaseLowerBound() {
       lower--;
     }
 
-    public int getLowerBound()
-    {
+    public int getLowerBound() {
       return lower;
     }
 
-    public int getUpperBound()
-    {
+    public int getUpperBound() {
       return upper;
     }
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     MessageGenerationOutput messageGenOutput =
         event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cache == null || resourceMap == null || currentStateOutput == null
-        || messageGenOutput == null)
-    {
+        || messageGenOutput == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
     }
 
     MessageSelectionStageOutput output = new MessageSelectionStageOutput();
 
-    for (String resourceName : resourceMap.keySet())
-    {
+    for (String resourceName : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceName);
-      StateModelDefinition stateModelDef =
-          cache.getStateModelDef(resource.getStateModelDefRef());
+      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
 
-      Map<String, Integer> stateTransitionPriorities =
-          getStateTransitionPriorityMap(stateModelDef);
+      Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
       IdealState idealState = cache.getIdealState(resourceName);
       Map<String, Bounds> stateConstraints =
           computeStateConstraints(stateModelDef, idealState, cache);
 
-      for (Partition partition : resource.getPartitions())
-      {
+      for (Partition partition : resource.getPartitions()) {
         List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
         List<Message> selectedMessages =
             selectMessages(cache.getLiveInstances(),
-                           currentStateOutput.getCurrentStateMap(resourceName, partition),
-                           currentStateOutput.getPendingStateMap(resourceName, partition),
-                           messages,
-                           stateConstraints,
-                           stateTransitionPriorities,
-                           stateModelDef.getInitialState());
+                currentStateOutput.getCurrentStateMap(resourceName, partition),
+                currentStateOutput.getPendingStateMap(resourceName, partition), messages,
+                stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
         output.addMessages(resourceName, partition, selectedMessages);
       }
     }
@@ -141,7 +121,6 @@ public class MessageSelectionStage extends AbstractBaseStage
    * lowest, for each message group with the same transition add message one by one and
    * make sure state constraint is not violated update state lower/upper-bounds when a new
    * message is selected
-   *
    * @param currentStates
    * @param pendingStates
    * @param messages
@@ -152,15 +131,10 @@ public class MessageSelectionStage extends AbstractBaseStage
    * @return: selected messages
    */
   List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
-                               Map<String, String> currentStates,
-                               Map<String, String> pendingStates,
-                               List<Message> messages,
-                               Map<String, Bounds> stateConstraints,
-                               final Map<String, Integer> stateTransitionPriorities,
-                               String initialState)
-  {
-    if (messages == null || messages.isEmpty())
-    {
+      Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
+      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
+      String initialState) {
+    if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
 
@@ -168,16 +142,13 @@ public class MessageSelectionStage extends AbstractBaseStage
     Map<String, Bounds> bounds = new HashMap<String, Bounds>();
 
     // count currentState, if no currentState, count as in initialState
-    for (String instance : liveInstances.keySet())
-    {
+    for (String instance : liveInstances.keySet()) {
       String state = initialState;
-      if (currentStates.containsKey(instance))
-      {
+      if (currentStates.containsKey(instance)) {
         state = currentStates.get(instance);
       }
 
-      if (!bounds.containsKey(state))
-      {
+      if (!bounds.containsKey(state)) {
         bounds.put(state, new Bounds(0, 0));
       }
       bounds.get(state).increaseLowerBound();
@@ -185,11 +156,9 @@ public class MessageSelectionStage extends AbstractBaseStage
     }
 
     // count pendingStates
-    for (String instance : pendingStates.keySet())
-    {
+    for (String instance : pendingStates.keySet()) {
       String state = pendingStates.get(instance);
-      if (!bounds.containsKey(state))
-      {
+      if (!bounds.containsKey(state)) {
         bounds.put(state, new Bounds(0, 0));
       }
       // TODO: add lower bound, need to refactor pendingState to include fromState also
@@ -199,67 +168,55 @@ public class MessageSelectionStage extends AbstractBaseStage
     // group messages based on state transition priority
     Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
         new TreeMap<Integer, List<Message>>();
-    for (Message message : messages)
-    {
+    for (Message message : messages) {
       String fromState = message.getFromState();
       String toState = message.getToState();
       String transition = fromState + "-" + toState;
       int priority = Integer.MAX_VALUE;
 
-      if (stateTransitionPriorities.containsKey(transition))
-      {
+      if (stateTransitionPriorities.containsKey(transition)) {
         priority = stateTransitionPriorities.get(transition);
       }
 
-      if (!messagesGroupByStateTransitPriority.containsKey(priority))
-      {
+      if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
         messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
       }
       messagesGroupByStateTransitPriority.get(priority).add(message);
     }
 
     // select messages
-    for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
-    {
-      for (Message message : messageList)
-      {
+    for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
+      for (Message message : messageList) {
         String fromState = message.getFromState();
         String toState = message.getToState();
 
-        if (!bounds.containsKey(fromState))
-        {
+        if (!bounds.containsKey(fromState)) {
           LOG.error("Message's fromState is not in currentState. message: " + message);
           continue;
         }
 
-        if (!bounds.containsKey(toState))
-        {
+        if (!bounds.containsKey(toState)) {
           bounds.put(toState, new Bounds(0, 0));
         }
 
         // check lower bound of fromState
-        if (stateConstraints.containsKey(fromState))
-        {
+        if (stateConstraints.containsKey(fromState)) {
           int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
-          if (newLowerBound < 0)
-          {
+          if (newLowerBound < 0) {
             LOG.error("Number of currentState in " + fromState
                 + " is less than number of messages transiting from " + fromState);
             continue;
           }
 
-          if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
-          {
+          if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) {
             continue;
           }
         }
 
         // check upper bound of toState
-        if (stateConstraints.containsKey(toState))
-        {
+        if (stateConstraints.containsKey(toState)) {
           int newUpperBound = bounds.get(toState).getUpperBound() + 1;
-          if (newUpperBound > stateConstraints.get(toState).getUpperBound())
-          {
+          if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
             continue;
           }
         }
@@ -279,43 +236,30 @@ public class MessageSelectionStage extends AbstractBaseStage
    * Each IdealState must have a constraint object associated with it
    */
   private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
-                                                      IdealState idealState,
-                                                      ClusterDataCache cache)
-  {
+      IdealState idealState, ClusterDataCache cache) {
     Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
 
     List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
-    for (String state : statePriorityList)
-    {
+    for (String state : statePriorityList) {
       String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
       int max = -1;
-      if ("N".equals(numInstancesPerState))
-      {
+      if ("N".equals(numInstancesPerState)) {
         max = cache.getLiveInstances().size();
-      }
-      else if ("R".equals(numInstancesPerState))
-      {
+      } else if ("R".equals(numInstancesPerState)) {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
-        if (idealState != null)
-        {
+        if (idealState != null) {
           max = cache.getReplicas(idealState.getResourceName());
         }
-      }
-      else
-      {
-        try
-        {
+      } else {
+        try {
           max = Integer.parseInt(numInstancesPerState);
-        }
-        catch (Exception e)
-        {
+        } catch (Exception e) {
           // use -1
         }
       }
 
-      if (max > -1)
-      {
+      if (max > -1) {
         // if state has no constraint, will not put in map
         stateConstraints.put(state, new Bounds(0, max));
       }
@@ -326,13 +270,10 @@ public class MessageSelectionStage extends AbstractBaseStage
 
   // TODO: if state transition priority is not provided then use lexicographical sorting
   // so that behavior is consistent
-  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
-  {
+  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
-    List<String> stateTransitionPriorityList =
-        stateModelDef.getStateTransitionPriorityList();
-    for (int i = 0; i < stateTransitionPriorityList.size(); i++)
-    {
+    List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityList();
+    for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
       stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
index 3d05e0f..7ea545c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
@@ -27,34 +27,24 @@ import java.util.Map;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 
-
-public class MessageSelectionStageOutput
-{
+public class MessageSelectionStageOutput {
   private final Map<String, Map<Partition, List<Message>>> _messagesMap;
 
-  public MessageSelectionStageOutput()
-  {
+  public MessageSelectionStageOutput() {
     _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
   }
 
-  public void addMessages(String resourceName, Partition partition,
-      List<Message> selectedMessages)
-  {
-    if (!_messagesMap.containsKey(resourceName))
-    {
-      _messagesMap.put(resourceName,
-          new HashMap<Partition, List<Message>>());
+  public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) {
+    if (!_messagesMap.containsKey(resourceName)) {
+      _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
     }
     _messagesMap.get(resourceName).put(partition, selectedMessages);
 
   }
 
-  public List<Message> getMessages(String resourceName,
-      Partition partition)
-  {
+  public List<Message> getMessages(String resourceName, Partition partition) {
     Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-    if (map != null)
-    {
+    if (map != null) {
       return map.get(partition);
     }
     return Collections.emptyList();
@@ -62,8 +52,7 @@ public class MessageSelectionStageOutput
   }
 
   @Override
-  public String toString()
-  {
+  public String toString() {
     return _messagesMap.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 2cedb72..9eea7f1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -38,21 +38,15 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
 import org.apache.log4j.Logger;
 
+public class MessageThrottleStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(MessageThrottleStage.class.getName());
 
-public class MessageThrottleStage extends AbstractBaseStage
-{
-  private static final Logger LOG =
-                                      Logger.getLogger(MessageThrottleStage.class.getName());
-
-  int valueOf(String valueStr)
-  {
+  int valueOf(String valueStr) {
     int value = Integer.MAX_VALUE;
 
-    try
-    {
+    try {
       ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
-      switch (valueToken)
-      {
+      switch (valueToken) {
       case ANY:
         value = Integer.MAX_VALUE;
         break;
@@ -61,15 +55,10 @@ public class MessageThrottleStage extends AbstractBaseStage
             + Integer.MAX_VALUE);
         break;
       }
-    }
-    catch (Exception e)
-    {
-      try
-      {
+    } catch (Exception e) {
+      try {
         value = Integer.parseInt(valueStr);
-      }
-      catch (NumberFormatException ne)
-      {
+      } catch (NumberFormatException ne) {
         LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
             + Integer.MAX_VALUE);
       }
@@ -86,44 +75,31 @@ public class MessageThrottleStage extends AbstractBaseStage
    * value, select the first in alphabetic order
    */
   Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
-                                        Map<ConstraintAttribute, String> attributes)
-  {
+      Map<ConstraintAttribute, String> attributes) {
     Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
-    for (ConstraintItem item : items)
-    {
+    for (ConstraintItem item : items) {
       // don't select constraints with CONSTRAINT_VALUE=ANY
-      if (item.getConstraintValue().equals(ConstraintValue.ANY.toString()))
-      {
+      if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
         continue;
       }
 
       String key = item.filter(attributes).toString();
-      if (!selectedItems.containsKey(key))
-      {
+      if (!selectedItems.containsKey(key)) {
         selectedItems.put(key, item);
-      }
-      else
-      {
+      } else {
         ConstraintItem existingItem = selectedItems.get(key);
-        if (existingItem.match(item.getAttributes()))
-        {
+        if (existingItem.match(item.getAttributes())) {
           // item is more specific than existingItem
           selectedItems.put(key, item);
-        }
-        else if (!item.match(existingItem.getAttributes()))
-        {
+        } else if (!item.match(existingItem.getAttributes())) {
           // existingItem and item are of incomparable specificity
           int value = valueOf(item.getConstraintValue());
           int existingValue = valueOf(existingItem.getConstraintValue());
-          if (value < existingValue)
-          {
+          if (value < existingValue) {
             // item's constraint value is less than that of existingItem
             selectedItems.put(key, item);
-          }
-          else if (value == existingValue)
-          {
-            if (item.toString().compareTo(existingItem.toString()) < 0)
-            {
+          } else if (value == existingValue) {
+            if (item.toString().compareTo(existingItem.toString()) < 0) {
               // item is ahead of existingItem in alphabetic order
               selectedItems.put(key, item);
             }
@@ -135,47 +111,37 @@ public class MessageThrottleStage extends AbstractBaseStage
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     MessageSelectionStageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
 
-    if (cache == null || resourceMap == null || msgSelectionOutput == null)
-    {
+    if (cache == null || resourceMap == null || msgSelectionOutput == null) {
       throw new StageException("Missing attributes in event: " + event
           + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
     }
 
     MessageThrottleStageOutput output = new MessageThrottleStageOutput();
 
-        ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+    ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
     Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
 
-    if (constraint != null)
-    {
+    if (constraint != null) {
       // go through all pending messages, they should be counted but not throttled
-      for (String instance : cache.getLiveInstances().keySet())
-      {
-        throttle(throttleCounterMap,
-                 constraint,
-                 new ArrayList<Message>(cache.getMessages(instance).values()),
-                 false);
+      for (String instance : cache.getLiveInstances().keySet()) {
+        throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
+            .values()), false);
       }
     }
 
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
-    for (String resourceName : resourceMap.keySet())
-    {
+    for (String resourceName : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceName);
-      for (Partition partition : resource.getPartitions())
-      {
+      for (Partition partition : resource.getPartitions()) {
         List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
-        if (constraint != null && messages != null && messages.size() > 0)
-        {
+        if (constraint != null && messages != null && messages.size() > 0) {
           messages = throttle(throttleCounterMap, constraint, messages, true);
         }
         output.addMessages(resourceName, partition, messages);
@@ -185,45 +151,36 @@ public class MessageThrottleStage extends AbstractBaseStage
     event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
   }
 
-  private List<Message> throttle(Map<String, Integer> throttleMap,
-                                 ClusterConstraints constraint,
-                                 List<Message> messages,
-                                 final boolean needThrottle)
-  {
-  
+  private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
+      List<Message> messages, final boolean needThrottle) {
+
     List<Message> throttleOutputMsgs = new ArrayList<Message>();
-    for (Message message : messages)
-    {
+    for (Message message : messages) {
       Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
 
       Set<ConstraintItem> matches = constraint.match(msgAttr);
       matches = selectConstraints(matches, msgAttr);
 
       boolean msgThrottled = false;
-      for (ConstraintItem item : matches)
-      {
+      for (ConstraintItem item : matches) {
         String key = item.filter(msgAttr).toString();
-        if (!throttleMap.containsKey(key))
-        {
+        if (!throttleMap.containsKey(key)) {
           throttleMap.put(key, valueOf(item.getConstraintValue()));
         }
         int value = throttleMap.get(key);
         throttleMap.put(key, --value);
 
-        if (needThrottle && value < 0)
-        {
+        if (needThrottle && value < 0) {
           msgThrottled = true;
-          
-          if (LOG.isDebugEnabled())
-          {
+
+          if (LOG.isDebugEnabled()) {
             // TODO: printout constraint item that throttles the message
             LOG.debug("message: " + message + " is throttled by constraint: " + item);
           }
         }
       }
 
-      if (!msgThrottled)
-      {
+      if (!msgThrottled) {
         throttleOutputMsgs.add(message);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
index b60f876..7415944 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
@@ -27,33 +27,24 @@ import java.util.Map;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 
-
-public class MessageThrottleStageOutput
-{
+public class MessageThrottleStageOutput {
   private final Map<String, Map<Partition, List<Message>>> _messagesMap;
 
-  public MessageThrottleStageOutput()
-  {
+  public MessageThrottleStageOutput() {
     _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
   }
 
-  public void addMessages(String resourceName,
-                          Partition partition,
-                          List<Message> selectedMessages)
-  {
-    if (!_messagesMap.containsKey(resourceName))
-    {
+  public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) {
+    if (!_messagesMap.containsKey(resourceName)) {
       _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
     }
     _messagesMap.get(resourceName).put(partition, selectedMessages);
 
   }
 
-  public List<Message> getMessages(String resourceName, Partition partition)
-  {
+  public List<Message> getMessages(String resourceName, Partition partition) {
     Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-    if (map != null)
-    {
+    if (map != null) {
       return map.get(partition);
     }
     return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 204fdca..f077d29 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -27,55 +27,45 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
-
-public class ReadClusterDataStage extends AbstractBaseStage
-{
-  private static final Logger logger = Logger
-      .getLogger(ReadClusterDataStage.class.getName());
+public class ReadClusterDataStage extends AbstractBaseStage {
+  private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
   ClusterDataCache _cache;
 
-  public ReadClusterDataStage()
-  {
+  public ReadClusterDataStage() {
     _cache = new ClusterDataCache();
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
     logger.info("START ReadClusterDataStage.process()");
 
-    
     HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null)
-    {
+    if (manager == null) {
       throw new StageException("HelixManager attribute value is null");
     }
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     _cache.refresh(dataAccessor);
-    
-    ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-    if(clusterStatusMonitor != null)
-    {
+
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (clusterStatusMonitor != null) {
       int disabledInstances = 0;
       int disabledPartitions = 0;
-      for(InstanceConfig  config : _cache._instanceConfigMap.values())
-      {
-        if(config.getInstanceEnabled() == false)
-        {
+      for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+        if (config.getInstanceEnabled() == false) {
           disabledInstances++;
         }
-        if(config.getDisabledPartitions() != null)
-        {
+        if (config.getDisabledPartitions() != null) {
           disabledPartitions += config.getDisabledPartitions().size();
         }
       }
-      clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(), _cache._instanceConfigMap.size(), 
-          disabledInstances, disabledPartitions);
+      clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+          _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
     }
 
     event.addAttribute("ClusterDataCache", _cache);
-    
+
     long endTime = System.currentTimeMillis();
     logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
index 1a42eca..ae873c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -25,25 +25,20 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.log4j.Logger;
 
-
-public class ReadHealthDataStage extends AbstractBaseStage
-{
+public class ReadHealthDataStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
   HealthDataCache _cache;
 
-  public ReadHealthDataStage()
-  {
+  public ReadHealthDataStage() {
     _cache = new HealthDataCache();
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
 
     HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null)
-    {
+    if (manager == null) {
       throw new StageException("HelixManager attribute value is null");
     }
     // DataAccessor dataAccessor = manager.getDataAccessor();
@@ -56,4 +51,3 @@ public class ReadHealthDataStage extends AbstractBaseStage
     addLatencyToMonitor(event, processLatency);
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index b986d13..cf1633c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -34,48 +34,39 @@ import org.apache.log4j.Logger;
  * Check and invoke custom implementation idealstate rebalancers.<br/>
  * If the resourceConfig has specified className of the customized rebalancer, <br/>
  * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- * 
  */
-public class RebalanceIdealStateStage extends AbstractBaseStage
-{
-  private static final Logger LOG =
-      Logger.getLogger(RebalanceIdealStateStage.class.getName());
+public class RebalanceIdealStateStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(RebalanceIdealStateStage.class.getName());
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     Map<String, IdealState> idealStateMap = cache.getIdealStates();
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    
+
     Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
-    for(String resourceName : idealStateMap.keySet())
-    {
+    for (String resourceName : idealStateMap.keySet()) {
       IdealState currentIdealState = idealStateMap.get(resourceName);
-      if(currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-          && currentIdealState.getRebalancerClassName() != null)
-      {
+      if (currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
+          && currentIdealState.getRebalancerClassName() != null) {
         String rebalancerClassName = currentIdealState.getRebalancerClassName();
         LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
-        try
-        {
-          Rebalancer balancer = (Rebalancer) (HelixUtil.loadClass(
-              getClass(), rebalancerClassName).newInstance());
+        try {
+          Rebalancer balancer =
+              (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
           balancer.init(manager);
-          IdealState newIdealState = balancer.computeNewIdealState(
-              resourceName, idealStateMap.get(resourceName), currentStateOutput, cache);
+          IdealState newIdealState =
+              balancer.computeNewIdealState(resourceName, idealStateMap.get(resourceName),
+                  currentStateOutput, cache);
           updatedIdealStates.put(resourceName, newIdealState);
-        }
-        catch(Exception e)
-        {
-          LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName , e);
+        } catch (Exception e) {
+          LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
         }
       }
     }
-    if(updatedIdealStates.size() > 0)
-    {
+    if (updatedIdealStates.size() > 0) {
       cache.getIdealStates().putAll(updatedIdealStates);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 38ef8ba..51f0ec1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -31,25 +31,19 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
-
 /**
  * This stage computes all the resources in a cluster. The resources are
  * computed from IdealStates -> this gives all the resources currently active
  * CurrentState for liveInstance-> Helps in finding resources that are inactive
  * and needs to be dropped
- *
- *
  */
-public class ResourceComputationStage extends AbstractBaseStage
-{
+public class ResourceComputationStage extends AbstractBaseStage {
   private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (cache == null)
-    {
+    if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
 
@@ -57,15 +51,12 @@ public class ResourceComputationStage extends AbstractBaseStage
 
     Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
 
-    if (idealStates != null && idealStates.size() > 0)
-    {
-      for (IdealState idealState : idealStates.values())
-      {
+    if (idealStates != null && idealStates.size() > 0) {
+      for (IdealState idealState : idealStates.values()) {
         Set<String> partitionSet = idealState.getPartitionSet();
         String resourceName = idealState.getResourceName();
 
-        for (String partition : partitionSet)
-        {
+        for (String partition : partitionSet) {
           addPartition(partition, resourceName, resourceMap);
           Resource resource = resourceMap.get(resourceName);
           resource.setStateModelDefRef(idealState.getStateModelDefRef());
@@ -80,28 +71,23 @@ public class ResourceComputationStage extends AbstractBaseStage
     // idealState might be removed.
     Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
 
-    if (availableInstances != null && availableInstances.size() > 0)
-    {
-      for (LiveInstance instance : availableInstances.values())
-      {
+    if (availableInstances != null && availableInstances.size() > 0) {
+      for (LiveInstance instance : availableInstances.values()) {
         String instanceName = instance.getInstanceName();
         String clientSessionId = instance.getSessionId();
 
-        Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
-            clientSessionId);
-        if (currentStateMap == null || currentStateMap.size() == 0)
-        {
+        Map<String, CurrentState> currentStateMap =
+            cache.getCurrentState(instanceName, clientSessionId);
+        if (currentStateMap == null || currentStateMap.size() == 0) {
           continue;
         }
-        for (CurrentState currentState : currentStateMap.values())
-        {
+        for (CurrentState currentState : currentStateMap.values()) {
 
           String resourceName = currentState.getResourceName();
           Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
 
           // don't overwrite ideal state settings
-          if (!resourceMap.containsKey(resourceName))
-          {
+          if (!resourceMap.containsKey(resourceName)) {
             addResource(resourceName, resourceMap);
             Resource resource = resourceMap.get(resourceName);
             resource.setStateModelDefRef(currentState.getStateModelDefRef());
@@ -109,9 +95,8 @@ public class ResourceComputationStage extends AbstractBaseStage
             resource.setBucketSize(currentState.getBucketSize());
             resource.setBatchMessageMode(currentState.getBatchMessageMode());
           }
-          
-          if (currentState.getStateModelDefRef() == null)
-          {
+
+          if (currentState.getStateModelDefRef() == null) {
             LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
                 + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
                 + currentState.getPartitionStateMap().values());
@@ -119,8 +104,7 @@ public class ResourceComputationStage extends AbstractBaseStage
                 + currentState.getResourceName());
           }
 
-          for (String partition : resourceStateMap.keySet())
-          {
+          for (String partition : resourceStateMap.keySet()) {
             addPartition(partition, resourceName, resourceMap);
           }
         }
@@ -130,26 +114,20 @@ public class ResourceComputationStage extends AbstractBaseStage
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
   }
 
-  private void addResource(String resource, Map<String, Resource> resourceMap)
-  {
-    if (resource == null || resourceMap == null)
-    {
+  private void addResource(String resource, Map<String, Resource> resourceMap) {
+    if (resource == null || resourceMap == null) {
       return;
     }
-    if (!resourceMap.containsKey(resource))
-    {
+    if (!resourceMap.containsKey(resource)) {
       resourceMap.put(resource, new Resource(resource));
     }
   }
 
-  private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap)
-  {
-    if (resourceName == null || partition == null || resourceMap == null)
-    {
+  private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap) {
+    if (resourceName == null || partition == null || resourceMap == null) {
       return;
     }
-    if (!resourceMap.containsKey(resourceName))
-    {
+    if (!resourceMap.containsKey(resourceName)) {
       resourceMap.put(resourceName, new Resource(resourceName));
     }
     Resource resource = resourceMap.get(resourceName);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
index 92c849c..e531c88 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
@@ -52,20 +52,16 @@ import org.apache.helix.model.PersistentStats;
 import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
 import org.apache.log4j.Logger;
 
-
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
  * sessionId from LiveInstance Get Partition,State for all the resources computed in
  * previous State [ResourceComputationStage]
- *
  */
-public class StatsAggregationStage extends AbstractBaseStage
-{
+public class StatsAggregationStage extends AbstractBaseStage {
 
   public static final int ALERT_HISTORY_SIZE = 30;
 
-  private static final Logger logger =
-      Logger.getLogger(StatsAggregationStage.class.getName());
+  private static final Logger logger = Logger.getLogger(StatsAggregationStage.class.getName());
 
   StatsHolder _statsHolder = null;
   AlertsHolder _alertsHolder = null;
@@ -86,55 +82,46 @@ public class StatsAggregationStage extends AbstractBaseStage
 
   // public AggregationType _defaultAggType;
 
-  public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
-  {
+  public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus() {
     return _alertStatus;
   }
 
-  public Map<String, Tuple<String>> getStatStatus()
-  {
+  public Map<String, Tuple<String>> getStatStatus() {
     return _statStatus;
   }
 
-  public void persistAggStats(HelixManager manager)
-  {
+  public void persistAggStats(HelixManager manager) {
     Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
     Map<String, Map<String, String>> partitionReport =
         _aggStatsProvider.getRecentPartitionHealthReport();
     ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
-    if (report != null)
-    {
+    if (report != null) {
       record.setSimpleFields(report);
     }
-    if (partitionReport != null)
-    {
+    if (partitionReport != null) {
       record.setMapFields(partitionReport);
     }
 
-//    DataAccessor accessor = manager.getDataAccessor();
+    // DataAccessor accessor = manager.getDataAccessor();
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
-//    boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
+    // boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
     Builder keyBuilder = accessor.keyBuilder();
     boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
-    if (retVal == false)
-    {
+    if (retVal == false) {
       logger.error("attempt to persist derived stats failed");
     }
   }
 
   @Override
-  public void init(StageContext context)
-  {
+  public void init(StageContext context) {
   }
 
-  public String getAgeStatName(String instance)
-  {
+  public String getAgeStatName(String instance) {
     return instance + ExpressionParser.statFieldDelim + "reportingage";
   }
 
   // currTime in seconds
-  public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
-  {
+  public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime) {
     String statName = getAgeStatName(instance.getInstanceName());
     long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
                                                  // seconds
@@ -146,8 +133,7 @@ public class StatsAggregationStage extends AbstractBaseStage
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
     // String aggTypeName =
     // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
@@ -156,23 +142,18 @@ public class StatsAggregationStage extends AbstractBaseStage
     HelixManager manager = event.getAttribute("helixmanager");
     HealthDataCache cache = event.getAttribute("HealthDataCache");
 
-    if (manager == null || cache == null)
-    {
+    if (manager == null || cache == null) {
       throw new StageException("helixmanager|HealthDataCache attribute value is null");
     }
-    if(_alertsHolder == null)
-    {
+    if (_alertsHolder == null) {
       _statsHolder = new StatsHolder(manager, cache);
       _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
-    }
-    else
-    {
+    } else {
       _statsHolder.updateCache(cache);
       _alertsHolder.updateCache(cache);
     }
-    if (_statsHolder.getStatsList().size() == 0)
-    {
-      if(logger.isTraceEnabled()){
+    if (_statsHolder.getStatsList().size() == 0) {
+      if (logger.isTraceEnabled()) {
         logger.trace("stat holder is empty");
       }
       return;
@@ -186,8 +167,7 @@ public class StatsAggregationStage extends AbstractBaseStage
     long currTime = System.currentTimeMillis();
     // for each live node, read node's stats
     long readInstancesStart = System.currentTimeMillis();
-    for (LiveInstance instance : liveInstances.values())
-    {
+    for (LiveInstance instance : liveInstances.values()) {
       String instanceName = instance.getInstanceName();
       logger.debug("instanceName: " + instanceName);
       // XXX: now have map of HealthStats, so no need to traverse them...verify
@@ -198,10 +178,8 @@ public class StatsAggregationStage extends AbstractBaseStage
       long modTime = -1;
       // TODO: get healthreport child node modified time and reportAgeStat based on that
       boolean reportedAge = false;
-      for (HealthStat participantStat : stats.values())
-      {
-        if (participantStat != null && !reportedAge)
-        {
+      for (HealthStat participantStat : stats.values()) {
+        if (participantStat != null && !reportedAge) {
           // generate and report stats for how old this node's report is
           modTime = participantStat.getLastModifiedTimeStamp();
           reportAgeStat(instance, modTime, currTime);
@@ -211,14 +189,11 @@ public class StatsAggregationStage extends AbstractBaseStage
         // XXX: need to convert participantStat to a better format
         // need to get instanceName in here
 
-        if (participantStat != null)
-        {
+        if (participantStat != null) {
           // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
           // REPORT LEVEL TS
-          Map<String, Map<String, String>> statMap =
-              participantStat.getHealthFields(instanceName);
-          for (String key : statMap.keySet())
-          {
+          Map<String, Map<String, String>> statMap = participantStat.getHealthFields(instanceName);
+          for (String key : statMap.keySet()) {
             _statsHolder.applyStat(key, statMap.get(key));
           }
         }
@@ -227,28 +202,22 @@ public class StatsAggregationStage extends AbstractBaseStage
     // Call _statsHolder.persistStats() once per pipeline. This will
     // write the updated persisted stats into zookeeper
     _statsHolder.persistStats();
-    logger.info("Done processing stats: "
-        + (System.currentTimeMillis() - readInstancesStart));
+    logger.info("Done processing stats: " + (System.currentTimeMillis() - readInstancesStart));
     // populate _statStatus
     _statStatus = _statsHolder.getStatsMap();
 
-    for (String statKey : _statStatus.keySet())
-    {
+    for (String statKey : _statStatus.keySet()) {
       logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
     }
 
     long alertExecuteStartTime = System.currentTimeMillis();
     // execute alerts, populate _alertStatus
     _alertStatus =
-        AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
-                                        _statsHolder.getStatsList());
-    logger.info("done executing alerts: "
-        + (System.currentTimeMillis() - alertExecuteStartTime));
-    for (String originAlertName : _alertStatus.keySet())
-    {
-      _alertBeanCollection.setAlerts(originAlertName,
-                                     _alertStatus.get(originAlertName),
-                                     manager.getClusterName());
+        AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
+    logger.info("done executing alerts: " + (System.currentTimeMillis() - alertExecuteStartTime));
+    for (String originAlertName : _alertStatus.keySet()) {
+      _alertBeanCollection.setAlerts(originAlertName, _alertStatus.get(originAlertName),
+          manager.getClusterName());
     }
 
     executeAlertActions(manager);
@@ -257,32 +226,27 @@ public class StatsAggregationStage extends AbstractBaseStage
     long writeAlertStartTime = System.currentTimeMillis();
     // write out alert status (to zk)
     _alertsHolder.addAlertStatusSet(_alertStatus);
-    logger.info("done writing alerts: "
-        + (System.currentTimeMillis() - writeAlertStartTime));
+    logger.info("done writing alerts: " + (System.currentTimeMillis() - writeAlertStartTime));
 
     // TODO: access the 2 status variables from somewhere to populate graphs
 
     long logAlertStartTime = System.currentTimeMillis();
     // logging alert status
-    for (String alertOuterKey : _alertStatus.keySet())
-    {
+    for (String alertOuterKey : _alertStatus.keySet()) {
       logger.debug("Alert Outer Key: " + alertOuterKey);
       Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
-      if (alertInnerMap == null)
-      {
+      if (alertInnerMap == null) {
         logger.debug(alertOuterKey + " has no alerts to report.");
         continue;
       }
-      for (String alertInnerKey : alertInnerMap.keySet())
-      {
+      for (String alertInnerKey : alertInnerMap.keySet()) {
         logger.debug("  " + alertInnerKey + " value: "
             + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
             + alertInnerMap.get(alertInnerKey).isFired());
       }
     }
 
-    logger.info("done logging alerts: "
-        + (System.currentTimeMillis() - logAlertStartTime));
+    logger.info("done logging alerts: " + (System.currentTimeMillis() - logAlertStartTime));
 
     long processLatency = System.currentTimeMillis() - startTime;
     addLatencyToMonitor(event, processLatency);
@@ -294,29 +258,24 @@ public class StatsAggregationStage extends AbstractBaseStage
    * has been fired
    */
 
-  void executeAlertActions( HelixManager manager)
-  {
+  void executeAlertActions(HelixManager manager) {
     _alertActionTaken.clear();
     // Go through the original alert strings
-    for(String originAlertName : _alertStatus.keySet())
-    {
+    for (String originAlertName : _alertStatus.keySet()) {
       Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
-      if(alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME))
-      {
+      if (alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME)) {
         String actionValue = alertFields.get(AlertParser.ACTION_NAME);
         Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
-        if(alertResultMap == null)
-        {
-          logger.info("Alert "+ originAlertName + " does not have alert status map");
+        if (alertResultMap == null) {
+          logger.info("Alert " + originAlertName + " does not have alert status map");
           continue;
         }
         // For each original alert, iterate all actual alerts that it expands into
-        for(String actualStatName : alertResultMap.keySet())
-        {
+        for (String actualStatName : alertResultMap.keySet()) {
           // if the actual alert is fired, execute the action
-          if(alertResultMap.get(actualStatName).isFired())
-          {
-            logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by " + actualStatName);
+          if (alertResultMap.get(actualStatName).isFired()) {
+            logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by "
+                + actualStatName);
             _alertActionTaken.put(actualStatName, actionValue);
             // move functionalities into a separate class
             executeAlertAction(actualStatName, actionValue, manager);
@@ -325,71 +284,60 @@ public class StatsAggregationStage extends AbstractBaseStage
       }
     }
   }
+
   /**
    * Execute the action if an alert is fired, and the alert has an action associated with it.
    * NOTE: consider unifying this with DefaultParticipantErrorMessageHandler.handleMessage()
    */
-  void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
-  {
-    if(actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString()))
-    {
+  void executeAlertAction(String actualStatName, String actionValue, HelixManager manager) {
+    if (actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString())) {
       String instanceName = parseInstanceName(actualStatName, manager);
-      if(instanceName != null)
-      {
+      if (instanceName != null) {
         logger.info("Disabling instance " + instanceName);
-        manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, false);
+        manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName,
+            false);
       }
-    }
-    else if(actionValue.equals(ActionOnError.DISABLE_PARTITION.toString()))
-    {
+    } else if (actionValue.equals(ActionOnError.DISABLE_PARTITION.toString())) {
       String instanceName = parseInstanceName(actualStatName, manager);
       String resourceName = parseResourceName(actualStatName, manager);
       String partitionName = parsePartitionName(actualStatName, manager);
-      if(instanceName != null && resourceName != null && partitionName != null)
-      {
-        logger.info("Disabling partition " + partitionName + " instanceName " +  instanceName);
-        manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instanceName,
-            resourceName, Arrays.asList(partitionName));
+      if (instanceName != null && resourceName != null && partitionName != null) {
+        logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
+        manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(),
+            instanceName, resourceName, Arrays.asList(partitionName));
       }
-    }
-    else if(actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString()))
-    {
+    } else if (actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString())) {
       String instanceName = parseInstanceName(actualStatName, manager);
       String resourceName = parseResourceName(actualStatName, manager);
-      logger.info("Disabling resource " + resourceName + " instanceName " +  instanceName + " not implemented");
+      logger.info("Disabling resource " + resourceName + " instanceName " + instanceName
+          + " not implemented");
 
     }
   }
 
-  public static String parseResourceName(String actualStatName, HelixManager manager)
-  {
+  public static String parseResourceName(String actualStatName, HelixManager manager) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder kb = accessor.keyBuilder();
     List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
-    for (IdealState idealState : idealStates)
-    {
+    for (IdealState idealState : idealStates) {
       String resourceName = idealState.getResourceName();
-      if(actualStatName.contains("=" + resourceName + ".") || actualStatName.contains("=" + resourceName + ";"))
-      {
+      if (actualStatName.contains("=" + resourceName + ".")
+          || actualStatName.contains("=" + resourceName + ";")) {
         return resourceName;
       }
     }
     return null;
   }
 
-  public static String parsePartitionName(String actualStatName, HelixManager manager)
-  {
+  public static String parsePartitionName(String actualStatName, HelixManager manager) {
     String resourceName = parseResourceName(actualStatName, manager);
-    if(resourceName != null)
-    {
+    if (resourceName != null) {
       String partitionKey = "=" + resourceName + "_";
-      if(actualStatName.contains(partitionKey))
-      {
+      if (actualStatName.contains(partitionKey)) {
         int pos = actualStatName.indexOf(partitionKey);
         int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
         int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
-        if(nextCommaPos > 0 && nextCommaPos < nextDotPos)
-        {
+        if (nextCommaPos > 0 && nextCommaPos < nextDotPos) {
           nextDotPos = nextCommaPos;
         }
 
@@ -400,30 +348,25 @@ public class StatsAggregationStage extends AbstractBaseStage
     return null;
   }
 
-  public static String parseInstanceName(String actualStatName, HelixManager manager)
-  {
+  public static String parseInstanceName(String actualStatName, HelixManager manager) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder kb = accessor.keyBuilder();
     List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
-    for (LiveInstance instance : liveInstances)
-    {
+    for (LiveInstance instance : liveInstances) {
       String instanceName = instance.getInstanceName();
-      if(actualStatName.startsWith(instanceName))
-      {
+      if (actualStatName.startsWith(instanceName)) {
         return instanceName;
       }
     }
     return null;
   }
 
-  void updateAlertHistory(HelixManager manager)
-  {
-   // Write alert fire history to zookeeper
+  void updateAlertHistory(HelixManager manager) {
+    // Write alert fire history to zookeeper
     _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
     Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
     // Update history only when some beans have changed
-    if(delta.size() > 0)
-    {
+    if (delta.size() > 0) {
       delta.putAll(_alertActionTaken);
       SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
       String date = dateFormat.format(new Date());
@@ -433,29 +376,24 @@ public class StatsAggregationStage extends AbstractBaseStage
 
       HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
       ZNRecord alertFiredHistory;
-      if(property == null)
-      {
+      if (property == null) {
         alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
-      }
-      else
-      {
+      } else {
         alertFiredHistory = property.getRecord();
       }
-      while(alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE)
-      {
+      while (alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE) {
         // ZNRecord uses TreeMap which is sorted ascending internally
-        String firstKey = (String)(alertFiredHistory.getMapFields().keySet().toArray()[0]);
+        String firstKey = (String) (alertFiredHistory.getMapFields().keySet().toArray()[0]);
         alertFiredHistory.getMapFields().remove(firstKey);
       }
       alertFiredHistory.setMapField(date, delta);
-//      manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
+      // manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
       accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
       _alertBeanCollection.setAlertHistory(alertFiredHistory);
     }
   }
 
-  public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
-  {
+  public ClusterAlertMBeanCollection getClusterAlertMBeanCollection() {
     return _alertBeanCollection;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 859e4a6..192a645 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -38,72 +38,60 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
-
-public class TaskAssignmentStage extends AbstractBaseStage
-{
+public class TaskAssignmentStage extends AbstractBaseStage {
   private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
     logger.info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
     MessageThrottleStageOutput messageOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
 
-    if (manager == null || resourceMap == null || messageOutput == null
-        || cache == null || liveInstanceMap == null)
-    {
+    if (manager == null || resourceMap == null || messageOutput == null || cache == null
+        || liveInstanceMap == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<Message> messagesToSend = new ArrayList<Message>();
-    for (String resourceName : resourceMap.keySet())
-    {
+    for (String resourceName : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceName);
-      for (Partition partition : resource.getPartitions())
-      {
+      for (Partition partition : resource.getPartitions()) {
         List<Message> messages = messageOutput.getMessages(resourceName, partition);
         messagesToSend.addAll(messages);
       }
     }
 
-    
-    List<Message> outputMessages = batchMessage(dataAccessor.keyBuilder(), 
-                  messagesToSend, resourceMap, liveInstanceMap, manager.getProperties());
+    List<Message> outputMessages =
+        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
+            manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
 
     long endTime = System.currentTimeMillis();
-    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime)
-        + " ms");
+    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
 
   }
 
-  List<Message> batchMessage(Builder keyBuilder,
-                             List<Message> messages,
-                             Map<String, Resource> resourceMap,
-                             Map<String, LiveInstance> liveInstanceMap,
-                             HelixManagerProperties properties)
-  {
+  List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
+      Map<String, Resource> resourceMap, Map<String, LiveInstance> liveInstanceMap,
+      HelixManagerProperties properties) {
     // group messages by its CurrentState path + "/" + fromState + "/" + toState
     Map<String, Message> batchMessages = new HashMap<String, Message>();
     List<Message> outputMessages = new ArrayList<Message>();
 
     Iterator<Message> iter = messages.iterator();
-    while (iter.hasNext())
-    {
+    while (iter.hasNext()) {
       Message message = iter.next();
       String resourceName = message.getResourceName();
       Resource resource = resourceMap.get(resourceName);
-      
+
       String instanceName = message.getTgtName();
       LiveInstance liveInstance = liveInstanceMap.get(instanceName);
       String participantVersion = null;
@@ -111,22 +99,18 @@ public class TaskAssignmentStage extends AbstractBaseStage
         participantVersion = liveInstance.getHelixVersion();
       }
 
-      if (resource == null || !resource.getBatchMessageMode() 
-          || participantVersion == null 
-          || !properties.isFeatureSupported("batch_message", participantVersion))
-      {
+      if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
+          || !properties.isFeatureSupported("batch_message", participantVersion)) {
         outputMessages.add(message);
         continue;
       }
 
       String key =
-          keyBuilder.currentState(message.getTgtName(),
-                                  message.getTgtSessionId(),
-                                  message.getResourceName()).getPath()
+          keyBuilder.currentState(message.getTgtName(), message.getTgtSessionId(),
+              message.getResourceName()).getPath()
               + "/" + message.getFromState() + "/" + message.getToState();
 
-      if (!batchMessages.containsKey(key))
-      {
+      if (!batchMessages.containsKey(key)) {
         Message batchMessage = new Message(message.getRecord());
         batchMessage.setBatchMessageMode(true);
         outputMessages.add(batchMessage);
@@ -138,25 +122,23 @@ public class TaskAssignmentStage extends AbstractBaseStage
     return outputMessages;
   }
 
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
-  {
-    if (messages == null || messages.isEmpty())
-    {
+  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+    if (messages == null || messages.isEmpty()) {
       return;
     }
 
     Builder keyBuilder = dataAccessor.keyBuilder();
 
     List<PropertyKey> keys = new ArrayList<PropertyKey>();
-    for (Message message : messages)
-    {
+    for (Message message : messages) {
       logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
-          + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-          + " from:" + message.getFromState() + " to:" + message.getToState());
+          + " transit " + message.getPartitionName() + "|" + message.getPartitionNames() + " from:"
+          + message.getFromState() + " to:" + message.getToState());
 
-//      System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " + message.getTgtName()
-//              + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-//              + " from: " + message.getFromState() + " to: " + message.getToState());
+      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
+      // message.getTgtName()
+      // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
+      // + " from: " + message.getFromState() + " to: " + message.getToState());
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 283e26c..76560a4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -61,7 +61,7 @@ public class AutoRebalanceStrategy {
     _partitions = partitions;
     _states = states;
     _maximumPerNode = maximumPerNode;
-    if (placementScheme != null){
+    if (placementScheme != null) {
       _placementScheme = placementScheme;
     } else {
       _placementScheme = new DefaultPlacementScheme();
@@ -144,8 +144,7 @@ public class AutoRebalanceStrategy {
     // iterate through non preferred and see if we can move them to
     // preferredlocation if the donor has more than it should and stealer has
     // enough capacity
-    Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet()
-        .iterator();
+    Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
     while (iterator.hasNext()) {
       Entry<Replica, Node> entry = iterator.next();
       Replica replica = entry.getKey();
@@ -266,8 +265,7 @@ public class AutoRebalanceStrategy {
    * Update a ZNRecord with the results of the rebalancing.
    * @param znRecord
    */
-  private void prepareResult(ZNRecord znRecord)
-  {
+  private void prepareResult(ZNRecord znRecord) {
     // The map fields are keyed on partition name to a pair of node and state, i.e. it
     // indicates that the partition with given state is served by that node
     //
@@ -319,8 +317,8 @@ public class AutoRebalanceStrategy {
       for (String nodeId : nodeStateMap.keySet()) {
         Node node = _nodeMap.get(nodeId);
         boolean skip = false;
-        for(Replica replica: node.preferred){
-          if(replica.partition.equals(partition)){
+        for (Replica replica : node.preferred) {
+          if (replica.partition.equals(partition)) {
             skip = true;
             break;
           }
@@ -420,8 +418,9 @@ public class AutoRebalanceStrategy {
     for (String partition : _partitions) {
       for (int replicaId = 0; replicaId < count; replicaId++) {
         Replica replica = new Replica(partition, replicaId);
-        String nodeName = _placementScheme.getLocation(partitionId, replicaId,
-            _partitions.size(), numReplicas, allNodes);
+        String nodeName =
+            _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
+                allNodes);
         preferredMapping.put(replica, _nodeMap.get(nodeName));
       }
       partitionId = partitionId + 1;
@@ -431,7 +430,6 @@ public class AutoRebalanceStrategy {
 
   /**
    * Counts the total number of replicas given a state-count mapping
-   *
    * @param states
    * @return
    */
@@ -625,8 +623,9 @@ public class AutoRebalanceStrategy {
         index = (partitionId + replicaId * numPartitions) % nodeNames.size();
       } else if (nodeNames.size() == numPartitions) {
         // need a replica offset in case the sizes of these sets are the same
-        index = ((partitionId + replicaId * numPartitions) % nodeNames.size()
-            + replicaId) % nodeNames.size();
+        index =
+            ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
+                % nodeNames.size();
       } else {
         // in all other cases, assigning a replica at a time for each partition is reasonable
         index = (partitionId + replicaId) % nodeNames.size();
@@ -634,4 +633,4 @@ public class AutoRebalanceStrategy {
       return nodeNames.get(index);
     }
   }
-}
\ No newline at end of file
+}


Mime
View raw message