helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [38/51] [partial] [HELIX-198] Unify helix code style, rb=13710
Date Wed, 21 Aug 2013 20:43:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index b30cee6..b59976d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -72,28 +72,27 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 public class CallbackHandler implements IZkChildListener, IZkDataListener
 
 {
-  private static Logger           logger = Logger.getLogger(CallbackHandler.class);
+  private static Logger logger = Logger.getLogger(CallbackHandler.class);
 
   /**
    * define the next possible notification types
    */
   private static Map<Type, List<Type>> nextNotificationType = new HashMap<Type, List<Type>>();
-  static
-  {
+  static {
     nextNotificationType.put(Type.INIT, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
     nextNotificationType.put(Type.CALLBACK, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
     nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
   }
 
-  private final String            _path;
-  private final Object            _listener;
-  private final EventType[]       _eventTypes;
+  private final String _path;
+  private final Object _listener;
+  private final EventType[] _eventTypes;
   private final HelixDataAccessor _accessor;
-  private final ChangeType        _changeType;
-  private final ZkClient          _zkClient;
-  private final AtomicLong        _lastNotificationTimeStamp;
-  private final HelixManager      _manager;
-  private final PropertyKey 	  _propertyKey;
+  private final ChangeType _changeType;
+  private final ZkClient _zkClient;
+  private final AtomicLong _lastNotificationTimeStamp;
+  private final HelixManager _manager;
+  private final PropertyKey _propertyKey;
 
   /**
    * maintain the expected notification types
@@ -101,13 +100,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
-  public CallbackHandler(HelixManager manager,
-                         ZkClient client,
-                         PropertyKey propertyKey,
-                         Object listener,
-                         EventType[] eventTypes,
-                         ChangeType changeType)
-  {
+  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
+      Object listener, EventType[] eventTypes, ChangeType changeType) {
     if (listener == null) {
       throw new HelixException("listener could not be null");
     }
@@ -124,363 +118,277 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
     init();
   }
 
-  public Object getListener()
-  {
+  public Object getListener() {
     return _listener;
   }
 
-  public String getPath()
-  {
+  public String getPath() {
     return _path;
   }
 
-  public void invoke(NotificationContext changeContext) throws Exception
-  {
+  public void invoke(NotificationContext changeContext) throws Exception {
     // This allows the listener to work with one change at a time
-    synchronized (_manager)
-    {
+    synchronized (_manager) {
       Type type = changeContext.getType();
       if (!_expectTypes.contains(type)) {
         logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
-                    + ", expected types: " + _expectTypes + " but was " + type);
+            + ", expected types: " + _expectTypes + " but was " + type);
         return;
       }
       _expectTypes = nextNotificationType.get(type);
 
       // Builder keyBuilder = _accessor.keyBuilder();
       long start = System.currentTimeMillis();
-      if (logger.isInfoEnabled())
-      {
-        logger.info(Thread.currentThread().getId() + " START:INVOKE "
-            + _path + " listener:" + _listener.getClass().getCanonicalName());
+      if (logger.isInfoEnabled()) {
+        logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
+            + _listener.getClass().getCanonicalName());
       }
 
-      if (_changeType == IDEAL_STATE)
-      {
+      if (_changeType == IDEAL_STATE) {
 
-        IdealStateChangeListener idealStateChangeListener =
-            (IdealStateChangeListener) _listener;
+        IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true);
         List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
 
         idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
 
-      }
-      else if (_changeType == ChangeType.INSTANCE_CONFIG)
-      {
+      } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
         subscribeForChanges(changeContext, _path, true, true);
-      	if (_listener instanceof ConfigChangeListener)
-      	{
-      		ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
-      		List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
-      		configChangeListener.onConfigChange(configs, changeContext);
-      	} else if (_listener instanceof InstanceConfigChangeListener)
-      	{
-      		InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
-      		List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
-      		listener.onInstanceConfigChange(configs, changeContext);
-      	}
-      }
-      else if (_changeType == CONFIG)
-      {
-            subscribeForChanges(changeContext, _path, true, true);
-      		ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
-      		List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
-      		listener.onConfigChange(configs, changeContext);
-      }
-      else if (_changeType == LIVE_INSTANCE)
-      {
+        if (_listener instanceof ConfigChangeListener) {
+          ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+          List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
+          configChangeListener.onConfigChange(configs, changeContext);
+        } else if (_listener instanceof InstanceConfigChangeListener) {
+          InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
+          List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
+          listener.onInstanceConfigChange(configs, changeContext);
+        }
+      } else if (_changeType == CONFIG) {
+        subscribeForChanges(changeContext, _path, true, true);
+        ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
+        List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
+        listener.onConfigChange(configs, changeContext);
+      } else if (_changeType == LIVE_INSTANCE) {
         LiveInstanceChangeListener liveInstanceChangeListener =
             (LiveInstanceChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true);
-        List<LiveInstance> liveInstances =
-            _accessor.getChildValues(_propertyKey);
+        List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
 
         liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
 
-      }
-      else if (_changeType == CURRENT_STATE)
-      {
-        CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener;
+      } else if (_changeType == CURRENT_STATE) {
+        CurrentStateChangeListener currentStateChangeListener =
+            (CurrentStateChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true);
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
 
         List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
 
-        currentStateChangeListener.onStateChange(instanceName,
-                                                 currentStates,
-                                                 changeContext);
+        currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
 
-      }
-      else if (_changeType == MESSAGE)
-      {
+      } else if (_changeType == MESSAGE) {
         MessageListener messageListener = (MessageListener) _listener;
         subscribeForChanges(changeContext, _path, true, false);
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-        List<Message> messages =
-            _accessor.getChildValues(_propertyKey);
+        List<Message> messages = _accessor.getChildValues(_propertyKey);
 
         messageListener.onMessage(instanceName, messages, changeContext);
 
-      }
-      else if (_changeType == MESSAGES_CONTROLLER)
-      {
+      } else if (_changeType == MESSAGES_CONTROLLER) {
         MessageListener messageListener = (MessageListener) _listener;
         subscribeForChanges(changeContext, _path, true, false);
-        List<Message> messages =
-            _accessor.getChildValues(_propertyKey);
+        List<Message> messages = _accessor.getChildValues(_propertyKey);
 
         messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
 
-      }
-      else if (_changeType == EXTERNAL_VIEW)
-      {
-        ExternalViewChangeListener externalViewListener =
-            (ExternalViewChangeListener) _listener;
+      } else if (_changeType == EXTERNAL_VIEW) {
+        ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true);
-        List<ExternalView> externalViewList =
-            _accessor.getChildValues(_propertyKey);
+        List<ExternalView> externalViewList = _accessor.getChildValues(_propertyKey);
 
         externalViewListener.onExternalViewChange(externalViewList, changeContext);
-      }
-      else if (_changeType == ChangeType.CONTROLLER)
-      {
-        ControllerChangeListener controllerChangelistener =
-            (ControllerChangeListener) _listener;
+      } else if (_changeType == ChangeType.CONTROLLER) {
+        ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, false);
         controllerChangelistener.onControllerChange(changeContext);
-      }
-      else if (_changeType == ChangeType.HEALTH)
-      {
-        HealthStateChangeListener healthStateChangeListener =
-            (HealthStateChangeListener) _listener;
+      } else if (_changeType == ChangeType.HEALTH) {
+        HealthStateChangeListener healthStateChangeListener = (HealthStateChangeListener) _listener;
         subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
         // settings here
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
 
-        List<HealthStat> healthReportList =
-            _accessor.getChildValues(_propertyKey);
+        List<HealthStat> healthReportList = _accessor.getChildValues(_propertyKey);
 
-        healthStateChangeListener.onHealthChange(instanceName,
-                                                 healthReportList,
-                                                 changeContext);
+        healthStateChangeListener.onHealthChange(instanceName, healthReportList, changeContext);
       }
 
       long end = System.currentTimeMillis();
-      if (logger.isInfoEnabled())
-      {
-        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
-            + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
-            + (end - start) +"ms");
+      if (logger.isInfoEnabled()) {
+        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
+            + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
       }
     }
   }
 
-  private void subscribeChildChange(String path, NotificationContext context)
-  {
-	  NotificationContext.Type type = context.getType();
-      if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
-      {
-        logger.info(_manager.getInstanceName() + " subscribes child-change. path: "
-        		+ path + ", listener: " + _listener);
-        _zkClient.subscribeChildChanges(path, this);
-      }
-      else if (type == NotificationContext.Type.FINALIZE)
-      {
-        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
-        		+ path + ", listener: " + _listener);
+  private void subscribeChildChange(String path, NotificationContext context) {
+    NotificationContext.Type type = context.getType();
+    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
+      logger.info(_manager.getInstanceName() + " subscribes child-change. path: " + path
+          + ", listener: " + _listener);
+      _zkClient.subscribeChildChanges(path, this);
+    } else if (type == NotificationContext.Type.FINALIZE) {
+      logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
+          + ", listener: " + _listener);
+
+      _zkClient.unsubscribeChildChanges(path, this);
+    }
+  }
 
-        _zkClient.unsubscribeChildChanges(path, this);
+  private void subscribeDataChange(String path, NotificationContext context) {
+    NotificationContext.Type type = context.getType();
+    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
+            + ", listener: " + _listener);
       }
-  }
+      _zkClient.subscribeDataChanges(path, this);
 
-  private void subscribeDataChange(String path, NotificationContext context)
-  {
-    	NotificationContext.Type type = context.getType();
-        if (type == NotificationContext.Type.INIT
-            || type == NotificationContext.Type.CALLBACK)
-        {
-          if (logger.isDebugEnabled())
-          {
-            logger.debug(_manager.getInstanceName() + " subscribe data-change. path: "
-            		+ path + ", listener: " + _listener);
-          }
-          _zkClient.subscribeDataChanges(path, this);
+    } else if (type == NotificationContext.Type.FINALIZE) {
+      logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
+          + ", listener: " + _listener);
 
-        }
-        else if (type == NotificationContext.Type.FINALIZE)
-        {
-          logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: "
-        		  + path + ", listener: " + _listener);
-
-          _zkClient.unsubscribeDataChanges(path, this);
-        }
+      _zkClient.unsubscribeDataChanges(path, this);
+    }
   }
 
   // TODO watchParent is always true. consider remove it
-  private void subscribeForChanges(NotificationContext context,
-                                   String path,
-                                   boolean watchParent,
-                                   boolean watchChild)
-  {
-    if (watchParent)
-    {
-    	subscribeChildChange(path, context);
+  private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
+      boolean watchChild) {
+    if (watchParent) {
+      subscribeChildChange(path, context);
     }
 
-    if (watchChild)
-    {
-      try
-      {
-    	switch(_changeType)
-    	{
+    if (watchChild) {
+      try {
+        switch (_changeType) {
         case CURRENT_STATE:
         case IDEAL_STATE:
-        case EXTERNAL_VIEW:
-        {
-            // check if bucketized
-        	BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
-        	List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
-        	for (ZNRecord record : records)
-        	{
-                HelixProperty property = new HelixProperty(record);
-            	String childPath = path + "/" + record.getId();
-
-                int bucketSize = property.getBucketSize();
-                if (bucketSize > 0)
-                {
-                  // subscribe both data-change and child-change on bucketized parent node
-                  // data-change gives a delete-callback which is used to remove watch
-                  subscribeChildChange(childPath, context);
-                  subscribeDataChange(childPath, context);
-
-                  // subscribe data-change on bucketized child
-                  List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
-                  if (bucketizedChildNames != null)
-                  {
-                    for (String bucketizedChildName : bucketizedChildNames)
-                    {
-                       String bucketizedChildPath = childPath + "/" + bucketizedChildName;
-                       subscribeDataChange(bucketizedChildPath, context);
-                    }
-                  }
-                } else
-                {
-                    subscribeDataChange(childPath, context);
+        case EXTERNAL_VIEW: {
+          // check if bucketized
+          BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+          List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
+          for (ZNRecord record : records) {
+            HelixProperty property = new HelixProperty(record);
+            String childPath = path + "/" + record.getId();
+
+            int bucketSize = property.getBucketSize();
+            if (bucketSize > 0) {
+              // subscribe both data-change and child-change on bucketized parent node
+              // data-change gives a delete-callback which is used to remove watch
+              subscribeChildChange(childPath, context);
+              subscribeDataChange(childPath, context);
+
+              // subscribe data-change on bucketized child
+              List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
+              if (bucketizedChildNames != null) {
+                for (String bucketizedChildName : bucketizedChildNames) {
+                  String bucketizedChildPath = childPath + "/" + bucketizedChildName;
+                  subscribeDataChange(bucketizedChildPath, context);
                 }
-        	}
-        	break;
-        }
-        default:
-        {
-            List<String> childNames = _zkClient.getChildren(path);
-            if (childNames != null)
-            {
-              for (String childName : childNames)
-              {
-                 String childPath = path + "/" + childName;
-                 subscribeDataChange(childPath, context);
               }
+            } else {
+              subscribeDataChange(childPath, context);
             }
-        	break;
+          }
+          break;
         }
-    	}
-      }
-      catch (ZkNoNodeException e)
-      {
-        logger.warn("fail to subscribe child/data change. path: " + path
-        		+ ", listener: " + _listener, e);
+        default: {
+          List<String> childNames = _zkClient.getChildren(path);
+          if (childNames != null) {
+            for (String childName : childNames) {
+              String childPath = path + "/" + childName;
+              subscribeDataChange(childPath, context);
+            }
+          }
+          break;
+        }
+        }
+      } catch (ZkNoNodeException e) {
+        logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
+            + _listener, e);
       }
     }
 
   }
 
-  public EventType[] getEventTypes()
-  {
+  public EventType[] getEventTypes() {
     return _eventTypes;
   }
 
   /**
    * Invoke the listener so that it sets up the initial values from the zookeeper if any
    * exists
-   *
    */
-  public void init()
-  {
+  public void init() {
     updateNotificationTime(System.nanoTime());
-    try
-    {
+    try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.INIT);
       invoke(changeContext);
-    }
-    catch (Exception e)
-    {
-      String msg = "Exception while invoking init callback for listener:"+ _listener;
+    } catch (Exception e) {
+      String msg = "Exception while invoking init callback for listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
 
   @Override
-  public void handleDataChange(String dataPath, Object data)
-  {
-    try
-    {
+  public void handleDataChange(String dataPath, Object data) {
+    try {
       updateNotificationTime(System.nanoTime());
-      if (dataPath != null && dataPath.startsWith(_path))
-      {
+      if (dataPath != null && dataPath.startsWith(_path)) {
         NotificationContext changeContext = new NotificationContext(_manager);
         changeContext.setType(NotificationContext.Type.CALLBACK);
         invoke(changeContext);
       }
-    }
-    catch (Exception e)
-    {
-      String msg = "exception in handling data-change. path: " + dataPath
-    		  + ", listener: " + _listener;
+    } catch (Exception e) {
+      String msg =
+          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
 
   @Override
-  public void handleDataDeleted(String dataPath)
-  {
-    try
-    {
+  public void handleDataDeleted(String dataPath) {
+    try {
       updateNotificationTime(System.nanoTime());
-      if (dataPath != null && dataPath.startsWith(_path))
-      {
-          logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: "
-        		  + dataPath + ", listener: " + _listener);
-          _zkClient.unsubscribeDataChanges(dataPath, this);
-
-          // only needed for bucketized parent, but OK if we don't have child-change
-          //  watch on the bucketized parent path
-          logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
-        		  + dataPath + ", listener: " + _listener);
-          _zkClient.unsubscribeChildChanges(dataPath, this);
-          // No need to invoke() since this event will handled by child-change on parent-node
-//        NotificationContext changeContext = new NotificationContext(_manager);
-//        changeContext.setType(NotificationContext.Type.CALLBACK);
-// 		  invoke(changeContext);
+      if (dataPath != null && dataPath.startsWith(_path)) {
+        logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + dataPath
+            + ", listener: " + _listener);
+        _zkClient.unsubscribeDataChanges(dataPath, this);
+
+        // only needed for bucketized parent, but OK if we don't have child-change
+        // watch on the bucketized parent path
+        logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + dataPath
+            + ", listener: " + _listener);
+        _zkClient.unsubscribeChildChanges(dataPath, this);
+        // No need to invoke() since this event will handled by child-change on parent-node
+        // NotificationContext changeContext = new NotificationContext(_manager);
+        // changeContext.setType(NotificationContext.Type.CALLBACK);
+        // invoke(changeContext);
       }
-    }
-    catch (Exception e)
-    {
-      String msg = "exception in handling data-delete-change. path: " + dataPath
-          + ", listener: " + _listener;
+    } catch (Exception e) {
+      String msg =
+          "exception in handling data-delete-change. path: " + dataPath + ", listener: "
+              + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
 
   @Override
-  public void handleChildChange(String parentPath, List<String> currentChilds)
-  {
-    try
-    {
+  public void handleChildChange(String parentPath, List<String> currentChilds) {
+    try {
       updateNotificationTime(System.nanoTime());
-      if (parentPath != null && parentPath.startsWith(_path))
-      {
+      if (parentPath != null && parentPath.startsWith(_path)) {
         NotificationContext changeContext = new NotificationContext(_manager);
 
         if (currentChilds == null) {
@@ -495,46 +403,35 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
         }
         invoke(changeContext);
       }
-    }
-    catch (Exception e)
-    {
-      String msg = "exception in handling child-change. instance: " + _manager.getInstanceName()
-    		  + ", parentPath: " + parentPath + ", listener: " + _listener;
+    } catch (Exception e) {
+      String msg =
+          "exception in handling child-change. instance: " + _manager.getInstanceName()
+              + ", parentPath: " + parentPath + ", listener: " + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
 
   /**
    * Invoke the listener for the last time so that the listener could clean up resources
-   *
    */
-  public void reset()
-  {
-    try
-    {
+  public void reset() {
+    try {
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);
       invoke(changeContext);
-    }
-    catch (Exception e)
-    {
-      String msg = "Exception while resetting the listener:"+_listener;
+    } catch (Exception e) {
+      String msg = "Exception while resetting the listener:" + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
 
-  private void updateNotificationTime(long nanoTime)
-  {
+  private void updateNotificationTime(long nanoTime) {
     long l = _lastNotificationTimeStamp.get();
-    while (nanoTime > l)
-    {
+    while (nanoTime > l) {
       boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
-      if (b)
-      {
+      if (b) {
         break;
-      }
-      else
-      {
+      } else {
         l = _lastNotificationTimeStamp.get();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
index d5bcf4b..6975ea1 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
@@ -26,16 +26,13 @@ import java.util.List;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 
-public class ChainedPathZkSerializer implements PathBasedZkSerializer
-{
+public class ChainedPathZkSerializer implements PathBasedZkSerializer {
 
-  public static class Builder
-  {
+  public static class Builder {
     private final ZkSerializer _defaultSerializer;
     private List<ChainItem> _items = new ArrayList<ChainItem>();
 
-    private Builder(ZkSerializer defaultSerializer)
-    {
+    private Builder(ZkSerializer defaultSerializer) {
       _defaultSerializer = defaultSerializer;
     }
 
@@ -44,12 +41,11 @@ public class ChainedPathZkSerializer implements PathBasedZkSerializer
      * The most specific path will triumph over a more generic (shorter)
      * one regardless of the ordering of the calls.
      */
-    public Builder serialize(String path, ZkSerializer withSerializer)
-    {
+    public Builder serialize(String path, ZkSerializer withSerializer) {
       _items.add(new ChainItem(normalize(path), withSerializer));
       return this;
     }
-    
+
     /**
      * Builds the serializer with the given strategies and default serializer.
      */
@@ -57,21 +53,19 @@ public class ChainedPathZkSerializer implements PathBasedZkSerializer
       return new ChainedPathZkSerializer(_defaultSerializer, _items);
     }
   }
-  
+
   /**
    * Create a builder that will use the given serializer by default
    * if no other strategy is given to solve the path in question.
    */
-  public static Builder builder(ZkSerializer defaultSerializer) 
-  {
+  public static Builder builder(ZkSerializer defaultSerializer) {
     return new Builder(defaultSerializer);
   }
 
   private final List<ChainItem> _items;
   private final ZkSerializer _defaultSerializer;
 
-  private ChainedPathZkSerializer(ZkSerializer defaultSerializer, List<ChainItem> items)
-  {
+  private ChainedPathZkSerializer(ZkSerializer defaultSerializer, List<ChainItem> items) {
     _items = items;
     // sort by longest paths first
     // if two items would match one would be prefix of the other
@@ -81,47 +75,37 @@ public class ChainedPathZkSerializer implements PathBasedZkSerializer
   }
 
   @Override
-  public byte[] serialize(Object data, String path) throws ZkMarshallingError
-  {
-    for (ChainItem item : _items)
-    {
-      if (item.matches(path)) return item._serializer.serialize(data);
+  public byte[] serialize(Object data, String path) throws ZkMarshallingError {
+    for (ChainItem item : _items) {
+      if (item.matches(path))
+        return item._serializer.serialize(data);
     }
     return _defaultSerializer.serialize(data);
   }
 
   @Override
-  public Object deserialize(byte[] bytes, String path)
-      throws ZkMarshallingError
-  {
-    for (ChainItem item : _items)
-    {
-      if (item.matches(path)) return item._serializer.deserialize(bytes);
+  public Object deserialize(byte[] bytes, String path) throws ZkMarshallingError {
+    for (ChainItem item : _items) {
+      if (item.matches(path))
+        return item._serializer.deserialize(bytes);
     }
     return _defaultSerializer.deserialize(bytes);
   }
 
-  private static class ChainItem implements Comparable<ChainItem>
-  {
+  private static class ChainItem implements Comparable<ChainItem> {
     final String _path;
     final ZkSerializer _serializer;
 
-    ChainItem(String path, ZkSerializer serializer)
-    {
+    ChainItem(String path, ZkSerializer serializer) {
       _path = path;
       _serializer = serializer;
     }
 
-    boolean matches(String path)
-    {
-      if (_path.equals(path))
-      {
+    boolean matches(String path) {
+      if (_path.equals(path)) {
         return true;
-      } 
-      else if (path.length() > _path.length())
-      {
-        if (path.startsWith(_path) && path.charAt(_path.length()) == '/') 
-        {
+      } else if (path.length() > _path.length()) {
+        if (path.startsWith(_path) && path.charAt(_path.length()) == '/') {
           return true;
         }
       }
@@ -129,12 +113,11 @@ public class ChainedPathZkSerializer implements PathBasedZkSerializer
     }
 
     @Override
-    public int compareTo(ChainItem o)
-    {
+    public int compareTo(ChainItem o) {
       return o._path.length() - _path.length();
     }
   }
-  
+
   private static String normalize(String path) {
     if (!path.startsWith("/")) {
       // ensure leading slash
@@ -142,7 +125,7 @@ public class ChainedPathZkSerializer implements PathBasedZkSerializer
     }
     if (path.endsWith("/")) {
       // remove trailing slash
-      path = path.substring(0, path.length()-1);
+      path = path.substring(0, path.length() - 1);
     }
     return path;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
index 4542801..1ed6dea 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
@@ -49,7 +49,6 @@ public class ControllerManager extends AbstractManager {
 
   /**
    * status dump timer-task
-   *
    */
   static class StatusDumpTask extends HelixTimerTask {
     Timer _timer = null;
@@ -67,23 +66,18 @@ public class ControllerManager extends AbstractManager {
       long period = 120 * 60 * 1000;
       int timeThresholdNoChange = 180 * 60 * 1000;
 
-      if (_timer == null)
-      {
+      if (_timer == null) {
         LOG.info("Start StatusDumpTask");
         _timer = new Timer("StatusDumpTimerTask", true);
-        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController,
-                                                          zkclient,
-                                                          timeThresholdNoChange),
-                                   initialDelay,
-                                   period);
+        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
+            timeThresholdNoChange), initialDelay, period);
       }
 
     }
 
     @Override
     public void stop() {
-      if (_timer != null)
-      {
+      if (_timer != null) {
         LOG.info("Stop StatusDumpTask");
         _timer.cancel();
         _timer = null;
@@ -124,13 +118,11 @@ public class ControllerManager extends AbstractManager {
     if (_leaderElectionHandler != null) {
       _leaderElectionHandler.init();
     } else {
-      _leaderElectionHandler = new CallbackHandler(this,
-                                                   _zkclient,
-                                                   _keyBuilder.controller(),
-                                  new DistributedLeaderElection(this, _controller),
-                                  new EventType[] { EventType.NodeChildrenChanged,
-                                      EventType.NodeDeleted, EventType.NodeCreated },
-                                  ChangeType.CONTROLLER);
+      _leaderElectionHandler =
+          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
+              new DistributedLeaderElection(this, _controller), new EventType[] {
+                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+              }, ChangeType.CONTROLLER);
     }
 
     /**
@@ -143,28 +135,24 @@ public class ControllerManager extends AbstractManager {
 
   @Override
   void doDisconnect() {
-    if (_leaderElectionHandler != null)
-    {
+    if (_leaderElectionHandler != null) {
       _leaderElectionHandler.reset();
     }
   }
 
   @Override
   public boolean isLeader() {
-    if (!isConnected())
-    {
+    if (!isConnected()) {
       return false;
     }
 
     try {
       LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
-      if (leader != null)
-      {
+      if (leader != null) {
         String leaderName = leader.getInstanceName();
         String sessionId = leader.getSessionId();
-        if (leaderName != null && leaderName.equals(_instanceName)
-            && sessionId != null && sessionId.equals(_sessionId))
-        {
+        if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
+            && sessionId.equals(_sessionId)) {
           return true;
         }
       }
@@ -176,13 +164,11 @@ public class ControllerManager extends AbstractManager {
 
   /**
    * helix-controller uses a write-through cache for external-view
-   *
    */
   @Override
   BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
     String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName);
-    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
-                                                Arrays.asList(extViewPath));
+    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath));
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index d724095..ff3a264 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -32,46 +32,41 @@ import org.apache.log4j.Logger;
 
 /**
  * helper class for controller manager
- *
  */
 public class ControllerManagerHelper {
   private static Logger LOG = Logger.getLogger(ControllerManagerHelper.class);
-  
+
   final AbstractManager _manager;
   final DefaultMessagingService _messagingService;
   final List<HelixTimerTask> _controllerTimerTasks;
 
   public ControllerManagerHelper(AbstractManager manager) {
     _manager = manager;
-    _messagingService = (DefaultMessagingService)manager.getMessagingService();
+    _messagingService = (DefaultMessagingService) manager.getMessagingService();
     _controllerTimerTasks = manager.getControllerHelixTimerTasks();
   }
 
-  public void addListenersToController(GenericHelixController controller)
-  {
-    try
-    {
+  public void addListenersToController(GenericHelixController controller) {
+    try {
       /**
-       *  setup controller message listener and register message handlers
+       * setup controller message listener and register message handlers
        */
       _manager.addControllerMessageListener(_messagingService.getExecutor());
       MessageHandlerFactory defaultControllerMsgHandlerFactory =
           new DefaultControllerMessageHandlerFactory();
-      _messagingService.getExecutor()
-                       .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
-                                                      defaultControllerMsgHandlerFactory);
+      _messagingService.getExecutor().registerMessageHandlerFactory(
+          defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
       MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
           new DefaultSchedulerMessageHandlerFactory(_manager);
-      _messagingService.getExecutor()
-                       .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
-                                                      defaultSchedulerMsgHandlerFactory);
+      _messagingService.getExecutor().registerMessageHandlerFactory(
+          defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
       MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
           new DefaultParticipantErrorMessageHandlerFactory(_manager);
-      _messagingService.getExecutor()
-                       .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
-       
-                           defaultParticipantErrorMessageHandlerFactory);
-      
+      _messagingService.getExecutor().registerMessageHandlerFactory(
+          defaultParticipantErrorMessageHandlerFactory.getMessageType(),
+
+          defaultParticipantErrorMessageHandlerFactory);
+
       /**
        * setup generic-controller
        */
@@ -81,16 +76,14 @@ public class ControllerManagerHelper {
       // no need for controller to listen on external-view
       // _manager.addExternalViewChangeListener(controller);
       _manager.addControllerListener(controller);
-    } catch (ZkInterruptedException e)
-    {
+    } catch (ZkInterruptedException e) {
       LOG.warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
-              + e);
-    } catch (Exception e)
-    {
+          + e);
+    } catch (Exception e) {
       LOG.error("Error when creating HelixManagerContollerMonitor", e);
     }
   }
-  
+
   public void removeListenersFromController(GenericHelixController controller) {
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(_manager.getClusterName());
     /**
@@ -100,20 +93,19 @@ public class ControllerManagerHelper {
     _manager.removeListener(keyBuilder.liveInstances(), controller);
     _manager.removeListener(keyBuilder.idealStates(), controller);
     _manager.removeListener(keyBuilder.controller(), controller);
-    
+
     /**
      * reset controller message listener and unregister all message handlers
      */
     _manager.removeListener(keyBuilder.controllerMessages(), _messagingService.getExecutor());
   }
-  
-  
+
   public void startControllerTimerTasks() {
     for (HelixTimerTask task : _controllerTimerTasks) {
       task.start();
     }
   }
-  
+
   public void stopControllerTimerTasks() {
     for (HelixTimerTask task : _controllerTimerTasks) {
       task.stop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
index 9931c22..b96de18 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CurStateCarryOverUpdater.java
@@ -25,28 +25,26 @@ import org.apache.helix.model.CurrentState;
 
 /**
  * updater for carrying over last current states
- * 
  * @see HELIX-30: ZkHelixManager.carryOverPreviousCurrentState() should use a special merge logic
- * 
- * because carryOver() is performed after addLiveInstance(). it's possible that carryOver()
- * overwrites current-state updates performed by current session. so carryOver() should be 
- * performed only when current-state is empty for the partition
- *
+ *      because carryOver() is performed after addLiveInstance(). it's possible that carryOver()
+ *      overwrites current-state updates performed by current session. so carryOver() should be
+ *      performed only when current-state is empty for the partition
  */
 class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> {
   final String _curSessionId;
   final String _initState;
   final CurrentState _lastCurState;
-  
+
   public CurStateCarryOverUpdater(String curSessionId, String initState, CurrentState lastCurState) {
     if (curSessionId == null || initState == null || lastCurState == null) {
-      throw new IllegalArgumentException("missing curSessionId|initState|lastCurState for carry-over");
+      throw new IllegalArgumentException(
+          "missing curSessionId|initState|lastCurState for carry-over");
     }
     _curSessionId = curSessionId;
     _initState = initState;
     _lastCurState = lastCurState;
   }
-  
+
   @Override
   public ZNRecord update(ZNRecord currentData) {
     CurrentState curState = null;
@@ -55,11 +53,10 @@ class CurStateCarryOverUpdater implements DataUpdater<ZNRecord> {
       // copy all simple fields settings and overwrite session-id to current session
       curState.getRecord().setSimpleFields(_lastCurState.getRecord().getSimpleFields());
       curState.setSessionId(_curSessionId);
-    } else
-    {
+    } else {
       curState = new CurrentState(currentData);
     }
-    
+
     for (String partitionName : _lastCurState.getPartitionStateMap().keySet()) {
       // carry-over only when current-state not exist
       if (curState.getState(partitionName) == null) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
index 435e11e..5f6d083 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
@@ -1,6 +1,5 @@
 package org.apache.helix.manager.zk;
 
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,64 +28,52 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
-
-public class DefaultControllerMessageHandlerFactory implements
-    MessageHandlerFactory
-{
+public class DefaultControllerMessageHandlerFactory implements MessageHandlerFactory {
   private static Logger _logger = Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
+
   @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
+  public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
-    
-    if(!type.equals(getMessageType()))
-    {
-      throw new HelixException("Unexpected msg type for message "+message.getMsgId()
-          +" type:" + message.getMsgType());
+
+    if (!type.equals(getMessageType())) {
+      throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
+          + message.getMsgType());
     }
-    
+
     return new DefaultControllerMessageHandler(message, context);
   }
 
   @Override
-  public String getMessageType()
-  {
+  public String getMessageType() {
     return MessageType.CONTROLLER_MSG.toString();
   }
 
   @Override
-  public void reset()
-  {
+  public void reset() {
 
   }
-  
-  public static class DefaultControllerMessageHandler extends MessageHandler
-  {
-    public DefaultControllerMessageHandler(Message message,
-        NotificationContext context)
-    {
+
+  public static class DefaultControllerMessageHandler extends MessageHandler {
+    public DefaultControllerMessageHandler(Message message, NotificationContext context) {
       super(message, context);
     }
 
     @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
+    public HelixTaskResult handleMessage() throws InterruptedException {
       String type = _message.getMsgType();
       HelixTaskResult result = new HelixTaskResult();
-      if(!type.equals(MessageType.CONTROLLER_MSG.toString()))
-      {
-        throw new HelixException("Unexpected msg type for message "+_message.getMsgId()
-            +" type:" + _message.getMsgType());
+      if (!type.equals(MessageType.CONTROLLER_MSG.toString())) {
+        throw new HelixException("Unexpected msg type for message " + _message.getMsgId()
+            + " type:" + _message.getMsgType());
       }
-      result.getTaskResultMap().put("ControllerResult", "msg "+ _message.getMsgId() + " from "+_message.getMsgSrc() + " processed");
+      result.getTaskResultMap().put("ControllerResult",
+          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
       result.setSuccess(true);
       return result;
     }
 
     @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
       _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
index e25ef4e..d2e56eb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
@@ -30,7 +30,6 @@ import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
-
 /**
  * DefaultParticipantErrorMessageHandlerFactory works on controller side.
  * When the participant detects a critical error, it will send the PARTICIPANT_ERROR_REPORT
@@ -38,68 +37,59 @@ import org.apache.log4j.Logger;
  * disable the partition. The controller have a chance to do whatever make sense at that point,
  * and then disable the corresponding partition or the instance. More configs per resource will
  * be added to customize the controller behavior.
- * */
-public class DefaultParticipantErrorMessageHandlerFactory implements
-  MessageHandlerFactory
-{
-  public enum ActionOnError
-  {
-    DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
+ */
+public class DefaultParticipantErrorMessageHandlerFactory implements MessageHandlerFactory {
+  public enum ActionOnError {
+    DISABLE_PARTITION,
+    DISABLE_RESOURCE,
+    DISABLE_INSTANCE
   }
 
   public static final String ACTIONKEY = "ActionOnError";
 
   private static Logger _logger = Logger
-    .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
+      .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
   final HelixManager _manager;
 
-  public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
-  {
+  public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager) {
     _manager = manager;
   }
 
-  public static class DefaultParticipantErrorMessageHandler extends MessageHandler
-  {
+  public static class DefaultParticipantErrorMessageHandler extends MessageHandler {
     final HelixManager _manager;
-    public DefaultParticipantErrorMessageHandler(Message message,
-        NotificationContext context,  HelixManager manager)
-    {
-       super(message, context);
-       _manager = manager;
+
+    public DefaultParticipantErrorMessageHandler(Message message, NotificationContext context,
+        HelixManager manager) {
+      super(message, context);
+      _manager = manager;
     }
 
     @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
+    public HelixTaskResult handleMessage() throws InterruptedException {
       HelixTaskResult result = new HelixTaskResult();
       result.setSuccess(true);
       // TODO : consider unify this with StatsAggregationStage.executeAlertActions()
-      try
-      {
-        ActionOnError actionOnError
-          = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
-
-        if(actionOnError == ActionOnError.DISABLE_INSTANCE)
-        {
-          _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
+      try {
+        ActionOnError actionOnError =
+            ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
+
+        if (actionOnError == ActionOnError.DISABLE_INSTANCE) {
+          _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(),
+              _message.getMsgSrc(), false);
           _logger.info("Instance " + _message.getMsgSrc() + " disabled");
-        }
-        else if(actionOnError == ActionOnError.DISABLE_PARTITION)
-        {
-          _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
-              _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
+        } else if (actionOnError == ActionOnError.DISABLE_PARTITION) {
+          _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(),
+              _message.getMsgSrc(), _message.getResourceName(),
+              Arrays.asList(_message.getPartitionName()));
           _logger.info("partition " + _message.getPartitionName() + " disabled");
-        }
-        else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
-        {
+        } else if (actionOnError == ActionOnError.DISABLE_RESOURCE) {
           // NOT IMPLEMENTED, or we can disable all partitions
-          //_manager.getClusterManagmentTool().en(_manager.getClusterName(), _manager.getInstanceName(),
-          //    _message.getResourceName(), _message.getPartitionName(), false);
+          // _manager.getClusterManagmentTool().en(_manager.getClusterName(),
+          // _manager.getInstanceName(),
+          // _message.getResourceName(), _message.getPartitionName(), false);
           _logger.info("resource " + _message.getResourceName() + " disabled");
         }
-      }
-      catch(Exception e)
-      {
+      } catch (Exception e) {
         _logger.error("", e);
         result.setSuccess(false);
         result.setException(e);
@@ -108,38 +98,31 @@ public class DefaultParticipantErrorMessageHandlerFactory implements
     }
 
     @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
-      _logger.error("Message handling pipeline get an exception. MsgId:"
-          + _message.getMsgId(), e);
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
     }
 
   }
 
   @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
+  public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
 
-    if(!type.equals(getMessageType()))
-    {
-      throw new HelixException("Unexpected msg type for message "+message.getMsgId()
-          +" type:" + message.getMsgType());
+    if (!type.equals(getMessageType())) {
+      throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
+          + message.getMsgType());
     }
 
     return new DefaultParticipantErrorMessageHandler(message, context, _manager);
   }
 
   @Override
-  public String getMessageType()
-  {
+  public String getMessageType() {
     return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
   }
 
   @Override
-  public void reset()
-  {
+  public void reset() {
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 4fe8750..5451a81 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -51,68 +51,56 @@ import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
-
 /*
  * The current implementation supports throttling on STATE-TRANSITION type of message, transition SCHEDULED-COMPLETED. 
  * 
  */
-public class DefaultSchedulerMessageHandlerFactory implements
-    MessageHandlerFactory
-{
+public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFactory {
   public static final String WAIT_ALL = "WAIT_ALL";
   public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
   public static final String SCHEDULER_TASK_QUEUE = "SchedulerTaskQueue";
   public static final String CONTROLLER_MSG_ID = "controllerMsgId";
   public static final int TASKQUEUE_BUCKET_NUM = 10;
-  public static class SchedulerAsyncCallback extends AsyncCallback
-  {
+
+  public static class SchedulerAsyncCallback extends AsyncCallback {
     StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
     Message _originalMessage;
     HelixManager _manager;
-    final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap<String, Map<String, String>>();
+    final Map<String, Map<String, String>> _resultSummaryMap =
+        new ConcurrentHashMap<String, Map<String, String>>();
 
-    public SchedulerAsyncCallback(Message originalMessage, HelixManager manager)
-    {
+    public SchedulerAsyncCallback(Message originalMessage, HelixManager manager) {
       _originalMessage = originalMessage;
       _manager = manager;
     }
 
     @Override
-    public void onTimeOut()
-    {
-      _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId()
-          + " timout with " + _timeout + " Ms");
+    public void onTimeOut() {
+      _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId() + " timout with "
+          + _timeout + " Ms");
 
-      _statusUpdateUtil.logError(_originalMessage,
-          SchedulerAsyncCallback.class, "Task timeout",
+      _statusUpdateUtil.logError(_originalMessage, SchedulerAsyncCallback.class, "Task timeout",
           _manager.getHelixDataAccessor());
       addSummary(_resultSummaryMap, _originalMessage, _manager, true);
     }
 
     @Override
-    public void onReplyMessage(Message message)
-    {
-      _logger.info("Update for scheduler msg " + _originalMessage.getMsgId()
-          + " Message " + message.getMsgSrc() + " id "
-          + message.getCorrelationId() + " completed");
-      String key = "MessageResult " + message.getMsgSrc() + " "
-          + UUID.randomUUID();
+    public void onReplyMessage(Message message) {
+      _logger.info("Update for scheduler msg " + _originalMessage.getMsgId() + " Message "
+          + message.getMsgSrc() + " id " + message.getCorrelationId() + " completed");
+      String key = "MessageResult " + message.getMsgSrc() + " " + UUID.randomUUID();
       _resultSummaryMap.put(key, message.getResultMap());
 
-      if (this.isDone())
-      {
-        _logger.info("Scheduler msg " + _originalMessage.getMsgId()
-            + " completed");
-        _statusUpdateUtil.logInfo(_originalMessage,
-            SchedulerAsyncCallback.class, "Scheduler task completed",
-            _manager.getHelixDataAccessor());
+      if (this.isDone()) {
+        _logger.info("Scheduler msg " + _originalMessage.getMsgId() + " completed");
+        _statusUpdateUtil.logInfo(_originalMessage, SchedulerAsyncCallback.class,
+            "Scheduler task completed", _manager.getHelixDataAccessor());
         addSummary(_resultSummaryMap, _originalMessage, _manager, false);
       }
     }
 
     private void addSummary(Map<String, Map<String, String>> _resultSummaryMap,
-        Message originalMessage, HelixManager manager, boolean timeOut)
-    {
+        Message originalMessage, HelixManager manager, boolean timeOut) {
       Map<String, String> summary = new TreeMap<String, String>();
       summary.put("TotalMessages:", "" + _resultSummaryMap.size());
       summary.put("Timeout", "" + timeOut);
@@ -120,153 +108,138 @@ public class DefaultSchedulerMessageHandlerFactory implements
 
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
-      ZNRecord statusUpdate = accessor.getProperty(
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
-              originalMessage.getMsgId())).getRecord();
+      ZNRecord statusUpdate =
+          accessor.getProperty(
+              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+                  originalMessage.getMsgId())).getRecord();
 
       statusUpdate.getMapFields().putAll(_resultSummaryMap);
-      accessor.setProperty(keyBuilder.controllerTaskStatus(
-          MessageType.SCHEDULER_MSG.toString(), originalMessage.getMsgId()),
-          new StatusUpdate(statusUpdate));
+      accessor.setProperty(
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              originalMessage.getMsgId()), new StatusUpdate(statusUpdate));
 
     }
   }
 
-  private static Logger _logger = Logger
-      .getLogger(DefaultSchedulerMessageHandlerFactory.class);
+  private static Logger _logger = Logger.getLogger(DefaultSchedulerMessageHandlerFactory.class);
   HelixManager _manager;
 
-  public DefaultSchedulerMessageHandlerFactory(HelixManager manager)
-  {
+  public DefaultSchedulerMessageHandlerFactory(HelixManager manager) {
     _manager = manager;
   }
 
   @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
+  public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
 
-    if (!type.equals(getMessageType()))
-    {
-      throw new HelixException("Unexpected msg type for message "
-          + message.getMsgId() + " type:" + message.getMsgType());
+    if (!type.equals(getMessageType())) {
+      throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
+          + message.getMsgType());
     }
 
     return new DefaultSchedulerMessageHandler(message, context, _manager);
   }
 
   @Override
-  public String getMessageType()
-  {
+  public String getMessageType() {
     return MessageType.SCHEDULER_MSG.toString();
   }
 
   @Override
-  public void reset()
-  {
+  public void reset() {
   }
 
-  public static class DefaultSchedulerMessageHandler extends MessageHandler
-  {
+  public static class DefaultSchedulerMessageHandler extends MessageHandler {
     HelixManager _manager;
 
-    public DefaultSchedulerMessageHandler(Message message,
-        NotificationContext context, HelixManager manager)
-    {
+    public DefaultSchedulerMessageHandler(Message message, NotificationContext context,
+        HelixManager manager) {
       super(message, context);
       _manager = manager;
     }
-    
-    void handleMessageUsingScheduledTaskQueue(Criteria recipientCriteria, Message messageTemplate, String controllerMsgId)
-    {
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();  
+
+    void handleMessageUsingScheduledTaskQueue(Criteria recipientCriteria, Message messageTemplate,
+        String controllerMsgId) {
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
 
       Map<String, String> sendSummary = new HashMap<String, String>();
       sendSummary.put("MessageCount", "0");
-      Map<InstanceType, List<Message>> messages 
-        = _manager.getMessagingService().generateMessage(recipientCriteria, messageTemplate);
-     
+      Map<InstanceType, List<Message>> messages =
+          _manager.getMessagingService().generateMessage(recipientCriteria, messageTemplate);
+
       // Calculate tasks, and put them into the idealState of the SCHEDULER_TASK_QUEUE resource.
-      // List field are the destination node, while the Message parameters are stored in the mapFields
+      // List field are the destination node, while the Message parameters are stored in the
+      // mapFields
       // task throttling can be done on SCHEDULER_TASK_QUEUE resource
-      if(messages.size() > 0)
-      {
+      if (messages.size() > 0) {
         String taskQueueName = _message.getRecord().getSimpleField(SCHEDULER_TASK_QUEUE);
-        if(taskQueueName == null)
-        {
-          throw new HelixException("SchedulerTaskMessage need to have " + SCHEDULER_TASK_QUEUE +" specified.");
+        if (taskQueueName == null) {
+          throw new HelixException("SchedulerTaskMessage need to have " + SCHEDULER_TASK_QUEUE
+              + " specified.");
         }
         IdealState newAddedScheduledTasks = new IdealState(taskQueueName);
         newAddedScheduledTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
         newAddedScheduledTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);
-        
-        synchronized(_manager)
-        {
+
+        synchronized (_manager) {
           int existingTopPartitionId = 0;
-          IdealState currentTaskQueue =  _manager.getHelixDataAccessor()
-              .getProperty(accessor.keyBuilder().idealStates(newAddedScheduledTasks.getId()));
-          if(currentTaskQueue != null)
-          {
+          IdealState currentTaskQueue =
+              _manager.getHelixDataAccessor().getProperty(
+                  accessor.keyBuilder().idealStates(newAddedScheduledTasks.getId()));
+          if (currentTaskQueue != null) {
             existingTopPartitionId = findTopPartitionId(currentTaskQueue) + 1;
           }
-          
-          List<Message> taskMessages = (List<Message>)(messages.values().toArray()[0]);
-          for(Message task : taskMessages)
-          {
+
+          List<Message> taskMessages = (List<Message>) (messages.values().toArray()[0]);
+          for (Message task : taskMessages) {
             String partitionId = taskQueueName + "_" + existingTopPartitionId;
             existingTopPartitionId++;
             String instanceName = task.getTgtName();
             newAddedScheduledTasks.setPartitionState(partitionId, instanceName, "COMPLETED");
             task.getRecord().setSimpleField(instanceName, "COMPLETED");
             task.getRecord().setSimpleField(CONTROLLER_MSG_ID, controllerMsgId);
-            
+
             List<String> priorityList = new LinkedList<String>();
             priorityList.add(instanceName);
             newAddedScheduledTasks.getRecord().setListField(partitionId, priorityList);
-            newAddedScheduledTasks.getRecord().setMapField(partitionId, task.getRecord().getSimpleFields());
-            _logger.info("Scheduling for controllerMsg " + controllerMsgId + " , sending task " + partitionId + " " + task.getMsgId()
-                 + " to "+instanceName );
-            
-            if(_logger.isDebugEnabled())
-            {
+            newAddedScheduledTasks.getRecord().setMapField(partitionId,
+                task.getRecord().getSimpleFields());
+            _logger.info("Scheduling for controllerMsg " + controllerMsgId + " , sending task "
+                + partitionId + " " + task.getMsgId() + " to " + instanceName);
+
+            if (_logger.isDebugEnabled()) {
               _logger.debug(task.getRecord().getSimpleFields());
             }
           }
-          _manager.getHelixDataAccessor()
-            .updateProperty(accessor.keyBuilder().idealStates(newAddedScheduledTasks.getId()), newAddedScheduledTasks);
+          _manager.getHelixDataAccessor().updateProperty(
+              accessor.keyBuilder().idealStates(newAddedScheduledTasks.getId()),
+              newAddedScheduledTasks);
           sendSummary.put("MessageCount", "" + taskMessages.size());
         }
       }
       // Record the number of messages sent into scheduler message status updates
-      
-      ZNRecord statusUpdate = accessor.getProperty(
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
-              _message.getMsgId())).getRecord();
-      
+
+      ZNRecord statusUpdate =
+          accessor.getProperty(
+              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+                  _message.getMsgId())).getRecord();
+
       statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
-      accessor.updateProperty(keyBuilder.controllerTaskStatus(
-          MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
-          new StatusUpdate(statusUpdate));
+      accessor.updateProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+          _message.getMsgId()), new StatusUpdate(statusUpdate));
     }
 
-    private int findTopPartitionId(IdealState currentTaskQueue)
-    {
+    private int findTopPartitionId(IdealState currentTaskQueue) {
       int topId = 0;
-      for(String partitionName : currentTaskQueue.getPartitionSet())
-      {
-        try
-        {
+      for (String partitionName : currentTaskQueue.getPartitionSet()) {
+        try {
           String partitionNumStr = partitionName.substring(partitionName.lastIndexOf('_') + 1);
           int num = Integer.parseInt(partitionNumStr);
-          if(topId < num)
-          {
+          if (topId < num) {
             topId = num;
           }
-        }
-        catch(Exception e)
-        {
+        } catch (Exception e) {
           _logger.error("", e);
         }
       }
@@ -274,93 +247,76 @@ public class DefaultSchedulerMessageHandlerFactory implements
     }
 
     @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
+    public HelixTaskResult handleMessage() throws InterruptedException {
       String type = _message.getMsgType();
       HelixTaskResult result = new HelixTaskResult();
-      if (!type.equals(MessageType.SCHEDULER_MSG.toString()))
-      {
-        throw new HelixException("Unexpected msg type for message "
-            + _message.getMsgId() + " type:" + _message.getMsgType());
+      if (!type.equals(MessageType.SCHEDULER_MSG.toString())) {
+        throw new HelixException("Unexpected msg type for message " + _message.getMsgId()
+            + " type:" + _message.getMsgType());
       }
       // Parse timeout value
       int timeOut = -1;
-      if (_message.getRecord().getSimpleFields().containsKey("TIMEOUT"))
-      {
-        try
-        {
-          timeOut = Integer.parseInt(_message.getRecord().getSimpleFields()
-              .get("TIMEOUT"));
-        } catch (Exception e)
-        {
+      if (_message.getRecord().getSimpleFields().containsKey("TIMEOUT")) {
+        try {
+          timeOut = Integer.parseInt(_message.getRecord().getSimpleFields().get("TIMEOUT"));
+        } catch (Exception e) {
         }
       }
 
       // Parse the message template
       ZNRecord record = new ZNRecord("templateMessage");
-      record.getSimpleFields().putAll(
-          _message.getRecord().getMapField("MessageTemplate"));
+      record.getSimpleFields().putAll(_message.getRecord().getMapField("MessageTemplate"));
       Message messageTemplate = new Message(record);
 
       // Parse the criteria
-      StringReader sr = new StringReader(_message.getRecord().getSimpleField(
-          "Criteria"));
+      StringReader sr = new StringReader(_message.getRecord().getSimpleField("Criteria"));
       ObjectMapper mapper = new ObjectMapper();
       Criteria recipientCriteria;
-      try
-      {
+      try {
         recipientCriteria = mapper.readValue(sr, Criteria.class);
-      } catch (Exception e)
-      {
+      } catch (Exception e) {
         _logger.error("", e);
         result.setException(e);
         result.setSuccess(false);
         return result;
       }
       _logger.info("Scheduler sending message, criteria:" + recipientCriteria);
-      
+
       boolean waitAll = false;
-      if(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) !=null)
-      {
-        try
-        {
-          waitAll = Boolean.parseBoolean(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
-        }
-        catch(Exception e)
-        {
-          _logger.warn("",e);
+      if (_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) != null) {
+        try {
+          waitAll =
+              Boolean.parseBoolean(_message.getRecord().getSimpleField(
+                  DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
+        } catch (Exception e) {
+          _logger.warn("", e);
         }
       }
-      boolean hasSchedulerTaskQueue = _message.getRecord().getSimpleFields().containsKey(SCHEDULER_TASK_QUEUE);
+      boolean hasSchedulerTaskQueue =
+          _message.getRecord().getSimpleFields().containsKey(SCHEDULER_TASK_QUEUE);
       // If the target is PARTICIPANT, use the ScheduledTaskQueue
-      if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType() && hasSchedulerTaskQueue)
-      {
-        handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate, _message.getMsgId());
+      if (InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType()
+          && hasSchedulerTaskQueue) {
+        handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate,
+            _message.getMsgId());
         result.setSuccess(true);
         result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
-        result.getTaskResultMap().put(
-            "ControllerResult",
-            "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
-                + " processed");
+        result.getTaskResultMap().put("ControllerResult",
+            "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
         return result;
       }
-      
+
       _logger.info("Scheduler sending message to Controller");
       int nMsgsSent = 0;
       SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
-      if(waitAll)
-      {
-        nMsgsSent = _manager.getMessagingService().sendAndWait(recipientCriteria,
-            messageTemplate, 
-            callback,
-            timeOut);
-      }
-      else
-      {
-        nMsgsSent = _manager.getMessagingService().send(recipientCriteria,
-            messageTemplate, 
-            callback,
-            timeOut);
+      if (waitAll) {
+        nMsgsSent =
+            _manager.getMessagingService().sendAndWait(recipientCriteria, messageTemplate,
+                callback, timeOut);
+      } else {
+        nMsgsSent =
+            _manager.getMessagingService().send(recipientCriteria, messageTemplate, callback,
+                timeOut);
       }
       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
@@ -368,31 +324,27 @@ public class DefaultSchedulerMessageHandlerFactory implements
       // Record the number of messages sent into status updates
       Map<String, String> sendSummary = new HashMap<String, String>();
       sendSummary.put("MessageCount", "" + nMsgsSent);
-      
-      ZNRecord statusUpdate = accessor.getProperty(
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
-              _message.getMsgId())).getRecord();
+
+      ZNRecord statusUpdate =
+          accessor.getProperty(
+              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+                  _message.getMsgId())).getRecord();
 
       statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
 
-      accessor.setProperty(keyBuilder.controllerTaskStatus(
-          MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
-          new StatusUpdate(statusUpdate));
+      accessor.setProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+          _message.getMsgId()), new StatusUpdate(statusUpdate));
 
-      result.getTaskResultMap().put(
-          "ControllerResult",
-          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
-              + " processed");
+      result.getTaskResultMap().put("ControllerResult",
+          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
       result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
       result.setSuccess(true);
       return result;
     }
 
     @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
-      _logger.error("Message handling pipeline get an exception. MsgId:"
-          + _message.getMsgId(), e);
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
index 2a05a45..c9ad0f3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
@@ -87,8 +87,8 @@ public class DistributedControllerManager extends AbstractManager {
   public void handleNewSession() throws Exception {
     waitUntilConnected();
 
-    ParticipantManagerHelper participantHelper
-      = new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
+    ParticipantManagerHelper participantHelper =
+        new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
 
     /**
      * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
@@ -105,13 +105,11 @@ public class DistributedControllerManager extends AbstractManager {
      */
     _baseDataAccessor.reset();
 
-
     /**
      * from here on, we are dealing with new session
      */
     if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
-      throw new HelixException("Cluster structure is not set up for cluster: "
-          + _clusterName);
+      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
     }
 
     /**
@@ -122,8 +120,7 @@ public class DistributedControllerManager extends AbstractManager {
     /**
      * Invoke PreConnectCallbacks
      */
-    for (PreConnectCallback callback : _preConnectCallbacks)
-    {
+    for (PreConnectCallback callback : _preConnectCallbacks) {
       callback.onPreConnect();
     }
 
@@ -139,13 +136,11 @@ public class DistributedControllerManager extends AbstractManager {
     if (_leaderElectionHandler != null) {
       _leaderElectionHandler.init();
     } else {
-      _leaderElectionHandler = new CallbackHandler(this,
-                                                   _zkclient,
-                                                   _keyBuilder.controller(),
-                                  new DistributedLeaderElection(this, _controller),
-                                  new EventType[] { EventType.NodeChildrenChanged,
-                                      EventType.NodeDeleted, EventType.NodeCreated },
-                                  ChangeType.CONTROLLER);
+      _leaderElectionHandler =
+          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
+              new DistributedLeaderElection(this, _controller), new EventType[] {
+                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
+              }, ChangeType.CONTROLLER);
     }
 
     /**
@@ -165,28 +160,24 @@ public class DistributedControllerManager extends AbstractManager {
 
   @Override
   void doDisconnect() {
-    if (_leaderElectionHandler != null)
-    {
+    if (_leaderElectionHandler != null) {
       _leaderElectionHandler.reset();
     }
   }
 
   @Override
   public boolean isLeader() {
-    if (!isConnected())
-    {
+    if (!isConnected()) {
       return false;
     }
 
     try {
       LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
-      if (leader != null)
-      {
+      if (leader != null) {
         String leaderName = leader.getInstanceName();
         String sessionId = leader.getSessionId();
-        if (leaderName != null && leaderName.equals(_instanceName)
-            && sessionId != null && sessionId.equals(_sessionId))
-        {
+        if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
+            && sessionId.equals(_sessionId)) {
           return true;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index bf184cc..0ab8342 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -36,7 +36,6 @@ import org.apache.log4j.Logger;
 
 /**
  * do distributed leader election
- *
  */
 public class DistributedLeaderElection implements ControllerChangeListener {
   private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class);
@@ -74,7 +73,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
       if (changeContext.getType().equals(NotificationContext.Type.INIT)
           || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
         LOG.info(_manager.getInstanceName() + " is trying to acquire leadership for cluster: "
-                           + _manager.getClusterName());
+            + _manager.getClusterName());
         HelixDataAccessor accessor = manager.getHelixDataAccessor();
         Builder keyBuilder = accessor.keyBuilder();
 
@@ -92,7 +91,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
         }
       } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
         LOG.info(_manager.getInstanceName() + " reqlinquish leadership for cluster: "
-              + _manager.getClusterName());
+            + _manager.getClusterName());
         controllerHelper.stopControllerTimerTasks();
         controllerHelper.removeListenersFromController(_controller);
 
@@ -117,8 +116,8 @@ public class DistributedLeaderElection implements ControllerChangeListener {
       leader.setSessionId(manager.getSessionId());
       leader.setHelixVersion(manager.getVersion());
       if (ZKPropertyTransferServer.getInstance() != null) {
-        String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance()
-            .getWebserviceUrl();
+        String zkPropertyTransferServiceUrl =
+            ZKPropertyTransferServer.getInstance().getWebserviceUrl();
         if (zkPropertyTransferServiceUrl != null) {
           leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
index e95c2bf..8dd9d77 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
@@ -31,24 +31,20 @@ import org.apache.helix.AccessOption;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
-public class HelixGroupCommit<T>
-{
+public class HelixGroupCommit<T> {
   private static Logger LOG = Logger.getLogger(HelixGroupCommit.class);
 
-  private static class Queue<T>
-  {
-    final AtomicReference<Thread>      _running = new AtomicReference<Thread>();
+  private static class Queue<T> {
+    final AtomicReference<Thread> _running = new AtomicReference<Thread>();
     final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
   }
 
-  private static class Entry<T>
-  {
-    final String         _key;
+  private static class Entry<T> {
+    final String _key;
     final DataUpdater<T> _updater;
-    AtomicBoolean        _sent = new AtomicBoolean(false);
+    AtomicBoolean _sent = new AtomicBoolean(false);
 
-    Entry(String key, DataUpdater<T> updater)
-    {
+    Entry(String key, DataUpdater<T> updater) {
       _key = key;
       _updater = updater;
     }
@@ -56,40 +52,30 @@ public class HelixGroupCommit<T>
 
   private final Queue<T>[] _queues = new Queue[100];
 
-  public HelixGroupCommit()
-  {
+  public HelixGroupCommit() {
     // Don't use Arrays.fill();
-    for (int i = 0; i < _queues.length; ++i)
-    {
+    for (int i = 0; i < _queues.length; ++i) {
       _queues[i] = new Queue<T>();
     }
   }
 
-  private Queue<T> getQueue(String key)
-  {
+  private Queue<T> getQueue(String key) {
     return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
   }
 
-  public boolean commit(ZkBaseDataAccessor<T> accessor,
-                        int options,
-                        String key,
-                        DataUpdater<T> updater)
-  {
+  public boolean commit(ZkBaseDataAccessor<T> accessor, int options, String key,
+      DataUpdater<T> updater) {
     Queue<T> queue = getQueue(key);
     Entry<T> entry = new Entry<T>(key, updater);
 
     queue._pending.add(entry);
 
-    while (!entry._sent.get())
-    {
-      if (queue._running.compareAndSet(null, Thread.currentThread()))
-      {
+    while (!entry._sent.get()) {
+      if (queue._running.compareAndSet(null, Thread.currentThread())) {
         ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
-        try
-        {
+        try {
           Entry<T> first = queue._pending.peek();
-          if (first == null)
-          {
+          if (first == null) {
             return true;
           }
 
@@ -100,25 +86,20 @@ public class HelixGroupCommit<T>
           String mergedKey = first._key;
 
           boolean retry;
-          do
-          {
+          do {
             retry = false;
 
-            try
-            {
+            try {
               T merged = null;
 
               Stat readStat = new Stat();
-              
+
               // to create a new znode, we need set version to -1
               readStat.setVersion(-1);
-              try
-              {
+              try {
                 // accessor will fallback to zk if not found in cache
                 merged = accessor.get(mergedKey, readStat, options);
-              }
-              catch (ZkNoNodeException e)
-              {
+              } catch (ZkNoNodeException e) {
                 // OK
               }
 
@@ -127,11 +108,9 @@ public class HelixGroupCommit<T>
 
               // iterate over processed if we are retrying
               Iterator<Entry<T>> it = processed.iterator();
-              while (it.hasNext())
-              {
+              while (it.hasNext()) {
                 Entry<T> ent = it.next();
-                if (!ent._key.equals(mergedKey))
-                {
+                if (!ent._key.equals(mergedKey)) {
                   continue;
                 }
                 merged = ent._updater.update(merged);
@@ -140,11 +119,9 @@ public class HelixGroupCommit<T>
 
               // iterate over queue._pending for newly coming requests
               it = queue._pending.iterator();
-              while (it.hasNext())
-              {
+              while (it.hasNext()) {
                 Entry<T> ent = it.next();
-                if (!ent._key.equals(mergedKey))
-                {
+                if (!ent._key.equals(mergedKey)) {
                   continue;
                 }
                 processed.add(ent);
@@ -154,37 +131,24 @@ public class HelixGroupCommit<T>
               }
               // System.out.println("size:"+ processed.size());
               accessor.set(mergedKey, merged, readStat.getVersion(), options);
-            }
-            catch (ZkBadVersionException e)
-            {
+            } catch (ZkBadVersionException e) {
               retry = true;
             }
-          }
-          while (retry);
-        }
-        finally
-        {
+          } while (retry);
+        } finally {
           queue._running.set(null);
-          for (Entry<T> e : processed)
-          {
-            synchronized (e)
-            {
+          for (Entry<T> e : processed) {
+            synchronized (e) {
               e._sent.set(true);
               e.notify();
             }
           }
         }
-      }
-      else
-      {
-        synchronized (entry)
-        {
-          try
-          {
+      } else {
+        synchronized (entry) {
+          try {
             entry.wait(10);
-          }
-          catch (InterruptedException e)
-          {
+          } catch (InterruptedException e) {
             e.printStackTrace();
             return false;
           }


Mime
View raw message