helix-commits mailing list archives

From zzh...@apache.org
Subject [43/51] [partial] [HELIX-198] Unify helix code style, rb=13710
Date Wed, 21 Aug 2013 20:43:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 58095df..8e4e1ea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -67,14 +67,11 @@ import org.apache.helix.model.PauseSignal;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
-
 /**
  * Cluster Controllers main goal is to keep the cluster state as close as possible to
  * Ideal State. It does this by listening to changes in cluster state and scheduling new
  * tasks to get cluster state to best possible ideal state. Every instance of this class
  * can control can control only one cluster
- *
- *
  * Get all the partitions use IdealState, CurrentState and Messages <br>
  * foreach partition <br>
  * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
@@ -84,33 +81,24 @@ import org.apache.log4j.Logger;
  * 4. select the messages that can be sent, needs messages and state model constraints <br>
  * 5. send messages
  */
-public class GenericHelixController implements
-    ConfigChangeListener,
-    IdealStateChangeListener,
-    LiveInstanceChangeListener,
-    MessageListener,
-    CurrentStateChangeListener,
-    ExternalViewChangeListener,
-    ControllerChangeListener,
-    HealthStateChangeListener
-{
-  private static final Logger    logger =
-                                            Logger.getLogger(GenericHelixController.class.getName());
-  volatile boolean               init   = false;
+public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
+    LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
+    ExternalViewChangeListener, ControllerChangeListener, HealthStateChangeListener {
+  private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
+  volatile boolean init = false;
   private final PipelineRegistry _registry;
 
-  final AtomicReference<Map<String, LiveInstance>>	_lastSeenInstances;
-  final AtomicReference<Map<String, LiveInstance>>	_lastSeenSessions;
-
-  ClusterStatusMonitor           _clusterStatusMonitor;
+  final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
+  final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
 
+  ClusterStatusMonitor _clusterStatusMonitor;
 
   /**
    * The _paused flag is checked by function handleEvent(), while if the flag is set
    * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
    * is set.
    */
-  private boolean                _paused;
+  private boolean _paused;
 
   /**
    * The timer that can periodically run the rebalancing pipeline. The timer will start if there
@@ -124,23 +112,19 @@ public class GenericHelixController implements
    * most cases, but if there is a some thing specific needed use another constructor
    * where in you can pass a pipeline registry
    */
-  public GenericHelixController()
-  {
+  public GenericHelixController() {
     this(createDefaultRegistry());
   }
 
-  class RebalanceTask extends TimerTask
-  {
+  class RebalanceTask extends TimerTask {
     HelixManager _manager;
 
-    public RebalanceTask(HelixManager manager)
-    {
+    public RebalanceTask(HelixManager manager) {
       _manager = manager;
     }
 
     @Override
-    public void run()
-    {
+    public void run() {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.CALLBACK);
       ClusterEvent event = new ClusterEvent("periodicalRebalance");
@@ -159,21 +143,16 @@ public class GenericHelixController implements
    * If the period is smaller than the current period, cancel the current timer and use
    * the new period.
    */
-  void startRebalancingTimer(int period, HelixManager manager)
-  {
+  void startRebalancingTimer(int period, HelixManager manager) {
     logger.info("Controller starting timer at period " + period);
-    if(period < _timerPeriod)
-    {
-      if(_rebalanceTimer != null)
-      {
+    if (period < _timerPeriod) {
+      if (_rebalanceTimer != null) {
         _rebalanceTimer.cancel();
       }
       _rebalanceTimer = new Timer(true);
       _timerPeriod = period;
       _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
-    }
-    else
-    {
+    } else {
       logger.info("Controller already has timer at period " + _timerPeriod);
     }
   }
@@ -181,21 +160,17 @@ public class GenericHelixController implements
   /**
    * Starts the rebalancing timer
    */
-  void stopRebalancingTimer()
-  {
-    if(_rebalanceTimer != null)
-    {
+  void stopRebalancingTimer() {
+    if (_rebalanceTimer != null) {
       _rebalanceTimer.cancel();
       _rebalanceTimer = null;
     }
     _timerPeriod = Integer.MAX_VALUE;
   }
 
-  private static PipelineRegistry createDefaultRegistry()
-  {
+  private static PipelineRegistry createDefaultRegistry() {
     logger.info("createDefaultRegistry");
-    synchronized (GenericHelixController.class)
-    {
+    synchronized (GenericHelixController.class) {
       PipelineRegistry registry = new PipelineRegistry();
 
       // cluster data cache refresh
@@ -222,23 +197,16 @@ public class GenericHelixController implements
       liveInstancePipeline.addStage(new CompatibilityCheckStage());
 
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
-      registry.register("currentStateChange",
-                        dataRefresh,
-                        rebalancePipeline,
-                        externalViewPipeline);
+      registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
       registry.register("configChange", dataRefresh, rebalancePipeline);
-      registry.register("liveInstanceChange",
-                        dataRefresh,
-                        liveInstancePipeline,
-                        rebalancePipeline,
-                        externalViewPipeline);
-
-      registry.register("messageChange",
-                        dataRefresh,
-                        rebalancePipeline);
+      registry.register("liveInstanceChange", dataRefresh, liveInstancePipeline, rebalancePipeline,
+          externalViewPipeline);
+
+      registry.register("messageChange", dataRefresh, rebalancePipeline);
       registry.register("externalView", dataRefresh);
       registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
-      registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
+      registry
+          .register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
 
       // health stats pipeline
       // Pipeline healthStatsAggregationPipeline = new Pipeline();
@@ -251,55 +219,45 @@ public class GenericHelixController implements
     }
   }
 
-  public GenericHelixController(PipelineRegistry registry)
-  {
+  public GenericHelixController(PipelineRegistry registry) {
     _paused = false;
     _registry = registry;
     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
-    _lastSeenSessions = new AtomicReference<Map<String,LiveInstance>>();
+    _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
   }
 
   /**
    * lock-always: caller always needs to obtain an external lock before call, calls to
    * handleEvent() should be serialized
-   *
    * @param event
    */
-  protected synchronized void handleEvent(ClusterEvent event)
-  {
+  protected synchronized void handleEvent(ClusterEvent event) {
     HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null)
-    {
+    if (manager == null) {
       logger.error("No cluster manager in event:" + event.getName());
       return;
     }
 
-    if (!manager.isLeader())
-    {
+    if (!manager.isLeader()) {
       logger.error("Cluster manager: " + manager.getInstanceName()
           + " is not leader. Pipeline will not be invoked");
       return;
     }
 
-    if (_paused)
-    {
+    if (_paused) {
       logger.info("Cluster is paused. Ignoring the event:" + event.getName());
       return;
     }
 
     NotificationContext context = null;
-    if (event.getAttribute("changeContext") != null)
-    {
+    if (event.getAttribute("changeContext") != null) {
       context = (NotificationContext) (event.getAttribute("changeContext"));
     }
 
     // Initialize _clusterStatusMonitor
-    if (context != null)
-    {
-      if (context.getType() == Type.FINALIZE)
-      {
-        if (_clusterStatusMonitor != null)
-        {
+    if (context != null) {
+      if (context.getType() == Type.FINALIZE) {
+        if (_clusterStatusMonitor != null) {
           _clusterStatusMonitor.reset();
           _clusterStatusMonitor = null;
         }
@@ -307,11 +265,8 @@ public class GenericHelixController implements
         stopRebalancingTimer();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
         return;
-      }
-      else
-      {
-        if (_clusterStatusMonitor == null)
-        {
+      } else {
+        if (_clusterStatusMonitor == null) {
           _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
         }
 
@@ -320,21 +275,16 @@ public class GenericHelixController implements
     }
 
     List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
-    if (pipelines == null || pipelines.size() == 0)
-    {
+    if (pipelines == null || pipelines.size() == 0) {
       logger.info("No pipeline to run for event:" + event.getName());
       return;
     }
 
-    for (Pipeline pipeline : pipelines)
-    {
-      try
-      {
+    for (Pipeline pipeline : pipelines) {
+      try {
         pipeline.handle(event);
         pipeline.finish();
-      }
-      catch (Exception e)
-      {
+      } catch (Exception e) {
         logger.error("Exception while executing pipeline: " + pipeline
             + ". Will not continue to next pipeline", e);
         break;
@@ -347,22 +297,19 @@ public class GenericHelixController implements
 
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList,
-                                   NotificationContext changeContext)
-  {
-//    logger.info("START: GenericClusterController.onExternalViewChange()");
-//    ClusterEvent event = new ClusterEvent("externalViewChange");
-//    event.addAttribute("helixmanager", changeContext.getManager());
-//    event.addAttribute("changeContext", changeContext);
-//    event.addAttribute("eventData", externalViewList);
-//    // handleEvent(event);
-//    logger.info("END: GenericClusterController.onExternalViewChange()");
+      NotificationContext changeContext) {
+    // logger.info("START: GenericClusterController.onExternalViewChange()");
+    // ClusterEvent event = new ClusterEvent("externalViewChange");
+    // event.addAttribute("helixmanager", changeContext.getManager());
+    // event.addAttribute("changeContext", changeContext);
+    // event.addAttribute("eventData", externalViewList);
+    // // handleEvent(event);
+    // logger.info("END: GenericClusterController.onExternalViewChange()");
   }
 
   @Override
-  public void onStateChange(String instanceName,
-                            List<CurrentState> statesInfo,
-                            NotificationContext changeContext)
-  {
+  public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+      NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onStateChange()");
     ClusterEvent event = new ClusterEvent("currentStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -374,10 +321,8 @@ public class GenericHelixController implements
   }
 
   @Override
-  public void onHealthChange(String instanceName,
-                             List<HealthStat> reports,
-                             NotificationContext changeContext)
-  {
+  public void onHealthChange(String instanceName, List<HealthStat> reports,
+      NotificationContext changeContext) {
     /**
      * When there are more participant ( > 20, can be in hundreds), This callback can be
      * called quite frequently as each participant reports health stat every minute. Thus
@@ -386,10 +331,8 @@ public class GenericHelixController implements
   }
 
   @Override
-  public void onMessage(String instanceName,
-                        List<Message> messages,
-                        NotificationContext changeContext)
-  {
+  public void onMessage(String instanceName, List<Message> messages,
+      NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage()");
 
     ClusterEvent event = new ClusterEvent("messageChange");
@@ -399,8 +342,7 @@ public class GenericHelixController implements
     event.addAttribute("eventData", messages);
     handleEvent(event);
 
-    if (_clusterStatusMonitor != null && messages != null)
-    {
+    if (_clusterStatusMonitor != null && messages != null) {
       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
     }
 
@@ -409,22 +351,18 @@ public class GenericHelixController implements
 
   @Override
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
-                                   NotificationContext changeContext)
-  {
+      NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
 
-    if (liveInstances == null)
-    {
+    if (liveInstances == null) {
       liveInstances = Collections.emptyList();
     }
     // Go though the live instance list and make sure that we are observing them
     // accordingly. The action is done regardless of the paused flag.
-    if (changeContext.getType() == NotificationContext.Type.INIT ||
-        changeContext.getType() == NotificationContext.Type.CALLBACK)
-    {
+    if (changeContext.getType() == NotificationContext.Type.INIT
+        || changeContext.getType() == NotificationContext.Type.CALLBACK) {
       checkLiveInstancesObservation(liveInstances, changeContext);
-    } else if (changeContext.getType() == NotificationContext.Type.FINALIZE)
-    {
+    } else if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
       // on finalize, should remove all message/current-state listeners
       logger.info("remove message/current-state listeners. lastSeenInstances: "
           + _lastSeenInstances + ", lastSeenSessions: " + _lastSeenSessions);
@@ -440,28 +378,23 @@ public class GenericHelixController implements
     logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
   }
 
-  void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
-  {
-    if (manager.getConfigAccessor() == null)
-    {
-      logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
+  void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates) {
+    if (manager.getConfigAccessor() == null) {
+      logger.warn(manager.getInstanceName()
+          + " config accessor doesn't exist. should be in file-based mode.");
       return;
     }
 
-    for(IdealState idealState : idealStates)
-    {
+    for (IdealState idealState : idealStates) {
       int period = idealState.getRebalanceTimerPeriod();
-      if(period > 0)
-      {
+      if (period > 0) {
         startRebalancingTimer(period, manager);
       }
     }
   }
 
   @Override
-  public void onIdealStateChange(List<IdealState> idealStates,
-                                 NotificationContext changeContext)
-  {
+  public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onIdealStateChange()");
     ClusterEvent event = new ClusterEvent("idealStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -469,8 +402,7 @@ public class GenericHelixController implements
     event.addAttribute("eventData", idealStates);
     handleEvent(event);
 
-    if(changeContext.getType() != Type.FINALIZE)
-    {
+    if (changeContext.getType() != Type.FINALIZE) {
       checkRebalancingTimer(changeContext.getManager(), idealStates);
     }
 
@@ -478,9 +410,7 @@ public class GenericHelixController implements
   }
 
   @Override
-  public void onConfigChange(List<InstanceConfig> configs,
-                             NotificationContext changeContext)
-  {
+  public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onConfigChange()");
     ClusterEvent event = new ClusterEvent("configChange");
     event.addAttribute("changeContext", changeContext);
@@ -491,11 +421,9 @@ public class GenericHelixController implements
   }
 
   @Override
-  public void onControllerChange(NotificationContext changeContext)
-  {
+  public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange()");
-    if (changeContext!= null && changeContext.getType() == Type.FINALIZE)
-    {
+    if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
       logger.info("GenericClusterController.onControllerChange() FINALIZE");
       return;
     }
@@ -503,37 +431,27 @@ public class GenericHelixController implements
 
     // double check if this controller is the leader
     Builder keyBuilder = accessor.keyBuilder();
-    LiveInstance leader =
-        accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null)
-    {
-      logger.warn("No controller exists for cluster:"
-          + changeContext.getManager().getClusterName());
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader == null) {
+      logger
+          .warn("No controller exists for cluster:" + changeContext.getManager().getClusterName());
       return;
-    }
-    else
-    {
+    } else {
       String leaderName = leader.getInstanceName();
 
       String instanceName = changeContext.getManager().getInstanceName();
-      if (leaderName == null || !leaderName.equals(instanceName))
-      {
-        logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
-            + leader);
+      if (leaderName == null || !leaderName.equals(instanceName)) {
+        logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: " + leader);
         return;
       }
     }
 
     PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-    if (pauseSignal != null)
-    {
+    if (pauseSignal != null) {
       _paused = true;
       logger.info("controller is now paused");
-    }
-    else
-    {
-      if (_paused)
-      {
+    } else {
+      if (_paused) {
         // it currently paused
         logger.info("controller is now resumed");
         _paused = false;
@@ -542,9 +460,7 @@ public class GenericHelixController implements
         event.addAttribute("helixmanager", changeContext.getManager());
         event.addAttribute("eventData", pauseSignal);
         handleEvent(event);
-      }
-      else
-      {
+      } else {
         _paused = false;
       }
     }
@@ -555,77 +471,73 @@ public class GenericHelixController implements
    * Go through the list of liveinstances in the cluster, and add currentstateChange
    * listener and Message listeners to them if they are newly added. For current state
    * change, the observation is tied to the session id of each live instance.
-   *
    */
   protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
-                                               NotificationContext changeContext)
-  {
-
-	// construct maps for current live-instances
-	Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
-	Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
-	for(LiveInstance liveInstance : liveInstances) {
-		curInstances.put(liveInstance.getInstanceName(), liveInstance);
-		curSessions.put(liveInstance.getSessionId(), liveInstance);
-	}
+      NotificationContext changeContext) {
+
+    // construct maps for current live-instances
+    Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
+    Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
+    for (LiveInstance liveInstance : liveInstances) {
+      curInstances.put(liveInstance.getInstanceName(), liveInstance);
+      curSessions.put(liveInstance.getSessionId(), liveInstance);
+    }
 
-	Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
-	Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
+    Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
+    Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
 
     HelixManager manager = changeContext.getManager();
     Builder keyBuilder = new Builder(manager.getClusterName());
     if (lastSessions != null) {
-    	for (String session : lastSessions.keySet()) {
-    		if (!curSessions.containsKey(session)) {
-    			// remove current-state listener for expired session
-    		    String instanceName = lastSessions.get(session).getInstanceName();
-    			manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
-    		}
-    	}
+      for (String session : lastSessions.keySet()) {
+        if (!curSessions.containsKey(session)) {
+          // remove current-state listener for expired session
+          String instanceName = lastSessions.get(session).getInstanceName();
+          manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+        }
+      }
     }
 
     if (lastInstances != null) {
-    	for (String instance : lastInstances.keySet()) {
-    		if (!curInstances.containsKey(instance)) {
-    			// remove message listener for disconnected instances
-    			manager.removeListener(keyBuilder.messages(instance), this);
-    		}
-    	}
+      for (String instance : lastInstances.keySet()) {
+        if (!curInstances.containsKey(instance)) {
+          // remove message listener for disconnected instances
+          manager.removeListener(keyBuilder.messages(instance), this);
+        }
+      }
+    }
+
+    for (String session : curSessions.keySet()) {
+      if (lastSessions == null || !lastSessions.containsKey(session)) {
+        String instanceName = curSessions.get(session).getInstanceName();
+        try {
+          // add current-state listeners for new sessions
+          manager.addCurrentStateChangeListener(this, instanceName, session);
+          logger.info(manager.getInstanceName() + " added current-state listener for instance: "
+              + instanceName + ", session: " + session + ", listener: " + this);
+        } catch (Exception e) {
+          logger.error("Fail to add current state listener for instance: " + instanceName
+              + " with session: " + session, e);
+        }
+      }
+    }
+
+    for (String instance : curInstances.keySet()) {
+      if (lastInstances == null || !lastInstances.containsKey(instance)) {
+        try {
+          // add message listeners for new instances
+          manager.addMessageListener(this, instance);
+          logger.info(manager.getInstanceName() + " added message listener for " + instance
+              + ", listener: " + this);
+        } catch (Exception e) {
+          logger.error("Fail to add message listener for instance: " + instance, e);
+        }
+      }
     }
 
-	for (String session : curSessions.keySet()) {
-		if (lastSessions == null || !lastSessions.containsKey(session)) {
-	      String instanceName = curSessions.get(session).getInstanceName();
-          try {
-            // add current-state listeners for new sessions
-	        manager.addCurrentStateChangeListener(this, instanceName, session);
-	        logger.info(manager.getInstanceName() + " added current-state listener for instance: "
-	            + instanceName + ", session: " + session + ", listener: " + this);
-          } catch (Exception e) {
-        	  logger.error("Fail to add current state listener for instance: "
-        		  + instanceName + " with session: " + session, e);
-          }
-		}
-	}
-
-	for (String instance : curInstances.keySet()) {
-		if (lastInstances == null || !lastInstances.containsKey(instance)) {
-	        try {
-	          // add message listeners for new instances
-	          manager.addMessageListener(this, instance);
-	          logger.info(manager.getInstanceName() + " added message listener for "
-	              + instance + ", listener: " + this);
-	        }
-	        catch (Exception e)
-	        {
-	          logger.error("Fail to add message listener for instance: " + instance, e);
-	        }
-		}
-	}
-
-	// update last-seen
-	_lastSeenInstances.set(curInstances);
-	_lastSeenSessions.set(curSessions);
+    // update last-seen
+    _lastSeenInstances.set(curInstances);
+    _lastSeenSessions.set(curSessions);
 
   }
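
[Editor's note: the five-step flow described in the class javadoc above (read IdealState, CurrentState and pending messages per partition, compute the best possible state, compute and filter messages, then send) can be illustrated with a minimal, self-contained sketch. All types and state names below are hypothetical stand-ins for illustration only; they are not Helix APIs, and the real controller applies state-model constraints that are omitted here.]

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the per-partition reconciliation described in the
// GenericHelixController javadoc; none of these types come from helix-core.
public class RebalanceSketch {
  record Message(String instance, String fromState, String toState) {
  }

  public static void main(String[] args) {
    // 1. (instance, state) views of a single partition
    Map<String, String> idealState = Map.of("node1", "MASTER", "node2", "SLAVE");
    Map<String, String> currentState = Map.of("node1", "SLAVE", "node2", "SLAVE");
    Map<String, String> pendingState = new HashMap<>(); // transitions already in flight

    List<Message> messages = new ArrayList<>();
    for (Map.Entry<String, String> e : idealState.entrySet()) {
      String instance = e.getKey();
      String target = e.getValue(); // 2. best possible state s'
      String current = currentState.getOrDefault(instance, "OFFLINE");
      boolean alreadyPending = target.equals(pendingState.get(instance));
      if (!target.equals(current) && !alreadyPending) {
        messages.add(new Message(instance, current, target)); // 3. message needed for s -> s'
      }
    }
    // 4./5. a real controller would first filter by state-model constraints, then send
    messages.forEach(m -> System.out.println("send " + m));
  }
}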
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index b595879..4aae39b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -52,9 +52,7 @@ import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.log4j.Logger;
 
-
-public class HelixControllerMain
-{
+public class HelixControllerMain {
   public static final String zkServerAddress = "zkSvr";
   public static final String cluster = "cluster";
   public static final String help = "help";
@@ -67,47 +65,48 @@ public class HelixControllerMain
 
   // hack: OptionalBuilder is not thread safe
   @SuppressWarnings("static-access")
-  synchronized private static Options constructCommandLineOptions()
-  {
-    Option helpOption = OptionBuilder.withLongOpt(help)
-        .withDescription("Prints command-line options info").create();
-
-    Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
-        .withDescription("Provide zookeeper address").create();
+  synchronized private static Options constructCommandLineOptions() {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
+            .create();
+
+    Option zkServerOption =
+        OptionBuilder.withLongOpt(zkServerAddress).withDescription("Provide zookeeper address")
+            .create();
     zkServerOption.setArgs(1);
     zkServerOption.setRequired(true);
     zkServerOption.setArgName("ZookeeperServerAddress(Required)");
 
-    Option clusterOption = OptionBuilder.withLongOpt(cluster)
-        .withDescription("Provide cluster name").create();
+    Option clusterOption =
+        OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
     clusterOption.setArgs(1);
     clusterOption.setRequired(true);
     clusterOption.setArgName("Cluster name (Required)");
 
-    Option modeOption = OptionBuilder
-        .withLongOpt(mode)
-        .withDescription(
-            "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
-        .create();
+    Option modeOption =
+        OptionBuilder
+            .withLongOpt(mode)
+            .withDescription(
+                "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
+            .create();
     modeOption.setArgs(1);
     modeOption.setRequired(false);
     modeOption.setArgName("Cluster controller mode (Optional)");
 
-    Option controllerNameOption = OptionBuilder.withLongOpt(name)
-        .withDescription("Provide cluster controller name (Optional)").create();
+    Option controllerNameOption =
+        OptionBuilder.withLongOpt(name)
+            .withDescription("Provide cluster controller name (Optional)").create();
     controllerNameOption.setArgs(1);
     controllerNameOption.setRequired(false);
     controllerNameOption.setArgName("Cluster controller name (Optional)");
-    
-    Option portOption = OptionBuilder
-        .withLongOpt(propertyTransferServicePort)
-        .withDescription(
-            "Webservice port for ZkProperty controller transfer")
-        .create();
+
+    Option portOption =
+        OptionBuilder.withLongOpt(propertyTransferServicePort)
+            .withDescription("Webservice port for ZkProperty controller transfer").create();
     portOption.setArgs(1);
     portOption.setRequired(false);
     portOption.setArgName("Cluster controller property transfer port (Optional)");
-    
+
     Options options = new Options();
     options.addOption(helpOption);
     options.addOption(zkServerOption);
@@ -119,23 +118,19 @@ public class HelixControllerMain
     return options;
   }
 
-  public static void printUsage(Options cliOptions)
-  {
+  public static void printUsage(Options cliOptions) {
     HelpFormatter helpFormatter = new HelpFormatter();
     helpFormatter.setWidth(1000);
     helpFormatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
   }
 
-  public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
-  {
+  public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception {
     CommandLineParser cliParser = new GnuParser();
     Options cliOptions = constructCommandLineOptions();
 
-    try
-    {
+    try {
       return cliParser.parse(cliOptions, cliArgs);
-    } catch (ParseException pe)
-    {
+    } catch (ParseException pe) {
       logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
       printUsage(cliOptions);
       System.exit(1);
@@ -144,45 +139,39 @@ public class HelixControllerMain
   }
 
   public static void addListenersToController(HelixManager manager,
-      GenericHelixController controller)
-  {
-    try
-    {
+      GenericHelixController controller) {
+    try {
       manager.addConfigChangeListener(controller);
       manager.addLiveInstanceChangeListener(controller);
       manager.addIdealStateChangeListener(controller);
       // no need for controller to listen on external-view
       // manager.addExternalViewChangeListener(controller);
       manager.addControllerListener(controller);
-    } catch (ZkInterruptedException e)
-    {
+    } catch (ZkInterruptedException e) {
       logger
           .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
               + e);
-    } catch (Exception e)
-    {
+    } catch (Exception e) {
       logger.error("Error when creating HelixManagerContollerMonitor", e);
     }
   }
 
   public static HelixManager startHelixController(final String zkConnectString,
-      final String clusterName, final String controllerName, final String controllerMode)
-  {
+      final String clusterName, final String controllerName, final String controllerMode) {
     HelixManager manager = null;
-    try
-    {
-      if (controllerMode.equalsIgnoreCase(STANDALONE))
-      {
-        manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-            InstanceType.CONTROLLER, zkConnectString);
+    try {
+      if (controllerMode.equalsIgnoreCase(STANDALONE)) {
+        manager =
+            HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
+                InstanceType.CONTROLLER, zkConnectString);
         manager.connect();
-      } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED))
-      {
-        manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-            InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
+      } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED)) {
+        manager =
+            HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
+                InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
 
-        DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
-            zkConnectString);
+        DistClusterControllerStateModelFactory stateModelFactory =
+            new DistClusterControllerStateModelFactory(zkConnectString);
 
         // StateMachineEngine genericStateMachineHandler = new
         // StateMachineEngine();
@@ -191,23 +180,20 @@ public class HelixControllerMain
         // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
         // genericStateMachineHandler);
         manager.connect();
-      } else
-      {
+      } else {
         logger.error("cluster controller mode:" + controllerMode + " NOT supported");
         // throw new
         // IllegalArgumentException("Unsupported cluster controller mode:" +
         // controllerMode);
       }
-    } catch (Exception e)
-    {
-      logger.error("Exception while starting controller",e);
+    } catch (Exception e) {
+      logger.error("Exception while starting controller", e);
     }
 
     return manager;
   }
 
-  public static void main(String[] args) throws Exception
-  {
+  public static void main(String[] args) throws Exception {
     // read the config;
     // check if the this process is the master wait indefinitely
     // other approach is always process the events but when updating the zk
@@ -222,18 +208,15 @@ public class HelixControllerMain
     String controllerMode = STANDALONE;
     String controllerName = null;
     int propertyTransServicePort = -1;
-    
-    if (cmd.hasOption(mode))
-    {
+
+    if (cmd.hasOption(mode)) {
       controllerMode = cmd.getOptionValue(mode);
     }
-    
-    if(cmd.hasOption(propertyTransferServicePort))
-    {
-        propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
+
+    if (cmd.hasOption(propertyTransferServicePort)) {
+      propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
     }
-    if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
-    {
+    if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name)) {
       throw new IllegalArgumentException(
           "A unique cluster controller name is required in DISTRIBUTED mode");
     }
@@ -244,24 +227,18 @@ public class HelixControllerMain
     logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
         + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
 
-    if (propertyTransServicePort > 0)
-    {
+    if (propertyTransServicePort > 0) {
       ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
     }
-    
-    HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
-        controllerMode);
-    try
-    {
+
+    HelixManager manager =
+        startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
+    try {
       Thread.currentThread().join();
-    } 
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
           + " interrupted");
-    }
-    finally
-    {
+    } finally {
       manager.disconnect();
       ZKPropertyTransferServer.getInstance().shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
index 2e4ea3c..1543a3f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
@@ -30,15 +30,11 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
-
 /**
  * Generic class that will read the data given the root path.
- * 
  */
-public class HierarchicalDataHolder<T>
-{
-  private static final Logger logger = Logger
-      .getLogger(HierarchicalDataHolder.class.getName());
+public class HierarchicalDataHolder<T> {
+  private static final Logger logger = Logger.getLogger(HierarchicalDataHolder.class.getName());
   AtomicReference<Node<T>> root;
 
   /**
@@ -49,9 +45,7 @@ public class HierarchicalDataHolder<T>
   private final String _rootPath;
   private final FileFilter _filter;
 
-  public HierarchicalDataHolder(ZkClient client, String rootPath,
-      FileFilter filter)
-  {
+  public HierarchicalDataHolder(ZkClient client, String rootPath, FileFilter filter) {
     this._zkClient = client;
     this._rootPath = rootPath;
     this._filter = filter;
@@ -61,66 +55,52 @@ public class HierarchicalDataHolder<T>
     refreshData();
   }
 
-  public long getVersion()
-  {
+  public long getVersion() {
     return currentVersion.get();
   }
 
-  public boolean refreshData()
-  {
+  public boolean refreshData() {
     Node<T> newRoot = new Node<T>();
     boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
-    if (dataChanged)
-    {
+    if (dataChanged) {
       currentVersion.getAndIncrement();
       root.set(newRoot);
       return true;
-    } else
-    {
+    } else {
       return false;
     }
   }
 
   // private void refreshRecursively(Node<T> oldRoot, Stat oldStat, Node<T>
   // newRoot,Stat newStat, String path)
-  private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot,
-      String path)
-  {
+  private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot, String path) {
     boolean dataChanged = false;
     Stat newStat = _zkClient.getStat(path);
     Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
     newRoot.name = path;
-    if (newStat != null)
-    {
-      if (oldStat == null)
-      {
+    if (newStat != null) {
+      if (oldStat == null) {
         newRoot.stat = newStat;
         newRoot.data = _zkClient.<T> readData(path, true);
         dataChanged = true;
-      } else if (newStat.equals(oldStat))
-      {
+      } else if (newStat.equals(oldStat)) {
         newRoot.stat = oldStat;
         newRoot.data = oldRoot.data;
-      } else
-      {
+      } else {
         dataChanged = true;
         newRoot.stat = newStat;
         newRoot.data = _zkClient.<T> readData(path, true);
       }
-      if (newStat.getNumChildren() > 0)
-      {
+      if (newStat.getNumChildren() > 0) {
         List<String> children = _zkClient.getChildren(path);
-        for (String child : children)
-        {
+        for (String child : children) {
           String newPath = path + "/" + child;
-          Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children
-              .get(child) : null;
-          if (newRoot.children == null)
-          {
+          Node<T> oldChild =
+              (oldRoot != null && oldRoot.children != null) ? oldRoot.children.get(child) : null;
+          if (newRoot.children == null) {
             newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
           }
-          if (!newRoot.children.contains(child))
-          {
+          if (!newRoot.children.contains(child)) {
             newRoot.children.put(child, new Node<T>());
           }
           Node<T> newChild = newRoot.children.get(child);
@@ -128,15 +108,13 @@ public class HierarchicalDataHolder<T>
           dataChanged = dataChanged || childChanged;
         }
       }
-    } else
-    {
+    } else {
       logger.info(path + " does not exist");
     }
     return dataChanged;
   }
 
-  static class Node<T>
-  {
+  static class Node<T> {
     String name;
     Stat stat;
     T data;
@@ -144,27 +122,22 @@ public class HierarchicalDataHolder<T>
 
   }
 
-  public void print()
-  {
-    logger.info("START "+ _rootPath);
+  public void print() {
+    logger.info("START " + _rootPath);
     LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
     stack.push(root.get());
-    while (!stack.isEmpty())
-    {
+    while (!stack.isEmpty()) {
       Node<T> pop = stack.pop();
-      if (pop != null)
-      {
-        logger.info("name:"+ pop.name);
-        logger.info("\tdata:"+pop.data);
-        if (pop.children != null)
-        {
-          for (Node<T> child : pop.children.values())
-          {
+      if (pop != null) {
+        logger.info("name:" + pop.name);
+        logger.info("\tdata:" + pop.data);
+        if (pop.children != null) {
+          for (Node<T> child : pop.children.values()) {
             stack.push(child);
           }
         }
       }
     }
-    logger.info("END "+ _rootPath);
+    logger.info("END " + _rootPath);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
index 0016597..5bbe264 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -24,58 +24,46 @@ import java.util.Map;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
 
-
-public class AbstractBaseStage implements Stage
-{
+public class AbstractBaseStage implements Stage {
   @Override
-  public void init(StageContext context)
-  {
+  public void init(StageContext context) {
 
   }
 
   @Override
-  public void preProcess()
-  {
+  public void preProcess() {
     // TODO Auto-generated method stub
 
   }
 
   @Override
-  public void process(ClusterEvent event) throws Exception
-  {
+  public void process(ClusterEvent event) throws Exception {
 
   }
 
   @Override
-  public void postProcess()
-  {
+  public void postProcess() {
 
   }
 
   @Override
-  public void release()
-  {
+  public void release() {
 
   }
 
   @Override
-  public String getStageName()
-  {
+  public String getStageName() {
     // default stage name will be the class name
     String className = this.getClass().getName();
     return className;
   }
 
-  public void addLatencyToMonitor(ClusterEvent event, long latency)
-  {
+  public void addLatencyToMonitor(ClusterEvent event, long latency) {
     Map<String, HelixStageLatencyMonitor> stgLatencyMonitorMap =
         event.getAttribute("HelixStageLatencyMonitorMap");
-    if (stgLatencyMonitorMap != null)
-    {
-      if (stgLatencyMonitorMap.containsKey(getStageName()))
-      {
-        HelixStageLatencyMonitor stgLatencyMonitor =
-            stgLatencyMonitorMap.get(getStageName());
+    if (stgLatencyMonitorMap != null) {
+      if (stgLatencyMonitorMap.containsKey(getStageName())) {
+        HelixStageLatencyMonitor stgLatencyMonitor = stgLatencyMonitorMap.get(getStageName());
         stgLatencyMonitor.addStgLatency(latency);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index c5eaca8..16179b8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -25,45 +25,36 @@ import java.util.List;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.log4j.Logger;
 
-
-public class Pipeline
-{
+public class Pipeline {
   private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
   List<Stage> _stages;
 
-  public Pipeline()
-  {
+  public Pipeline() {
     _stages = new ArrayList<Stage>();
   }
 
-  public void addStage(Stage stage)
-  {
+  public void addStage(Stage stage) {
     _stages.add(stage);
     StageContext context = null;
     stage.init(context);
   }
 
-  public void handle(ClusterEvent event) throws Exception
-  {
-    if (_stages == null)
-    {
+  public void handle(ClusterEvent event) throws Exception {
+    if (_stages == null) {
       return;
     }
-    for (Stage stage : _stages)
-    {
+    for (Stage stage : _stages) {
       stage.preProcess();
       stage.process(event);
       stage.postProcess();
     }
   }
 
-  public void finish()
-  {
+  public void finish() {
 
   }
 
-  public List<Stage> getStages()
-  {
+  public List<Stage> getStages() {
     return _stages;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
index b0f4f00..468142a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/PipelineRegistry.java
@@ -25,32 +25,25 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class PipelineRegistry
-{
+public class PipelineRegistry {
   Map<String, List<Pipeline>> _map;
 
-  public PipelineRegistry()
-  {
+  public PipelineRegistry() {
     _map = new HashMap<String, List<Pipeline>>();
   }
 
-  public void register(String eventName, Pipeline... pipelines)
-  {
-    if (!_map.containsKey(eventName))
-    {
+  public void register(String eventName, Pipeline... pipelines) {
+    if (!_map.containsKey(eventName)) {
       _map.put(eventName, new ArrayList<Pipeline>());
     }
     List<Pipeline> list = _map.get(eventName);
-    for (Pipeline pipeline : pipelines)
-    {
+    for (Pipeline pipeline : pipelines) {
       list.add(pipeline);
     }
   }
 
-  public List<Pipeline> getPipelinesForEvent(String eventName)
-  {
-    if (_map.containsKey(eventName))
-    {
+  public List<Pipeline> getPipelinesForEvent(String eventName) {
+    if (_map.containsKey(eventName)) {
       return _map.get(eventName);
     }
     return Collections.emptyList();
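
[Editor's note: the Pipeline/PipelineRegistry pair reformatted above is essentially an event-name-to-stage-list dispatcher, which GenericHelixController.handleEvent() drives. A minimal usage sketch of that shape follows; the Stage, Pipeline and Registry classes below are simplified stand-ins, not the helix-core classes.]

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins mirroring the registry/pipeline shape above; not helix-core code.
public class PipelineSketch {
  interface Stage {
    void process(String event) throws Exception;
  }

  static class Pipeline {
    private final List<Stage> stages = new ArrayList<>();

    void addStage(Stage stage) {
      stages.add(stage);
    }

    void handle(String event) throws Exception {
      for (Stage stage : stages) {
        stage.process(event);
      }
    }
  }

  static class Registry {
    private final Map<String, List<Pipeline>> map = new HashMap<>();

    void register(String eventName, Pipeline... pipelines) {
      map.computeIfAbsent(eventName, k -> new ArrayList<>()).addAll(List.of(pipelines));
    }

    List<Pipeline> getPipelinesForEvent(String eventName) {
      return map.getOrDefault(eventName, List.of());
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline rebalance = new Pipeline();
    rebalance.addStage(event -> System.out.println("rebalance stage handled " + event));

    Registry registry = new Registry();
    registry.register("idealStateChange", rebalance);

    // handleEvent(): look up the pipelines registered for the event name and run each one
    for (Pipeline pipeline : registry.getPipelinesForEvent("idealStateChange")) {
      pipeline.handle("idealStateChange");
    }
  }
}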

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
index a9ab8aa..aeed664 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Stage.java
@@ -22,11 +22,9 @@ package org.apache.helix.controller.pipeline;
 import org.apache.helix.controller.stages.ClusterEvent;
 
 /**
- * Logically independent unit in processing callbacks for cluster changes  
- *
+ * Logically independent unit in processing callbacks for cluster changes
  */
-public interface Stage
-{
+public interface Stage {
 
   /**
    * Initialize a stage

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
index cac14e5..b62cf99 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageContext.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.pipeline;
  * under the License.
  */
 
-public class StageContext
-{
+public class StageContext {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
index af07f92..0feb56c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/StageException.java
@@ -19,15 +19,13 @@ package org.apache.helix.controller.pipeline;
  * under the License.
  */
 
-public class StageException extends Exception
-{
+public class StageException extends Exception {
 
-  public StageException(String message)
-  {
+  public StageException(String message) {
     super(message);
   }
-  public StageException(String message,Exception e)
-  {
-    super(message,e);
+
+  public StageException(String message, Exception e) {
+    super(message, e);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 2e3cce7..4dd5ea6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -53,10 +53,8 @@ import org.apache.log4j.Logger;
  * state of a resource, fully adapting to the addition or removal of instances. This includes
  * computation of a new preference list and a partition to instance and state mapping based on the
  * computed instance preferences.
- *
  * The input is the current assignment of partitions to instances, as well as existing instance
  * preferences, if any.
- *
  * The output is a preference list and a mapping based on that preference list, i.e. partition p
  * has a replica on node k with state s.
  */
@@ -74,9 +72,8 @@ public class AutoRebalancer implements Rebalancer {
   }
 
   @Override
-  public IdealState computeNewIdealState(String resourceName,
-      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
-      ClusterDataCache clusterData) {
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
     String stateModelName = currentIdealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
@@ -86,8 +83,8 @@ public class AutoRebalancer implements Rebalancer {
     LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
     stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
-    Map<String, Map<String, String>> currentMapping = currentMapping(currentStateOutput,
-        resourceName, partitions, stateCountMap);
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
 
     List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
@@ -101,10 +98,11 @@ public class AutoRebalancer implements Rebalancer {
     }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     placementScheme.init(_manager);
-    _algorithm = new AutoRebalanceStrategy(resourceName, partitions, stateCountMap,
-        maxPartition, placementScheme);
-    ZNRecord newMapping = _algorithm.computePartitionAssignment(liveNodes,currentMapping,
-        allNodes);
+    _algorithm =
+        new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
+            placementScheme);
+    ZNRecord newMapping =
+        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("newMapping: " + newMapping);
@@ -118,11 +116,10 @@ public class AutoRebalancer implements Rebalancer {
   }
 
   /**
-  *
-  * @return state count map: state->count
-  */
-  private LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef, int liveNodesNb,
-      int totalReplicas) {
+   * @return state count map: state->count
+   */
+  private LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
+      int liveNodesNb, int totalReplicas) {
     LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
     List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
 
@@ -168,8 +165,8 @@ public class AutoRebalancer implements Rebalancer {
     Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
 
     for (String partition : partitions) {
-      Map<String, String> curStateMap = currentStateOutput.getCurrentStateMap(resourceName,
-          new Partition(partition));
+      Map<String, String> curStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition));
       map.put(partition, new HashMap<String, String>());
       for (String node : curStateMap.keySet()) {
         String state = curStateMap.get(node);
@@ -178,8 +175,8 @@ public class AutoRebalancer implements Rebalancer {
         }
       }
 
-      Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName,
-          new Partition(partition));
+      Map<String, String> pendingStateMap =
+          currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
       for (String node : pendingStateMap.keySet()) {
         String state = pendingStateMap.get(node);
         if (stateCountMap.containsKey(state)) {
@@ -205,8 +202,8 @@ public class AutoRebalancer implements Rebalancer {
           currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
           cache.getDisabledInstancesForPartition(partition.toString());
-      List<String> preferenceList = ConstraintBasedAssignment.getPreferenceList(cache, partition,
-          idealState, stateModelDef);
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState, stateModelDef);
       Map<String, String> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
               preferenceList, currentStateMap, disabledInstancesForPartition);
@@ -219,7 +216,6 @@ public class AutoRebalancer implements Rebalancer {
    * Compute best state for resource in AUTO_REBALANCE ideal state mode. the algorithm
    * will make sure that the master partition are evenly distributed; Also when instances
    * are added / removed, the amount of diff in master partitions are minimized
-   *
    * @param cache
    * @param idealState
    * @param instancePreferenceList
@@ -227,23 +223,22 @@ public class AutoRebalancer implements Rebalancer {
    * @param currentStateOutput
    * @return
    */
-  private void calculateAutoBalancedIdealState(ClusterDataCache cache,
-                                               IdealState idealState,
-                                               StateModelDefinition stateModelDef) {
+  private void calculateAutoBalancedIdealState(ClusterDataCache cache, IdealState idealState,
+      StateModelDefinition stateModelDef) {
     String topStateValue = stateModelDef.getStatesPriorityList().get(0);
     Set<String> liveInstances = cache.getLiveInstances().keySet();
     Set<String> taggedInstances = new HashSet<String>();
 
     // If there are instances tagged with resource name, use only those instances
-    if(idealState.getInstanceGroupTag() != null) {
-      for(String instanceName : liveInstances) {
-        if(cache.getInstanceConfigMap().get(instanceName).containsTag(
-            idealState.getInstanceGroupTag())) {
+    if (idealState.getInstanceGroupTag() != null) {
+      for (String instanceName : liveInstances) {
+        if (cache.getInstanceConfigMap().get(instanceName)
+            .containsTag(idealState.getInstanceGroupTag())) {
           taggedInstances.add(instanceName);
         }
       }
     }
-    if(taggedInstances.size() > 0) {
+    if (taggedInstances.size() > 0) {
       if (LOG.isInfoEnabled()) {
         LOG.info("found the following instances with tag " + idealState.getResourceName() + " "
             + taggedInstances);
@@ -254,8 +249,7 @@ public class AutoRebalancer implements Rebalancer {
     int replicas = 1;
     try {
       replicas = Integer.parseInt(idealState.getReplicas());
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.error("", e);
     }
     // Init for all partitions with empty list
@@ -268,8 +262,7 @@ public class AutoRebalancer implements Rebalancer {
     // Return if no live instance
     if (liveInstances.size() == 0) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("No live instances, return. Idealstate : "
-            + idealState.getResourceName());
+        LOG.info("No live instances, return. Idealstate : " + idealState.getResourceName());
       }
       return;
     }
@@ -283,9 +276,8 @@ public class AutoRebalancer implements Rebalancer {
     for (String liveInstanceName : liveInstances) {
       CurrentState currentState =
           cache.getCurrentState(liveInstanceName,
-                                cache.getLiveInstances()
-                                     .get(liveInstanceName)
-                                     .getSessionId()).get(idealState.getId());
+              cache.getLiveInstances().get(liveInstanceName).getSessionId())
+              .get(idealState.getId());
       if (currentState != null) {
         Map<String, String> partitionStates = currentState.getPartitionStateMap();
         for (String partitionName : partitionStates.keySet()) {
@@ -301,15 +293,13 @@ public class AutoRebalancer implements Rebalancer {
     orphanedPartitionsList.addAll(orphanedPartitions);
     int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
     normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
-    idealState.getRecord()
-              .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
-                                                                   replicas));
+    idealState.getRecord().setListFields(
+        generateListFieldFromMasterAssignment(masterAssignmentMap, replicas));
   }
 
   /**
    * Given the current master assignment map and the partitions not hosted, generate an
    * evenly distributed partition assignment map
-   *
    * @param masterAssignmentMap
    *          current master assignment map
    * @param orphanPartitions
@@ -317,8 +307,7 @@ public class AutoRebalancer implements Rebalancer {
    * @return
    */
   private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
-                                      List<String> orphanPartitions,
-                                      int maxPartitionsPerInstance) {
+      List<String> orphanPartitions, int maxPartitionsPerInstance) {
     int totalPartitions = 0;
     String[] instanceNames = new String[masterAssignmentMap.size()];
     masterAssignmentMap.keySet().toArray(instanceNames);
@@ -340,8 +329,7 @@ public class AutoRebalancer implements Rebalancer {
       // For hosts that has more partitions, move those partitions to "orphaned"
       while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo) {
         int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
-        orphanPartitions.add(masterAssignmentMap.get(instanceNames[i])
-                                                .get(lastElementIndex));
+        orphanPartitions.add(masterAssignmentMap.get(instanceNames[i]).get(lastElementIndex));
         masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
       }
     }
@@ -351,13 +339,12 @@ public class AutoRebalancer implements Rebalancer {
     for (int i = 0; i < instanceNames.length; i++) {
       int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
       leave--;
-      if(targetPartitionNo > maxPartitionsPerInstance) {
+      if (targetPartitionNo > maxPartitionsPerInstance) {
         targetPartitionNo = maxPartitionsPerInstance;
       }
       while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo) {
         int lastElementIndex = orphanPartitions.size() - 1;
-        masterAssignmentMap.get(instanceNames[i])
-                           .add(orphanPartitions.get(lastElementIndex));
+        masterAssignmentMap.get(instanceNames[i]).add(orphanPartitions.get(lastElementIndex));
         orphanPartitions.remove(lastElementIndex);
       }
     }
@@ -369,7 +356,6 @@ public class AutoRebalancer implements Rebalancer {
   /**
    * Generate full preference list from the master assignment map evenly distribute the
    * slave partitions mastered on a host to other hosts
-   *
    * @param masterAssignmentMap
    *          current master assignment map
    * @param orphanPartitions

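The AutoRebalancer hunks above are pure reformatting, but the helpers they touch implement the even master distribution described in the javadoc: compute a per-instance target, shed excess masters into an orphan pool, then hand orphans to under-loaded instances, capped by maxPartitionsPerInstance. A rough, self-contained sketch of that normalization step (hypothetical class and method names, plain java.util types, not the patch's exact code):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class EvenMasterDistributionSketch {
  // Spread masters so each instance holds roughly total/instanceCount partitions,
  // never exceeding maxPerInstance; any overflow stays in the orphan pool.
  static void redistributeMasters(Map<String, List<String>> masterAssignment,
      List<String> orphanPartitions, int maxPerInstance) {
    if (masterAssignment.isEmpty()) {
      return;
    }
    int total = orphanPartitions.size();
    for (List<String> assigned : masterAssignment.values()) {
      total += assigned.size();
    }
    List<String> instances = new ArrayList<String>(masterAssignment.keySet());
    int base = total / instances.size();
    int leftover = total % instances.size();

    // Pass 1: overloaded instances return their excess partitions to the orphan pool.
    for (int i = 0; i < instances.size(); i++) {
      int target = Math.min(i < leftover ? base + 1 : base, maxPerInstance);
      List<String> assigned = masterAssignment.get(instances.get(i));
      while (assigned.size() > target) {
        orphanPartitions.add(assigned.remove(assigned.size() - 1));
      }
    }
    // Pass 2: under-loaded instances pick up orphans until they reach their target.
    for (int i = 0; i < instances.size(); i++) {
      int target = Math.min(i < leftover ? base + 1 : base, maxPerInstance);
      List<String> assigned = masterAssignment.get(instances.get(i));
      while (assigned.size() < target && !orphanPartitions.isEmpty()) {
        assigned.add(orphanPartitions.remove(orphanPartitions.size() - 1));
      }
    }
  }
}

Two passes are needed because a later instance may only be able to take partitions that an earlier, overloaded instance sheds in the first pass.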
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 1cbfde4..17dc5c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -39,10 +39,8 @@ import org.apache.log4j.Logger;
  * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
  * partitions against the set of live instances to mark assignment states as dropped or erroneous
  * as necessary.
- *
  * The input is the required current assignment of partitions to instances, as well as the required
  * existing instance preferences.
- *
  * The output is a verified mapping based on that preference list, i.e. partition p has a replica
  * on node k with state s, where s may be a dropped or error state if necessary.
  */
@@ -55,9 +53,8 @@ public class CustomRebalancer implements Rebalancer {
   }
 
   @Override
-  public IdealState computeNewIdealState(String resourceName,
-      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
-      ClusterDataCache clusterData) {
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     return currentIdealState;
   }
 
@@ -87,7 +84,6 @@ public class CustomRebalancer implements Rebalancer {
 
   /**
    * compute best state for resource in CUSTOMIZED ideal state mode
-   *
    * @param cache
    * @param stateModelDef
    * @param idealStateMap
@@ -108,9 +104,8 @@ public class CustomRebalancer implements Rebalancer {
             && !disabledInstancesForPartition.contains(instance)) {
           // if dropped and not disabled, transit to DROPPED
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        }
-        else if ( (currentStateMap.get(instance) == null
-            || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
+        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
+            HelixDefinedState.ERROR.toString()))
             && disabledInstancesForPartition.contains(instance)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           instanceStateMap.put(instance, stateModelDef.getInitialState());
@@ -125,9 +120,9 @@ public class CustomRebalancer implements Rebalancer {
 
     Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
     for (String instance : idealStateMap.keySet()) {
-      boolean notInErrorState = currentStateMap == null
-          || currentStateMap.get(instance) == null
-          || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+      boolean notInErrorState =
+          currentStateMap == null || currentStateMap.get(instance) == null
+              || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
 
       if (liveInstancesMap.containsKey(instance) && notInErrorState
           && !disabledInstancesForPartition.contains(instance)) {

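The CustomRebalancer hunks above keep the CUSTOMIZED-mode rules intact while reformatting them: a replica that has left the customized map and is not disabled is sent to DROPPED, and a disabled replica that is not already in ERROR falls back to the state model's initial state. A minimal stand-alone sketch of that per-instance decision (hypothetical names; "ERROR" and "DROPPED" hard-coded for illustration):

import java.util.Map;
import java.util.Set;

public class CustomModeStateSketch {
  // Decide the next state for one instance hosting one partition replica.
  // Returns null when no transition is needed for this instance.
  static String decideState(String instance, Map<String, String> customizedStateMap,
      Map<String, String> currentStateMap, Set<String> disabledInstances, String initialState) {
    boolean inCustomizedMap = customizedStateMap.containsKey(instance);
    boolean disabled = disabledInstances.contains(instance);
    boolean inError = "ERROR".equals(currentStateMap.get(instance));

    if (!inCustomizedMap && !disabled) {
      return "DROPPED"; // no longer wanted here and not disabled: drop the replica
    }
    if (disabled && !inError) {
      return initialState; // disabled but healthy: fall back to the initial state, e.g. OFFLINE
    }
    if (inCustomizedMap && !disabled && !inError) {
      return customizedStateMap.get(instance); // healthy and enabled: follow the customized map
    }
    return null;
  }
}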
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index fc36124..a0cfbb7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -30,17 +30,14 @@ import org.apache.helix.model.Resource;
  * Allows one to come up with custom implementation of a rebalancer.<br/>
  * This will be invoked on all changes that happen in the cluster.<br/>
  * Simply return the newIdealState for a resource in this method.<br/>
- * 
  */
-public interface Rebalancer
-{
+public interface Rebalancer {
   void init(HelixManager manager);
 
   /**
-   * This method provides all the relevant information needed to rebalance a resource. 
+   * This method provides all the relevant information needed to rebalance a resource.
    * If you need additional information use manager.getAccessor to read the cluster data.
    * This allows one to compute the newIdealState according to app specific requirement.
-   * 
    * @param resourceName Name of the resource to be rebalanced
    * @param currentIdealState
    * @param currentStateOutput
@@ -49,10 +46,8 @@ public interface Rebalancer
    * @param clusterData Provides additional methods to retrieve cluster data.
    * @return
    */
-  IdealState computeNewIdealState(String resourceName,
-                                  IdealState currentIdealState,
-                                  final CurrentStateOutput currentStateOutput,
-                                  final ClusterDataCache clusterData);
+  IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      final CurrentStateOutput currentStateOutput, final ClusterDataCache clusterData);
 
   /**
    * Given an ideal state for a resource and the liveness of instances, compute the best possible
@@ -64,7 +59,6 @@ public interface Rebalancer
    *          Provides the current state and pending state transitions for all partitions
    * @return
    */
-  ResourceMapping computeBestPossiblePartitionState(
-      ClusterDataCache cache, IdealState idealState, Resource resource,
-      CurrentStateOutput currentStateOutput);
+  ResourceMapping computeBestPossiblePartitionState(ClusterDataCache cache, IdealState idealState,
+      Resource resource, CurrentStateOutput currentStateOutput);
 }

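The interface above is the extension point: on any cluster change, return the new ideal state for the resource. The fragment below is only a shape-level illustration with simplified stand-in types; a real custom rebalancer would implement the Rebalancer interface shown above and work with IdealState, CurrentStateOutput and ClusterDataCache directly.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in types only, not the Helix classes.
interface SimplifiedRebalancer {
  Map<String, List<String>> computeNewIdealState(String resourceName,
      Map<String, List<String>> currentPreferenceLists, Set<String> liveInstances);
}

class PruneDeadInstancesRebalancer implements SimplifiedRebalancer {
  @Override
  public Map<String, List<String>> computeNewIdealState(String resourceName,
      Map<String, List<String>> currentPreferenceLists, Set<String> liveInstances) {
    // Keep the current placement, but drop instances that are no longer live.
    Map<String, List<String>> newMapping = new HashMap<String, List<String>>();
    for (Map.Entry<String, List<String>> entry : currentPreferenceLists.entrySet()) {
      List<String> pruned = new ArrayList<String>();
      for (String instance : entry.getValue()) {
        if (liveInstances.contains(instance)) {
          pruned.add(instance);
        }
      }
      newMapping.put(entry.getKey(), pruned);
    }
    return newMapping;
  }
}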
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index cb8a948..bc682ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -38,24 +38,22 @@ import org.apache.log4j.Logger;
  * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
  * state of a resource based on a predefined preference list of instances willing to accept
  * replicas.
- *
  * The input is the optional current assignment of partitions to instances, as well as the required
  * existing instance preferences.
- *
  * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
  * with state s.
  */
 public class SemiAutoRebalancer implements Rebalancer {
 
   private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
+
   @Override
   public void init(HelixManager manager) {
   }
 
   @Override
-  public IdealState computeNewIdealState(String resourceName,
-      IdealState currentIdealState, CurrentStateOutput currentStateOutput,
-      ClusterDataCache clusterData) {
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     return currentIdealState;
   }
 
@@ -73,8 +71,8 @@ public class SemiAutoRebalancer implements Rebalancer {
           currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
       Set<String> disabledInstancesForPartition =
           cache.getDisabledInstancesForPartition(partition.toString());
-      List<String> preferenceList = ConstraintBasedAssignment.getPreferenceList(cache, partition,
-          idealState, stateModelDef);
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cache, partition, idealState, stateModelDef);
       Map<String, String> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, stateModelDef,
               preferenceList, currentStateMap, disabledInstancesForPartition);

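As the reformatted class comment states, SEMI_AUTO keeps placement fixed by the preference list and only decides which state each listed instance should hold. The following stand-alone sketch shows the general idea for a MASTER/SLAVE model (hypothetical method name and hard-coded state names, not the ConstraintBasedAssignment code):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SemiAutoAssignmentSketch {
  // Walk the preference list in order: the first usable instance becomes MASTER,
  // every other usable instance becomes SLAVE.
  static Map<String, String> assignStates(List<String> preferenceList,
      Set<String> liveInstances, Set<String> disabledInstances) {
    Map<String, String> stateMap = new LinkedHashMap<String, String>();
    boolean masterAssigned = false;
    for (String instance : preferenceList) {
      if (!liveInstances.contains(instance) || disabledInstances.contains(instance)) {
        continue; // skip dead or disabled instances
      }
      stateMap.put(instance, masterAssigned ? "SLAVE" : "MASTER");
      masterAssigned = true;
    }
    return stateMap;
  }
}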
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 1efc897..3fd52f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -43,10 +43,8 @@ import org.apache.log4j.Logger;
 public class ConstraintBasedAssignment {
   private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
 
-  public static List<String> getPreferenceList(ClusterDataCache cache,
-      Partition resource,
-      IdealState idealState,
-      StateModelDefinition stateModelDef) {
+  public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
+      IdealState idealState, StateModelDefinition stateModelDef) {
     List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
 
     if (listField != null && listField.size() == 1
@@ -55,16 +53,13 @@ public class ConstraintBasedAssignment {
       List<String> prefList = new ArrayList<String>(liveInstances.keySet());
       Collections.sort(prefList);
       return prefList;
-    }
-    else
-    {
+    } else {
       return listField;
     }
   }
 
   /**
    * compute best state for resource in AUTO ideal state mode
-   *
    * @param cache
    * @param stateModelDef
    * @param instancePreferenceList
@@ -75,26 +70,20 @@ public class ConstraintBasedAssignment {
    */
   public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
       StateModelDefinition stateModelDef, List<String> instancePreferenceList,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition)
-  {
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
     // if the ideal state is deleted, instancePreferenceList will be empty and
     // we should drop all resources.
-    if (currentStateMap != null)
-    {
-      for (String instance : currentStateMap.keySet())
-      {
+    if (currentStateMap != null) {
+      for (String instance : currentStateMap.keySet()) {
         if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance))
-        {
+            && !disabledInstancesForPartition.contains(instance)) {
           // if dropped and not disabled, transit to DROPPED
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        }
-        else if ( (currentStateMap.get(instance) == null
-            || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance))
-        {
+        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
+            HelixDefinedState.ERROR.toString()))
+            && disabledInstancesForPartition.contains(instance)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           instanceStateMap.put(instance, stateModelDef.getInitialState());
         }
@@ -102,8 +91,7 @@ public class ConstraintBasedAssignment {
     }
 
     // ideal state is deleted
-    if (instancePreferenceList == null)
-    {
+    if (instancePreferenceList == null) {
       return instanceStateMap;
     }
 
@@ -112,50 +100,37 @@ public class ConstraintBasedAssignment {
 
     Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
 
-    for (String state : statesPriorityList)
-    {
+    for (String state : statesPriorityList) {
       String num = stateModelDef.getNumInstancesPerState(state);
       int stateCount = -1;
-      if ("N".equals(num))
-      {
+      if ("N".equals(num)) {
         Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
         liveAndEnabled.removeAll(disabledInstancesForPartition);
         stateCount = liveAndEnabled.size();
-      }
-      else if ("R".equals(num))
-      {
+      } else if ("R".equals(num)) {
         stateCount = instancePreferenceList.size();
-      }
-      else
-      {
-        try
-        {
+      } else {
+        try {
           stateCount = Integer.parseInt(num);
-        }
-        catch (Exception e)
-        {
+        } catch (Exception e) {
           logger.error("Invalid count for state:" + state + " ,count=" + num);
         }
       }
-      if (stateCount > -1)
-      {
+      if (stateCount > -1) {
         int count = 0;
-        for (int i = 0; i < instancePreferenceList.size(); i++)
-        {
+        for (int i = 0; i < instancePreferenceList.size(); i++) {
           String instanceName = instancePreferenceList.get(i);
 
-          boolean notInErrorState = currentStateMap == null
-              || currentStateMap.get(instanceName) == null
-              || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
+          boolean notInErrorState =
+              currentStateMap == null || currentStateMap.get(instanceName) == null
+                  || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
 
-          if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
-              && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
-          {
+          if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
+              && !disabledInstancesForPartition.contains(instanceName)) {
             instanceStateMap.put(instanceName, state);
             count = count + 1;
             assigned[i] = true;
-            if (count == stateCount)
-            {
+            if (count == stateCount) {
               break;
             }
           }

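The loop being reformatted above resolves each state's replica count from the state model definition: "N" means one replica per live, enabled instance, "R" means one per entry in the preference list, and anything else is parsed as a literal number. A minimal sketch of just that resolution step (hypothetical helper, plain java.util types):

import java.util.Set;

public class StateCountSketch {
  // Translate a state-model count spec into a concrete number of replicas.
  static int resolveStateCount(String countSpec, Set<String> liveAndEnabledInstances,
      int preferenceListSize) {
    if ("N".equals(countSpec)) {
      return liveAndEnabledInstances.size(); // one replica per live, enabled instance
    }
    if ("R".equals(countSpec)) {
      return preferenceListSize; // one replica per entry in the preference list
    }
    try {
      return Integer.parseInt(countSpec); // explicit numeric count
    } catch (NumberFormatException e) {
      return -1; // invalid spec: the caller skips this state
    }
  }
}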
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
index fab54b2..96277cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
@@ -41,117 +41,99 @@ import org.restlet.data.Protocol;
 
 /**
  * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is 
- * to optimize the concurrency level of zookeeper access for ZNRecord updates 
- * that does not require real-time, like message handling status updates and 
- * healthcheck reports. 
- * 
+ * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is
+ * to optimize the concurrency level of zookeeper access for ZNRecord updates
+ * that does not require real-time, like message handling status updates and
+ * healthcheck reports.
  * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init() 
+ * This class is designed as a singleton. Application is responsible to call init()
  * and shutdown() on the getInstance().
- * */
-public class ZKPropertyTransferServer
-{
+ */
+public class ZKPropertyTransferServer {
   public static final String PORT = "port";
   public static String RESTRESOURCENAME = "ZNRecordUpdates";
   public static final String SERVER = "ZKPropertyTransferServer";
-  
-  // Frequency period for the ZNRecords are batch written to zookeeper 
+
+  // Frequency period for the ZNRecords are batch written to zookeeper
   public static int PERIOD = 10 * 1000;
   // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
   public static int MAX_UPDATE_LIMIT = 10000;
   private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-  
+
   int _localWebservicePort;
   String _webserviceUrl;
   ZkBaseDataAccessor<ZNRecord> _accessor;
   String _zkAddress;
-  
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
-    = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-  
+
+  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
+      new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
+
   boolean _initialized = false;
   boolean _shutdownFlag = false;
   Component _component = null;
   Timer _timer = null;
-  
+
   /**
    * Timertask for zookeeper batched writes
-   * */
-  class ZKPropertyTransferTask extends TimerTask
-  {
+   */
+  class ZKPropertyTransferTask extends TimerTask {
     @Override
-    public void run()
-    {
-      try
-      {
+    public void run() {
+      try {
         sendData();
-      }
-      catch(Throwable t)
-      {
+      } catch (Throwable t) {
         LOG.error("", t);
       }
-    
+
     }
   }
-  
-  void sendData()
-  {
+
+  void sendData() {
     LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
-    ConcurrentHashMap<String, ZNRecordUpdate> updateCache  = null;
-    
-    synchronized(_dataBufferRef)
-    {
+    ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
+
+    synchronized (_dataBufferRef) {
       updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
     }
-    
-    if(updateCache != null)
-    {
+
+    if (updateCache != null) {
       List<String> paths = new ArrayList<String>();
       List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
       List<ZNRecord> vals = new ArrayList<ZNRecord>();
-      // BUGBUG : what if the instance is dropped? 
-      for(ZNRecordUpdate holder : updateCache.values())
-      {
+      // BUGBUG : what if the instance is dropped?
+      for (ZNRecordUpdate holder : updateCache.values()) {
         paths.add(holder.getPath());
         updaters.add(holder.getZNRecordUpdater());
         vals.add(holder.getRecord());
       }
       // Batch write the accumulated updates into zookeeper
       long timeStart = System.currentTimeMillis();
-      if(paths.size() > 0)
-      {
+      if (paths.size() > 0) {
         _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
       }
-      LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
-    }
-    else
-    {
+      LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in "
+          + (System.currentTimeMillis() - timeStart) + " ms");
+    } else {
       LOG.warn("null _dataQueueRef. Should be in the beginning only");
     }
   }
-  
+
   static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-  
-  private ZKPropertyTransferServer()
-  {
+
+  private ZKPropertyTransferServer() {
     _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
   }
-  
-  public static ZKPropertyTransferServer getInstance()
-  {
+
+  public static ZKPropertyTransferServer getInstance() {
     return _instance;
   }
-  
-  public boolean isInitialized()
-  {
+
+  public boolean isInitialized() {
     return _initialized;
   }
-  
-  public void init(int localWebservicePort, String zkAddress)
-  {
-    if(!_initialized && !_shutdownFlag)
-    {
+
+  public void init(int localWebservicePort, String zkAddress) {
+    if (!_initialized && !_shutdownFlag) {
       LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
       _localWebservicePort = localWebservicePort;
       ZkClient zkClient = new ZkClient(zkAddress);
@@ -159,118 +141,98 @@ public class ZKPropertyTransferServer
       _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
       _zkAddress = zkAddress;
       startServer();
-    }
-    else
-    {
-      LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
+    } else {
+      LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: "
+          + _shutdownFlag);
     }
   }
-  
-  public String getWebserviceUrl()
-  {
-    if(!_initialized || _shutdownFlag)
-    {
-      LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
+
+  public String getWebserviceUrl() {
+    if (!_initialized || _shutdownFlag) {
+      LOG.debug("inited:" + _initialized + " shutdownFlag:" + _shutdownFlag + " , return");
       return null;
     }
     return _webserviceUrl;
   }
-  
-  /** Add an ZNRecordUpdate to the change queue. 
-   *  Called by the webservice front-end.
-   *
+
+  /**
+   * Add an ZNRecordUpdate to the change queue.
+   * Called by the webservice front-end.
    */
-  void enqueueData(ZNRecordUpdate e)
-  {
-    if(!_initialized || _shutdownFlag)
-    {
-      LOG.error("zkDataTransferServer inited:" + _initialized 
-          + " shutdownFlag:"+_shutdownFlag+" , return");
+  void enqueueData(ZNRecordUpdate e) {
+    if (!_initialized || _shutdownFlag) {
+      LOG.error("zkDataTransferServer inited:" + _initialized + " shutdownFlag:" + _shutdownFlag
+          + " , return");
       return;
     }
     // Do local merge if receive multiple update on the same path
-    synchronized(_dataBufferRef)
-    {
+    synchronized (_dataBufferRef) {
       e.getRecord().setSimpleField(SERVER, _webserviceUrl);
-      if(_dataBufferRef.get().containsKey(e.getPath()))
-      {
+      if (_dataBufferRef.get().containsKey(e.getPath())) {
         ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
         oldVal = e.getZNRecordUpdater().update(oldVal);
         _dataBufferRef.get().get(e.getPath())._record = oldVal;
-      }
-      else
-      {
+      } else {
         _dataBufferRef.get().put(e.getPath(), e);
       }
     }
-    if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
-    {
+    if (_dataBufferRef.get().size() > MAX_UPDATE_LIMIT) {
       sendData();
     }
   }
-  
-  void startServer()
-  {
-    LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
-    
+
+  void startServer() {
+    LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress "
+        + _zkAddress);
+
     _component = new Component();
-    
+
     _component.getServers().add(Protocol.HTTP, _localWebservicePort);
     Context applicationContext = _component.getContext().createChildContext();
     applicationContext.getAttributes().put(SERVER, this);
     applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
-    ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
-        applicationContext);
+    ZkPropertyTransferApplication application =
+        new ZkPropertyTransferApplication(applicationContext);
     // Attach the application to the component and start it
     _component.getDefaultHost().attach(application);
     _timer = new Timer(true);
-    _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
-    
-    try
-    {
-      _webserviceUrl 
-        = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort 
-              + "/" + RESTRESOURCENAME;
+    _timer.schedule(new ZKPropertyTransferTask(), PERIOD, PERIOD);
+
+    try {
+      _webserviceUrl =
+          "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":"
+              + _localWebservicePort + "/" + RESTRESOURCENAME;
       _component.start();
       _initialized = true;
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       LOG.error("", e);
     }
-    LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+    LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress "
+        + _zkAddress);
   }
-  
-  public void shutdown()
-  {
-    if(_shutdownFlag)
-    {
+
+  public void shutdown() {
+    if (_shutdownFlag) {
       LOG.error("ZKPropertyTransferServer already has been shutdown...");
       return;
     }
-    LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
-    if(_timer != null)
-    {
+    LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress "
+        + _zkAddress);
+    if (_timer != null) {
       _timer.cancel();
     }
-    if(_component != null)
-    {
-      try
-      {
+    if (_component != null) {
+      try {
         _component.stop();
-      }
-      catch (Exception e)
-      {
+      } catch (Exception e) {
         LOG.error("", e);
       }
     }
     _shutdownFlag = true;
   }
 
-  public void reset()
-  {
-    if(_shutdownFlag == true)
-    {
+  public void reset() {
+    if (_shutdownFlag == true) {
       _shutdownFlag = false;
       _initialized = false;
       _component = null;


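Beyond the brace and wrapping changes, ZKPropertyTransferServer is a buffer-and-flush design: writers merge updates for the same ZooKeeper path into a shared map, and a daemon timer periodically swaps the map out and writes the whole batch at once. A stripped-down, self-contained sketch of that pattern (plain strings instead of ZNRecord updates; all names here are illustrative, not the Helix API):

import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class BatchingBufferSketch {
  private final AtomicReference<ConcurrentHashMap<String, String>> bufferRef =
      new AtomicReference<ConcurrentHashMap<String, String>>(
          new ConcurrentHashMap<String, String>());
  private final Timer timer = new Timer(true); // daemon timer, as in the server above

  public BatchingBufferSketch(long periodMs) {
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        flush(); // periodic batch write, mirroring ZKPropertyTransferTask
      }
    }, periodMs, periodMs);
  }

  // Writers merge updates targeting the same path instead of queueing duplicates.
  public void enqueue(String path, String update) {
    synchronized (bufferRef) {
      ConcurrentHashMap<String, String> buffer = bufferRef.get();
      String pending = buffer.get(path);
      buffer.put(path, pending == null ? update : pending + "," + update);
    }
  }

  // Swap the buffer atomically, then write the accumulated batch outside the hot path.
  void flush() {
    Map<String, String> batch;
    synchronized (bufferRef) {
      batch = bufferRef.getAndSet(new ConcurrentHashMap<String, String>());
    }
    if (!batch.isEmpty()) {
      // A real server would issue one batched ZooKeeper write here.
      System.out.println("flushing " + batch.size() + " pending updates in one batch");
    }
  }
}

Swapping the map out before writing keeps writers off the ZooKeeper hot path; in the real server a size threshold (MAX_UPDATE_LIMIT) can also trigger an early flush in addition to the periodic one.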