helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [33/51] [partial] [HELIX-198] Unify helix code style, rb=13710
Date Wed, 21 Aug 2013 20:43:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index 64db02e..2eec354 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -43,117 +43,90 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
-
-public class DefaultMessagingService implements ClusterMessagingService
-{
-  private final HelixManager         _manager;
-  private final CriteriaEvaluator    _evaluator;
-  private final HelixTaskExecutor    _taskExecutor;
+public class DefaultMessagingService implements ClusterMessagingService {
+  private final HelixManager _manager;
+  private final CriteriaEvaluator _evaluator;
+  private final HelixTaskExecutor _taskExecutor;
   // TODO:rename to factory, this is not a service
   private final AsyncCallbackService _asyncCallbackService;
-  private static Logger              _logger =
-                                                 Logger.getLogger(DefaultMessagingService.class);
-  ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
-    = new ConcurrentHashMap<String, MessageHandlerFactory>();
-  
-  public DefaultMessagingService(HelixManager manager)
-  {
+  private static Logger _logger = Logger.getLogger(DefaultMessagingService.class);
+  ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
+      new ConcurrentHashMap<String, MessageHandlerFactory>();
+
+  public DefaultMessagingService(HelixManager manager) {
     _manager = manager;
     _evaluator = new CriteriaEvaluator();
     _taskExecutor = new HelixTaskExecutor();
     _asyncCallbackService = new AsyncCallbackService();
     _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
-                                                _asyncCallbackService);
+        _asyncCallbackService);
   }
 
   @Override
-  public int send(Criteria recipientCriteria, final Message messageTemplate)
-  {
+  public int send(Criteria recipientCriteria, final Message messageTemplate) {
     return send(recipientCriteria, messageTemplate, null, -1);
   }
 
   @Override
-  public int send(final Criteria recipientCriteria,
-                  final Message message,
-                  AsyncCallback callbackOnReply,
-                  int timeOut)
-  {
+  public int send(final Criteria recipientCriteria, final Message message,
+      AsyncCallback callbackOnReply, int timeOut) {
     return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
   }
 
   @Override
-  public int send(final Criteria recipientCriteria,
-                  final Message message,
-                  AsyncCallback callbackOnReply,
-                  int timeOut,
-                  int retryCount)
-  {
-    Map<InstanceType, List<Message>> generateMessage =
-        generateMessage(recipientCriteria, message);
+  public int send(final Criteria recipientCriteria, final Message message,
+      AsyncCallback callbackOnReply, int timeOut, int retryCount) {
+    Map<InstanceType, List<Message>> generateMessage = generateMessage(recipientCriteria, message);
     int totalMessageCount = 0;
-    for (List<Message> messages : generateMessage.values())
-    {
+    for (List<Message> messages : generateMessage.values()) {
       totalMessageCount += messages.size();
     }
-    _logger.info("Send " + totalMessageCount + " messages with criteria "
-        + recipientCriteria);
-    if (totalMessageCount == 0)
-    {
+    _logger.info("Send " + totalMessageCount + " messages with criteria " + recipientCriteria);
+    if (totalMessageCount == 0) {
       return 0;
     }
     String correlationId = null;
-    if (callbackOnReply != null)
-    {
+    if (callbackOnReply != null) {
       int totalTimeout = timeOut * (retryCount + 1);
-      if (totalTimeout < 0)
-      {
+      if (totalTimeout < 0) {
         totalTimeout = -1;
       }
       callbackOnReply.setTimeout(totalTimeout);
       correlationId = UUID.randomUUID().toString();
-      for (List<Message> messages : generateMessage.values())
-      {
+      for (List<Message> messages : generateMessage.values()) {
         callbackOnReply.setMessagesSent(messages);
       }
       _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
     }
 
-    for (InstanceType receiverType : generateMessage.keySet())
-    {
+    for (InstanceType receiverType : generateMessage.keySet()) {
       List<Message> list = generateMessage.get(receiverType);
-      for (Message tempMessage : list)
-      {
+      for (Message tempMessage : list) {
         tempMessage.setRetryCount(retryCount);
         tempMessage.setExecutionTimeout(timeOut);
         tempMessage.setSrcInstanceType(_manager.getInstanceType());
-        if (correlationId != null)
-        {
+        if (correlationId != null) {
           tempMessage.setCorrelationId(correlationId);
         }
 
         HelixDataAccessor accessor = _manager.getHelixDataAccessor();
         Builder keyBuilder = accessor.keyBuilder();
 
-        if (receiverType == InstanceType.CONTROLLER)
-        {
+        if (receiverType == InstanceType.CONTROLLER) {
           // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
           // tempMessage,
           // tempMessage.getId());
-          accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
-                               tempMessage);
+          accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()), tempMessage);
         }
 
-        if (receiverType == InstanceType.PARTICIPANT)
-        {
-          accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
-                                                  tempMessage.getId()),
-                               tempMessage);
+        if (receiverType == InstanceType.PARTICIPANT) {
+          accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(), tempMessage.getId()),
+              tempMessage);
         }
       }
     }
 
-    if (callbackOnReply != null)
-    {
+    if (callbackOnReply != null) {
       // start timer if timeout is set
       callbackOnReply.startTimer();
     }
@@ -161,59 +134,47 @@ public class DefaultMessagingService implements ClusterMessagingService
   }
 
   public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
-                                                           final Message message)
-  {
-    Map<InstanceType, List<Message>> messagesToSendMap =
-        new HashMap<InstanceType, List<Message>>();
+      final Message message) {
+    Map<InstanceType, List<Message>> messagesToSendMap = new HashMap<InstanceType, List<Message>>();
     InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
 
-    if (instanceType == InstanceType.CONTROLLER)
-    {
+    if (instanceType == InstanceType.CONTROLLER) {
       List<Message> messages = generateMessagesForController(message);
       messagesToSendMap.put(InstanceType.CONTROLLER, messages);
       // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
       // newMessage.getRecord(), CreateMode.PERSISTENT);
-    }
-    else if (instanceType == InstanceType.PARTICIPANT)
-    {
+    } else if (instanceType == InstanceType.PARTICIPANT) {
       List<Message> messages = new ArrayList<Message>();
       List<Map<String, String>> matchedList =
           _evaluator.evaluateCriteria(recipientCriteria, _manager);
 
-      if (!matchedList.isEmpty())
-      {
+      if (!matchedList.isEmpty()) {
         Map<String, String> sessionIdMap = new HashMap<String, String>();
-        if (recipientCriteria.isSessionSpecific())
-        {
+        if (recipientCriteria.isSessionSpecific()) {
           HelixDataAccessor accessor = _manager.getHelixDataAccessor();
           Builder keyBuilder = accessor.keyBuilder();
 
-          List<LiveInstance> liveInstances =
-              accessor.getChildValues(keyBuilder.liveInstances());
+          List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
 
-          for (LiveInstance liveInstance : liveInstances)
-          {
+          for (LiveInstance liveInstance : liveInstances) {
             sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
           }
         }
-        for (Map<String, String> map : matchedList)
-        {
+        for (Map<String, String> map : matchedList) {
           String id = UUID.randomUUID().toString();
           Message newMessage = new Message(message.getRecord(), id);
           String srcInstanceName = _manager.getInstanceName();
           String tgtInstanceName = map.get("instanceName");
           // Don't send message to self
           if (recipientCriteria.isSelfExcluded()
-              && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
-          {
+              && srcInstanceName.equalsIgnoreCase(tgtInstanceName)) {
             continue;
           }
           newMessage.setSrcName(srcInstanceName);
           newMessage.setTgtName(tgtInstanceName);
           newMessage.setResourceName(map.get("resourceName"));
           newMessage.setPartitionName(map.get("partitionName"));
-          if (recipientCriteria.isSessionSpecific())
-          {
+          if (recipientCriteria.isSessionSpecific()) {
             newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
           }
           messages.add(newMessage);
@@ -224,8 +185,7 @@ public class DefaultMessagingService implements ClusterMessagingService
     return messagesToSendMap;
   }
 
-  private List<Message> generateMessagesForController(Message message)
-  {
+  private List<Message> generateMessagesForController(Message message) {
     List<Message> messages = new ArrayList<Message>();
     String id = UUID.randomUUID().toString();
     Message newMessage = new Message(message.getRecord(), id);
@@ -235,73 +195,61 @@ public class DefaultMessagingService implements ClusterMessagingService
     messages.add(newMessage);
     return messages;
   }
-  
+
   @Override
-  public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
-  {
-    if (_manager.isConnected())
-    {
+  public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
+    if (_manager.isConnected()) {
       registerMessageHandlerFactoryInternal(type, factory);
-    }
-    else
-    {
+    } else {
       _messageHandlerFactoriestobeAdded.put(type, factory);
     }
   }
-  
-  public synchronized void onConnected()
-  {
-    for(String type : _messageHandlerFactoriestobeAdded.keySet())
-    {
+
+  public synchronized void onConnected() {
+    for (String type : _messageHandlerFactoriestobeAdded.keySet()) {
       registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type));
     }
     _messageHandlerFactoriestobeAdded.clear();
   }
-  
-  void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
-  {
+
+  void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory) {
     _logger.info("registering msg factory for type " + type);
     int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
     String threadpoolSizeStr = null;
     String key = type + "." + HelixTaskExecutor.MAX_THREADS;
-    
+
     ConfigAccessor configAccessor = _manager.getConfigAccessor();
-    if(configAccessor != null)
-    {
+    if (configAccessor != null) {
       ConfigScope scope = null;
-      
+
       // Read the participant config and cluster config for the per-message type thread pool size.
       // participant config will override the cluster config.
-      
-      if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-      { 
-        scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build();
+
+      if (_manager.getInstanceType() == InstanceType.PARTICIPANT
+          || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
+        scope =
+            new ConfigScopeBuilder().forCluster(_manager.getClusterName())
+                .forParticipant(_manager.getInstanceName()).build();
         threadpoolSizeStr = configAccessor.get(scope, key);
       }
-      
-      if(threadpoolSizeStr == null)
-      {
+
+      if (threadpoolSizeStr == null) {
         scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
         threadpoolSizeStr = configAccessor.get(scope, key);
       }
     }
-    
-    if(threadpoolSizeStr != null)
-    {
-      try
-      {
+
+    if (threadpoolSizeStr != null) {
+      try {
         threadpoolSize = Integer.parseInt(threadpoolSizeStr);
-        if(threadpoolSize <= 0)
-        {
+        if (threadpoolSize <= 0) {
           threadpoolSize = 1;
         }
-      }
-      catch(Exception e)
-      {
+      } catch (Exception e) {
         _logger.error("", e);
       }
     }
-    
+
     _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
     // Self-send a no-op message, so that the onMessage() call will be invoked
     // again, and
@@ -311,12 +259,9 @@ public class DefaultMessagingService implements ClusterMessagingService
     sendNopMessage();
   }
 
-  public void sendNopMessage()
-  {
-    if (_manager.isConnected())
-    {
-      try
-      {
+  public void sendNopMessage() {
+    if (_manager.isConnected()) {
+      try {
         Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
         nopMsg.setSrcName(_manager.getInstanceName());
 
@@ -324,73 +269,51 @@ public class DefaultMessagingService implements ClusterMessagingService
         Builder keyBuilder = accessor.keyBuilder();
 
         if (_manager.getInstanceType() == InstanceType.CONTROLLER
-            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-        {
+            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
           nopMsg.setTgtName("Controller");
           accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
         }
 
         if (_manager.getInstanceType() == InstanceType.PARTICIPANT
-            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-        {
+            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
           nopMsg.setTgtName(_manager.getInstanceName());
-          accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
-                               nopMsg);
+          accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
         }
-      }
-      catch (Exception e)
-      {
+      } catch (Exception e) {
         _logger.error(e);
       }
     }
   }
 
-  public HelixTaskExecutor getExecutor()
-  {
+  public HelixTaskExecutor getExecutor() {
     return _taskExecutor;
   }
 
   @Override
-  public int sendAndWait(Criteria receipientCriteria,
-                         Message message,
-                         AsyncCallback asyncCallback,
-                         int timeOut,
-                         int retryCount)
-  {
-    int messagesSent =
-        send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
-    if (messagesSent > 0)
-    {
-      while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
-      {
-        synchronized (asyncCallback)
-        {
-          try
-          {
+  public int sendAndWait(Criteria receipientCriteria, Message message, AsyncCallback asyncCallback,
+      int timeOut, int retryCount) {
+    int messagesSent = send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
+    if (messagesSent > 0) {
+      while (!asyncCallback.isDone() && !asyncCallback.isTimedOut()) {
+        synchronized (asyncCallback) {
+          try {
             asyncCallback.wait();
-          }
-          catch (InterruptedException e)
-          {
+          } catch (InterruptedException e) {
             _logger.error(e);
             asyncCallback.setInterrupted(true);
             break;
           }
         }
       }
-    }
-    else
-    {
+    } else {
       _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
     }
     return messagesSent;
   }
 
   @Override
-  public int sendAndWait(Criteria recipientCriteria,
-                         Message message,
-                         AsyncCallback asyncCallback,
-                         int timeOut)
-  {
+  public int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback asyncCallback,
+      int timeOut) {
     return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
index 8a0d9a1..c218a15 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -37,106 +37,87 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.log4j.Logger;
 
-
-public class AsyncCallbackService implements MessageHandlerFactory
-{
-  private final ConcurrentHashMap<String, AsyncCallback> _callbackMap = new ConcurrentHashMap<String, AsyncCallback>();
+public class AsyncCallbackService implements MessageHandlerFactory {
+  private final ConcurrentHashMap<String, AsyncCallback> _callbackMap =
+      new ConcurrentHashMap<String, AsyncCallback>();
   private static Logger _logger = Logger.getLogger(AsyncCallbackService.class);
 
-  public AsyncCallbackService()
-  {
+  public AsyncCallbackService() {
   }
 
-  public void registerAsyncCallback(String correlationId, AsyncCallback callback)
-  {
-    if (_callbackMap.containsKey(correlationId))
-    {
+  public void registerAsyncCallback(String correlationId, AsyncCallback callback) {
+    if (_callbackMap.containsKey(correlationId)) {
       _logger.warn("correlation id " + correlationId + " already registered");
     }
     _logger.info("registering correlation id " + correlationId);
     _callbackMap.put(correlationId, callback);
   }
 
-  void verifyMessage(Message message)
-  {
-    if (!message.getMsgType().toString()
-        .equalsIgnoreCase(MessageType.TASK_REPLY.toString()))
-    {
-      String errorMsg = "Unexpected msg type for message " + message.getMsgId()
-          + " type:" + message.getMsgType() + " Expected : "
-          + MessageType.TASK_REPLY;
+  void verifyMessage(Message message) {
+    if (!message.getMsgType().toString().equalsIgnoreCase(MessageType.TASK_REPLY.toString())) {
+      String errorMsg =
+          "Unexpected msg type for message " + message.getMsgId() + " type:" + message.getMsgType()
+              + " Expected : " + MessageType.TASK_REPLY;
       _logger.error(errorMsg);
       throw new HelixException(errorMsg);
     }
     String correlationId = message.getCorrelationId();
-    if (correlationId == null)
-    {
-      String errorMsg = "Message " + message.getMsgId()
-          + " does not have correlation id";
+    if (correlationId == null) {
+      String errorMsg = "Message " + message.getMsgId() + " does not have correlation id";
       _logger.error(errorMsg);
       throw new HelixException(errorMsg);
     }
 
-    if (!_callbackMap.containsKey(correlationId))
-    {
-      String errorMsg = "Message "
-          + message.getMsgId()
-          + " does not have correponding callback. Probably timed out already. Correlation id: "
-          + correlationId;
+    if (!_callbackMap.containsKey(correlationId)) {
+      String errorMsg =
+          "Message "
+              + message.getMsgId()
+              + " does not have correponding callback. Probably timed out already. Correlation id: "
+              + correlationId;
       _logger.error(errorMsg);
       throw new HelixException(errorMsg);
     }
-    _logger.info("Verified reply message " + message.getMsgId()
-        + " correlation:" + correlationId);
+    _logger.info("Verified reply message " + message.getMsgId() + " correlation:" + correlationId);
   }
 
   @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
+  public MessageHandler createHandler(Message message, NotificationContext context) {
     verifyMessage(message);
-    return new AsyncCallbackMessageHandler(message.getCorrelationId(),message, context);
+    return new AsyncCallbackMessageHandler(message.getCorrelationId(), message, context);
   }
 
   @Override
-  public String getMessageType()
-  {
+  public String getMessageType() {
     return MessageType.TASK_REPLY.toString();
   }
 
   @Override
-  public void reset()
-  {
+  public void reset() {
 
   }
 
-  public class AsyncCallbackMessageHandler extends MessageHandler
-  {
+  public class AsyncCallbackMessageHandler extends MessageHandler {
     private final String _correlationId;
 
-    public AsyncCallbackMessageHandler(String correlationId, Message message, NotificationContext context)
-    {
+    public AsyncCallbackMessageHandler(String correlationId, Message message,
+        NotificationContext context) {
       super(message, context);
       _correlationId = correlationId;
     }
 
     @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
+    public HelixTaskResult handleMessage() throws InterruptedException {
       verifyMessage(_message);
       HelixTaskResult result = new HelixTaskResult();
       assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId()));
-      _logger.info("invoking reply message " + _message.getMsgId()
-          + ", correlationid:" + _correlationId);
+      _logger.info("invoking reply message " + _message.getMsgId() + ", correlationid:"
+          + _correlationId);
 
       AsyncCallback callback = _callbackMap.get(_correlationId);
-      synchronized (callback)
-      {
+      synchronized (callback) {
         callback.onReply(_message);
-        if (callback.isDone())
-        {
-          _logger.info("Removing finished callback, correlationid:"
-              + _correlationId);
+        if (callback.isDone()) {
+          _logger.info("Removing finished callback, correlationid:" + _correlationId);
           _callbackMap.remove(_correlationId);
         }
       }
@@ -145,8 +126,7 @@ public class AsyncCallbackService implements MessageHandlerFactory
     }
 
     @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
       _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
index 4b9a966..b506148 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
@@ -41,193 +41,198 @@ import org.apache.helix.model.Message.Attributes;
 import org.apache.log4j.Logger;
 
 public class BatchMessageHandler extends MessageHandler {
-	private static Logger LOG = Logger.getLogger(BatchMessageHandler.class);
-
-	final MessageHandlerFactory _msgHandlerFty;
-	final TaskExecutor _executor;
-	final List<Message> _subMessages;
-	final List<MessageHandler> _subMessageHandlers;
-	final BatchMessageWrapper _batchMsgWrapper;
-
-	public BatchMessageHandler(Message msg, NotificationContext context, MessageHandlerFactory fty,
-	        BatchMessageWrapper wrapper, TaskExecutor executor) {
-		super(msg, context);
-		
-		if (fty == null || executor == null) {
-			throw new HelixException("MessageHandlerFactory | TaskExecutor can't be null");
-		}
-		
-		_msgHandlerFty = fty;
-		_batchMsgWrapper = wrapper;
-		_executor = executor;
-
-		// create sub-messages
-		_subMessages = new ArrayList<Message>();
-		List<String> partitionKeys = _message.getPartitionNames();
-		for (String partitionKey : partitionKeys) {
-			// assign a new message id, put batch-msg-id to parent-id field
-			Message subMsg = new Message(_message.getRecord(), UUID.randomUUID().toString());
-			subMsg.setPartitionName(partitionKey);
-			subMsg.setAttribute(Attributes.PARENT_MSG_ID, _message.getId());
-			subMsg.setBatchMessageMode(false);
-
-			_subMessages.add(subMsg);
-		}
-
-		// create sub-message handlers
-		_subMessageHandlers = createMsgHandlers(_subMessages, context);
-	}
-	
-	List<MessageHandler> createMsgHandlers(List<Message> msgs, NotificationContext context) {
-		
-		List<MessageHandler> handlers = new ArrayList<MessageHandler>();
-		for (Message msg : msgs) {
-			 MessageHandler handler = _msgHandlerFty.createHandler(msg, context);
-			 handlers.add(handler);
-		}
-		return handlers;
-	}
-
-
-	public void preHandleMessage() {
-	  if (_message.getBatchMessageMode() == true && _batchMsgWrapper != null) {
-	    _batchMsgWrapper.start(_message, _notificationContext);
-	  }
-	}
-
-	public void postHandleMessage() {
-	  if (_message.getBatchMessageMode() == true && _batchMsgWrapper != null) {
-	    _batchMsgWrapper.end(_message, _notificationContext);
-	  }
-	  
-		// update currentState
-		HelixManager manager = _notificationContext.getManager();
-		HelixDataAccessor accessor = manager.getHelixDataAccessor();
-		ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap = (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
-		        .get(MapKey.CURRENT_STATE_UPDATE.toString());
-		
-		if (csUpdateMap != null) {
-    		Map<PropertyKey, CurrentState> csUpdate = mergeCurStateUpdate(csUpdateMap);
-    
-    		// TODO: change to use asyncSet
-    		for (PropertyKey key : csUpdate.keySet()) {
-    			// logger.info("updateCS: " + key);
-    			// System.out.println("\tupdateCS: " + key.getPath() + ", " +
-    			// curStateMap.get(key));
-    			accessor.updateProperty(key, csUpdate.get(key));
-    		}
-		}
-	}
-
-	// will not return until all sub-message executions are done
-	@Override
-	public HelixTaskResult handleMessage() {
-		HelixTaskResult result = null;
-		List<Future<HelixTaskResult>> futures = null;
-		List<MessageTask> batchTasks = new ArrayList<MessageTask>();
-		
-		synchronized (_batchMsgWrapper)
-		{
-			try {
-				preHandleMessage();
-
-  			int exeBatchSize = 1; // TODO: getExeBatchSize from msg
-  			List<String> partitionKeys = _message.getPartitionNames();
-  			for (int i = 0; i < partitionKeys.size(); i += exeBatchSize) {
-  				if (i + exeBatchSize <= partitionKeys.size()) {
-  					List<Message> msgs = _subMessages.subList(i, i + exeBatchSize);
-  					List<MessageHandler> handlers = _subMessageHandlers.subList(i, i + exeBatchSize);
-  					HelixBatchMessageTask batchTask = new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext);
-  					batchTasks.add(batchTask);
-  
-  				} else {
-  					List<Message> msgs = _subMessages.subList(i, i + partitionKeys.size());
-  					List<MessageHandler> handlers = _subMessageHandlers.subList(i, i + partitionKeys.size());
-  
-  					HelixBatchMessageTask batchTask = new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext);
-  					batchTasks.add(batchTask);
-  				}
-  			}
-
-				// invokeAll() is blocking call
-  			long timeout = _message.getExecutionTimeout();
-  			if (timeout == -1) {
-  				timeout = Long.MAX_VALUE;
-  			}
-				futures = _executor.invokeAllTasks(batchTasks, timeout, TimeUnit.MILLISECONDS);				
-			} catch (Exception e) {
-				LOG.error("fail to execute batchMsg: " + _message.getId(), e);
-				result = new HelixTaskResult();
-				result.setException(e);
-				
-				// HelixTask will call onError on this batch-msg-handler
-				// return result;
-			}
-
-			// combine sub-results to result
-			if (futures != null) {
-				boolean isBatchTaskSucceed = true;
-				
-				for (int i = 0; i < futures.size(); i++) {
-					Future<HelixTaskResult> future = futures.get(i);
-					MessageTask subTask = batchTasks.get(i);
-					try {
-              HelixTaskResult subTaskResult = future.get();
-              if (!subTaskResult.isSuccess()) {
-              	isBatchTaskSucceed = false;
-              }
-            } catch (InterruptedException e) {
-            	isBatchTaskSucceed = false;
-            	LOG.error("interrupted in executing batch-msg: " + _message.getId() + ", sub-msg: " + subTask.getTaskId(), e);
-            } catch (ExecutionException e) {
-            	isBatchTaskSucceed = false;
-            	LOG.error("fail to execute batch-msg: " + _message.getId() + ", sub-msg: " + subTask.getTaskId(), e);
+  private static Logger LOG = Logger.getLogger(BatchMessageHandler.class);
+
+  final MessageHandlerFactory _msgHandlerFty;
+  final TaskExecutor _executor;
+  final List<Message> _subMessages;
+  final List<MessageHandler> _subMessageHandlers;
+  final BatchMessageWrapper _batchMsgWrapper;
+
+  public BatchMessageHandler(Message msg, NotificationContext context, MessageHandlerFactory fty,
+      BatchMessageWrapper wrapper, TaskExecutor executor) {
+    super(msg, context);
+
+    if (fty == null || executor == null) {
+      throw new HelixException("MessageHandlerFactory | TaskExecutor can't be null");
+    }
+
+    _msgHandlerFty = fty;
+    _batchMsgWrapper = wrapper;
+    _executor = executor;
+
+    // create sub-messages
+    _subMessages = new ArrayList<Message>();
+    List<String> partitionKeys = _message.getPartitionNames();
+    for (String partitionKey : partitionKeys) {
+      // assign a new message id, put batch-msg-id to parent-id field
+      Message subMsg = new Message(_message.getRecord(), UUID.randomUUID().toString());
+      subMsg.setPartitionName(partitionKey);
+      subMsg.setAttribute(Attributes.PARENT_MSG_ID, _message.getId());
+      subMsg.setBatchMessageMode(false);
+
+      _subMessages.add(subMsg);
+    }
+
+    // create sub-message handlers
+    _subMessageHandlers = createMsgHandlers(_subMessages, context);
+  }
+
+  List<MessageHandler> createMsgHandlers(List<Message> msgs, NotificationContext context) {
+
+    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
+    for (Message msg : msgs) {
+      MessageHandler handler = _msgHandlerFty.createHandler(msg, context);
+      handlers.add(handler);
+    }
+    return handlers;
+  }
+
+  public void preHandleMessage() {
+    if (_message.getBatchMessageMode() == true && _batchMsgWrapper != null) {
+      _batchMsgWrapper.start(_message, _notificationContext);
+    }
+  }
+
+  public void postHandleMessage() {
+    if (_message.getBatchMessageMode() == true && _batchMsgWrapper != null) {
+      _batchMsgWrapper.end(_message, _notificationContext);
+    }
+
+    // update currentState
+    HelixManager manager = _notificationContext.getManager();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap =
+        (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
+            .get(MapKey.CURRENT_STATE_UPDATE.toString());
+
+    if (csUpdateMap != null) {
+      Map<PropertyKey, CurrentState> csUpdate = mergeCurStateUpdate(csUpdateMap);
+
+      // TODO: change to use asyncSet
+      for (PropertyKey key : csUpdate.keySet()) {
+        // logger.info("updateCS: " + key);
+        // System.out.println("\tupdateCS: " + key.getPath() + ", " +
+        // curStateMap.get(key));
+        accessor.updateProperty(key, csUpdate.get(key));
+      }
+    }
+  }
+
+  // will not return until all sub-message executions are done
+  @Override
+  public HelixTaskResult handleMessage() {
+    HelixTaskResult result = null;
+    List<Future<HelixTaskResult>> futures = null;
+    List<MessageTask> batchTasks = new ArrayList<MessageTask>();
+
+    synchronized (_batchMsgWrapper) {
+      try {
+        preHandleMessage();
+
+        int exeBatchSize = 1; // TODO: getExeBatchSize from msg
+        List<String> partitionKeys = _message.getPartitionNames();
+        for (int i = 0; i < partitionKeys.size(); i += exeBatchSize) {
+          if (i + exeBatchSize <= partitionKeys.size()) {
+            List<Message> msgs = _subMessages.subList(i, i + exeBatchSize);
+            List<MessageHandler> handlers = _subMessageHandlers.subList(i, i + exeBatchSize);
+            HelixBatchMessageTask batchTask =
+                new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext);
+            batchTasks.add(batchTask);
+
+          } else {
+            List<Message> msgs = _subMessages.subList(i, i + partitionKeys.size());
+            List<MessageHandler> handlers =
+                _subMessageHandlers.subList(i, i + partitionKeys.size());
+
+            HelixBatchMessageTask batchTask =
+                new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext);
+            batchTasks.add(batchTask);
+          }
+        }
+
+        // invokeAll() is blocking call
+        long timeout = _message.getExecutionTimeout();
+        if (timeout == -1) {
+          timeout = Long.MAX_VALUE;
+        }
+        futures = _executor.invokeAllTasks(batchTasks, timeout, TimeUnit.MILLISECONDS);
+      } catch (Exception e) {
+        LOG.error("fail to execute batchMsg: " + _message.getId(), e);
+        result = new HelixTaskResult();
+        result.setException(e);
+
+        // HelixTask will call onError on this batch-msg-handler
+        // return result;
+      }
+
+      // combine sub-results to result
+      if (futures != null) {
+        boolean isBatchTaskSucceed = true;
+
+        for (int i = 0; i < futures.size(); i++) {
+          Future<HelixTaskResult> future = futures.get(i);
+          MessageTask subTask = batchTasks.get(i);
+          try {
+            HelixTaskResult subTaskResult = future.get();
+            if (!subTaskResult.isSuccess()) {
+              isBatchTaskSucceed = false;
             }
-				}
-				result = new HelixTaskResult();
-				result.setSuccess(isBatchTaskSucceed);
-			}
-			
-			// pass task-result to post-handle-msg
-			_notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result);
-			postHandleMessage();
-
-			return result;
-		}
-	}
-
-	@Override
-	public void onError(Exception e, ErrorCode code, ErrorType type) {
-		// if one sub-message execution fails, call onError on all sub-message handlers
-		for (MessageHandler handler : _subMessageHandlers) {
-			handler.onError(e, code, type);
-		}
-	}
-
-	// TODO: optimize this based on the fact that each cs update is for a
-	// distinct partition
-	private Map<PropertyKey, CurrentState> mergeCurStateUpdate(
-	        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap) {
-		Map<String, CurrentStateUpdate> curStateUpdateMap = new HashMap<String, CurrentStateUpdate>();
-		for (CurrentStateUpdate update : csUpdateMap.values()) {
-			String path = update._key.getPath(); // TODO: this is time
-			                                     // consuming, optimize it
-			if (!curStateUpdateMap.containsKey(path)) {
-				curStateUpdateMap.put(path, update);
-			} else {
-				// long start = System.currentTimeMillis();
-				curStateUpdateMap.get(path).merge(update._delta);
-				// long end = System.currentTimeMillis();
-				// LOG.info("each merge took: " + (end - start));
-			}
-		}
-
-		Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
-		for (CurrentStateUpdate update : curStateUpdateMap.values()) {
-			ret.put(update._key, update._delta);
-		}
-
-		return ret;
-	}
+          } catch (InterruptedException e) {
+            isBatchTaskSucceed = false;
+            LOG.error("interrupted in executing batch-msg: " + _message.getId() + ", sub-msg: "
+                + subTask.getTaskId(), e);
+          } catch (ExecutionException e) {
+            isBatchTaskSucceed = false;
+            LOG.error(
+                "fail to execute batch-msg: " + _message.getId() + ", sub-msg: "
+                    + subTask.getTaskId(), e);
+          }
+        }
+        result = new HelixTaskResult();
+        result.setSuccess(isBatchTaskSucceed);
+      }
+
+      // pass task-result to post-handle-msg
+      _notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result);
+      postHandleMessage();
+
+      return result;
+    }
+  }
+
+  @Override
+  public void onError(Exception e, ErrorCode code, ErrorType type) {
+    // if one sub-message execution fails, call onError on all sub-message handlers
+    for (MessageHandler handler : _subMessageHandlers) {
+      handler.onError(e, code, type);
+    }
+  }
+
+  // TODO: optimize this based on the fact that each cs update is for a
+  // distinct partition
+  private Map<PropertyKey, CurrentState> mergeCurStateUpdate(
+      ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap) {
+    Map<String, CurrentStateUpdate> curStateUpdateMap = new HashMap<String, CurrentStateUpdate>();
+    for (CurrentStateUpdate update : csUpdateMap.values()) {
+      String path = update._key.getPath(); // TODO: this is time
+                                           // consuming, optimize it
+      if (!curStateUpdateMap.containsKey(path)) {
+        curStateUpdateMap.put(path, update);
+      } else {
+        // long start = System.currentTimeMillis();
+        curStateUpdateMap.get(path).merge(update._delta);
+        // long end = System.currentTimeMillis();
+        // LOG.info("each merge took: " + (end - start));
+      }
+    }
+
+    Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
+    for (CurrentStateUpdate update : curStateUpdateMap.values()) {
+      ret.put(update._key, update._delta);
+    }
+
+    return ret;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageWrapper.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageWrapper.java
index 7c3021d..cf64cd6 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageWrapper.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageWrapper.java
@@ -23,12 +23,13 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 
 /**
- * default implementation of handling start/end of batch messages 
- *
+ * default implementation of handling start/end of batch messages
  */
 public class BatchMessageWrapper {
 
-  public void start(Message batchMsg, NotificationContext context) { }
-  
-  public void end(Message batchMsg, NotificationContext context) { }
+  public void start(Message batchMsg, NotificationContext context) {
+  }
+
+  public void end(Message batchMsg, NotificationContext context) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
index ecce683..8f60c47 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
@@ -23,18 +23,16 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.CurrentState;
 
 public class CurrentStateUpdate {
-	final PropertyKey _key;
-	final CurrentState _delta;
-	
-	CurrentStateUpdate(PropertyKey key, CurrentState delta)
-	{
-		_key = key;
-		_delta = delta;
-	}
-	
-	public void merge(CurrentState anotherDelta)
-	{
-		_delta.getRecord().merge(anotherDelta.getRecord());
-	}
+  final PropertyKey _key;
+  final CurrentState _delta;
+
+  CurrentStateUpdate(PropertyKey key, CurrentState delta) {
+    _key = key;
+    _delta = delta;
+  }
+
+  public void merge(CurrentState anotherDelta) {
+    _delta.getRecord().merge(anotherDelta.getRecord());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
index 59c5bdc..6872907 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
@@ -30,90 +30,71 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
 
-
-public class GroupMessageHandler
-{
-  class CurrentStateUpdate
-  {
-    final PropertyKey  _key;
+public class GroupMessageHandler {
+  class CurrentStateUpdate {
+    final PropertyKey _key;
     final CurrentState _curStateDelta;
 
-    public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
-    {
+    public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta) {
       _key = key;
       _curStateDelta = curStateDelta;
     }
 
-    public void merge(CurrentState curState)
-    {
+    public void merge(CurrentState curState) {
       _curStateDelta.getRecord().merge(curState.getRecord());
     }
   }
 
-  static class GroupMessageInfo
-  {
-    final Message                                   _message;
-    final AtomicInteger                             _countDown;
+  static class GroupMessageInfo {
+    final Message _message;
+    final AtomicInteger _countDown;
     final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
 
-    public GroupMessageInfo(Message message)
-    {
+    public GroupMessageInfo(Message message) {
       _message = message;
       List<String> partitionNames = message.getPartitionNames();
       _countDown = new AtomicInteger(partitionNames.size());
       _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
     }
-    
-    public Map<PropertyKey, CurrentState> merge()
-    {
-      Map<String, CurrentStateUpdate> curStateUpdateMap =
-          new HashMap<String, CurrentStateUpdate>();
-      for (CurrentStateUpdate update : _curStateUpdateList)
-      {
+
+    public Map<PropertyKey, CurrentState> merge() {
+      Map<String, CurrentStateUpdate> curStateUpdateMap = new HashMap<String, CurrentStateUpdate>();
+      for (CurrentStateUpdate update : _curStateUpdateList) {
         String path = update._key.getPath();
-        if (!curStateUpdateMap.containsKey(path))
-        {
+        if (!curStateUpdateMap.containsKey(path)) {
           curStateUpdateMap.put(path, update);
-        }
-        else
-        {
+        } else {
           curStateUpdateMap.get(path).merge(update._curStateDelta);
         }
       }
 
       Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
-      for (CurrentStateUpdate update : curStateUpdateMap.values())
-      {
+      for (CurrentStateUpdate update : curStateUpdateMap.values()) {
         ret.put(update._key, update._curStateDelta);
       }
 
       return ret;
     }
- 
+
   }
 
   final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
 
-  public GroupMessageHandler()
-  {
+  public GroupMessageHandler() {
     _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
   }
 
-  public void put(Message message)
-  {
+  public void put(Message message) {
     _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
   }
 
   // return non-null if all sub-messages are completed
-  public GroupMessageInfo onCompleteSubMessage(Message subMessage)
-  {
+  public GroupMessageInfo onCompleteSubMessage(Message subMessage) {
     String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
     GroupMessageInfo info = _groupMsgMap.get(parentMid);
-    if (info != null)
-    {
+    if (info != null) {
       int val = info._countDown.decrementAndGet();
-      if (val <= 0)
-      {
+      if (val <= 0) {
         return _groupMsgMap.remove(parentMid);
       }
     }
@@ -121,12 +102,10 @@ public class GroupMessageHandler
     return null;
   }
 
-  void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
-  {
+  void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta) {
     String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
     GroupMessageInfo info = _groupMsgMap.get(parentMid);
-    if (info != null)
-    {
+    if (info != null) {
       info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
index 50eba2c..f707db1 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
@@ -27,95 +27,91 @@ import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
 public class HelixBatchMessageTask implements MessageTask {
-	private static Logger LOG = Logger.getLogger(HelixBatchMessageTask.class);
-
-	final NotificationContext _context;
-	final Message _batchMsg;
-	final List<Message> _subMsgs;
-	final List<MessageHandler> _handlers;
-
-	public HelixBatchMessageTask(Message batchMsg, List<Message> subMsgs, List<MessageHandler> handlers,
-	        NotificationContext context) {
-		_batchMsg = batchMsg;
-		_context = context;
-		_subMsgs = subMsgs;
-		_handlers = handlers;
-	}
-
-	@Override
-	public HelixTaskResult call() throws Exception {
-	    HelixTaskResult taskResult = null;
-	    
-	    long start = System.currentTimeMillis();
-	    LOG.info("taskId:" + getTaskId() + " handling task begin, at: " + start);
-
-	    boolean isSucceed = true;
-	    try
-	    {
-    		for (MessageHandler handler : _handlers) {
-    			if (handler != null) {
-    				HelixTaskResult subTaskResult = handler.handleMessage();
-    				// if any fails, return false
-    				if (!subTaskResult.isSuccess()) {
-    					// System.err.println("\t[dbg]error handling message: " + handler._message);
-    					isSucceed = false;
-    				}
-    			}
-    		}
-	    }
-	    catch (Exception e)
-	    {
-	      String errorMessage =
-	          "Exception while executing task: " + getTaskId();
-	      LOG.error(errorMessage, e);
-	      
-	      taskResult = new HelixTaskResult();
-	      taskResult.setException(e);
-	      taskResult.setMessage(e.getMessage());
-	      
-	      return taskResult;
-	    }
-
-	    if (isSucceed) {
-	    	LOG.info("task: " + getTaskId() + " completed sucessfully");
-	    }
-	    
-	    taskResult = new HelixTaskResult();
-	    taskResult.setSuccess(isSucceed);
-		return taskResult;
-	}
-
-	@Override
-	public String getTaskId() {
-		StringBuilder sb = new StringBuilder();
-		sb.append(_batchMsg.getId());
-		sb.append("/");
-		List<String> msgIdList = new ArrayList<String>();
-		if (_subMsgs != null) {
-			for (Message msg : _subMsgs) {
-				msgIdList.add(msg.getId());
-			}
-		}
-		sb.append(msgIdList);
-		return sb.toString();
-	}
-
-	@Override
-	public Message getMessage() {
-		return _batchMsg;
-	}
-
-	@Override
-	public NotificationContext getNotificationContext() {
-		return _context;
-	}
-
-	@Override
-	public void onTimeout() {
-		for (MessageHandler handler : _handlers) {
-			if (handler != null) {
-				handler.onTimeout();
-			}
-		}
-	}
-}
\ No newline at end of file
+  private static Logger LOG = Logger.getLogger(HelixBatchMessageTask.class);
+
+  final NotificationContext _context;
+  final Message _batchMsg;
+  final List<Message> _subMsgs;
+  final List<MessageHandler> _handlers;
+
+  public HelixBatchMessageTask(Message batchMsg, List<Message> subMsgs,
+      List<MessageHandler> handlers, NotificationContext context) {
+    _batchMsg = batchMsg;
+    _context = context;
+    _subMsgs = subMsgs;
+    _handlers = handlers;
+  }
+
+  @Override
+  public HelixTaskResult call() throws Exception {
+    HelixTaskResult taskResult = null;
+
+    long start = System.currentTimeMillis();
+    LOG.info("taskId:" + getTaskId() + " handling task begin, at: " + start);
+
+    boolean isSucceed = true;
+    try {
+      for (MessageHandler handler : _handlers) {
+        if (handler != null) {
+          HelixTaskResult subTaskResult = handler.handleMessage();
+          // if any fails, return false
+          if (!subTaskResult.isSuccess()) {
+            // System.err.println("\t[dbg]error handling message: " + handler._message);
+            isSucceed = false;
+          }
+        }
+      }
+    } catch (Exception e) {
+      String errorMessage = "Exception while executing task: " + getTaskId();
+      LOG.error(errorMessage, e);
+
+      taskResult = new HelixTaskResult();
+      taskResult.setException(e);
+      taskResult.setMessage(e.getMessage());
+
+      return taskResult;
+    }
+
+    if (isSucceed) {
+      LOG.info("task: " + getTaskId() + " completed sucessfully");
+    }
+
+    taskResult = new HelixTaskResult();
+    taskResult.setSuccess(isSucceed);
+    return taskResult;
+  }
+
+  @Override
+  public String getTaskId() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(_batchMsg.getId());
+    sb.append("/");
+    List<String> msgIdList = new ArrayList<String>();
+    if (_subMsgs != null) {
+      for (Message msg : _subMsgs) {
+        msgIdList.add(msg.getId());
+      }
+    }
+    sb.append(msgIdList);
+    return sb.toString();
+  }
+
+  @Override
+  public Message getMessage() {
+    return _batchMsg;
+  }
+
+  @Override
+  public NotificationContext getNotificationContext() {
+    return _context;
+  }
+
+  @Override
+  public void onTimeout() {
+    for (MessageHandler handler : _handlers) {
+      if (handler != null) {
+        handler.onTimeout();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 752437f..627babc 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -50,32 +50,25 @@ import org.apache.helix.participant.statemachine.StateTransitionError;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
-
-public class HelixStateTransitionHandler extends MessageHandler
-{
-  public static class HelixStateMismatchException extends Exception
-  {
-    public HelixStateMismatchException(String info)
-    {
+public class HelixStateTransitionHandler extends MessageHandler {
+  public static class HelixStateMismatchException extends Exception {
+    public HelixStateMismatchException(String info) {
       super(info);
     }
   }
-  private static Logger          logger     =
-                                                Logger.getLogger(HelixStateTransitionHandler.class);
-  private final StateModel       _stateModel;
-  StatusUpdateUtil               _statusUpdateUtil;
+
+  private static Logger logger = Logger.getLogger(HelixStateTransitionHandler.class);
+  private final StateModel _stateModel;
+  StatusUpdateUtil _statusUpdateUtil;
   private final StateModelParser _transitionMethodFinder;
-  private final CurrentState     _currentStateDelta;
-  private final HelixManager     _manager;
+  private final CurrentState _currentStateDelta;
+  private final HelixManager _manager;
   private final StateModelFactory<? extends StateModel> _stateModelFactory;
-  volatile boolean               _isTimeout = false;
+  volatile boolean _isTimeout = false;
 
   public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
-                                     StateModel stateModel,
-                                     Message message,
-                                     NotificationContext context,
-                                     CurrentState currentStateDelta)
-  {
+      StateModel stateModel, Message message, NotificationContext context,
+      CurrentState currentStateDelta) {
     super(message, context);
     _stateModel = stateModel;
     _statusUpdateUtil = new StatusUpdateUtil();
@@ -85,19 +78,14 @@ public class HelixStateTransitionHandler extends MessageHandler
     _stateModelFactory = stateModelFactory;
   }
 
-  void preHandleMessage() throws Exception
-  {
-    if (!_message.isValid())
-    {
+  void preHandleMessage() throws Exception {
+    if (!_message.isValid()) {
       String errorMessage =
-          "Invalid Message, ensure that message: " + _message
-              + " has all the required fields: "
+          "Invalid Message, ensure that message: " + _message + " has all the required fields: "
               + Arrays.toString(Message.Attributes.values());
 
-      _statusUpdateUtil.logError(_message,
-                                 HelixStateTransitionHandler.class,
-                                 errorMessage,
-                                 _manager.getHelixDataAccessor());
+      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
+          _manager.getHelixDataAccessor());
       logger.error(errorMessage);
       throw new HelixException(errorMessage);
     }
@@ -110,27 +98,24 @@ public class HelixStateTransitionHandler extends MessageHandler
     // Verify the fromState and current state of the stateModel
     String state = _currentStateDelta.getState(partitionName);
 
-    if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state))
-    {
+    if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
       String errorMessage =
           "Current state of stateModel does not match the fromState in Message"
-              + ", Current State:" + state + ", message expected:" + fromState
-              + ", partition: " + partitionName + ", from: " + _message.getMsgSrc()
-              + ", to: " + _message.getTgtName();
-
-      _statusUpdateUtil.logError(_message,
-                                 HelixStateTransitionHandler.class,
-                                 errorMessage,
-                                 accessor);
+              + ", Current State:" + state + ", message expected:" + fromState + ", partition: "
+              + partitionName + ", from: " + _message.getMsgSrc() + ", to: "
+              + _message.getTgtName();
+
+      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
+          accessor);
       logger.error(errorMessage);
       throw new HelixStateMismatchException(errorMessage);
     }
   }
 
-  void postHandleMessage()
-  {
-  	HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
-  	Exception exception = taskResult.getException();
+  void postHandleMessage() {
+    HelixTaskResult taskResult =
+        (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
+    Exception exception = taskResult.getException();
 
     String partitionKey = _message.getPartitionName();
     String resource = _message.getResourceName();
@@ -143,25 +128,22 @@ public class HelixStateTransitionHandler extends MessageHandler
     int bucketSize = _message.getBucketSize();
     ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
 
-    // No need to sync on manager, we are cancel executor in expiry session before start executor in new session
+    // No need to sync on manager, we are cancel executor in expiry session before start executor in
+    // new session
     // sessionId might change when we update the state model state.
     // for zk current state it is OK as we have the per-session current state node
-    if (!_message.getTgtSessionId().equals(_manager.getSessionId()))
-    {
+    if (!_message.getTgtSessionId().equals(_manager.getSessionId())) {
       logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
-          + _message.getExecutionSessionId() + " , new session : "
-          + _manager.getSessionId());
+          + _message.getExecutionSessionId() + " , new session : " + _manager.getSessionId());
       return;
     }
 
-    if (taskResult.isSuccess())
-    {
+    if (taskResult.isSuccess()) {
       // String fromState = message.getFromState();
       String toState = _message.getToState();
       _currentStateDelta.setState(partitionKey, toState);
 
-      if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
-      {
+      if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
         // for "OnOfflineToDROPPED" message, we need to remove the resource key record
         // from the current state of the instance because the resource key is dropped.
         // In the state model it will be stayed as "OFFLINE", which is OK.
@@ -174,44 +156,29 @@ public class HelixStateTransitionHandler extends MessageHandler
         deltaList.add(delta);
         _currentStateDelta.setDeltaList(deltaList);
         _stateModelFactory.removeStateModel(partitionKey);
-      }
-      else
-      {
+      } else {
         // if the partition is not to be dropped, update _stateModel to the TO_STATE
         _stateModel.updateState(toState);
       }
-    }
-    else
-    {
-      if (exception instanceof HelixStateMismatchException)
-      {
+    } else {
+      if (exception instanceof HelixStateMismatchException) {
         // if fromState mismatch, set current state on zk to stateModel's current state
         logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
-            + partitionKey
-            + ", currentState: "
-            + _stateModel.getCurrentState()
-            + ", message: " + _message);
+            + partitionKey + ", currentState: " + _stateModel.getCurrentState() + ", message: "
+            + _message);
         _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
-      }
-      else
-      {
+      } else {
         StateTransitionError error =
             new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
-        if (exception instanceof InterruptedException)
-        {
-          if (_isTimeout)
-          {
-            error =
-                new StateTransitionError(ErrorType.INTERNAL,
-                                         ErrorCode.TIMEOUT,
-                                         exception);
-          }
-          else
-          {
+        if (exception instanceof InterruptedException) {
+          if (_isTimeout) {
+            error = new StateTransitionError(ErrorType.INTERNAL, ErrorCode.TIMEOUT, exception);
+          } else {
             // State transition interrupted but not caused by timeout. Keep the current
             // state in this case
-            logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
-                + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
+            logger
+                .error("State transition interrupted but not timeout. Not updating state. Partition : "
+                    + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
             return;
           }
         }
@@ -226,37 +193,28 @@ public class HelixStateTransitionHandler extends MessageHandler
       }
     }
 
-    try
-    {
+    try {
       // Update the ZK current state of the node
-      PropertyKey key = keyBuilder.currentState(instanceName,
-                              sessionId,
-                              resource,
-                              bucketizer.getBucketName(partitionKey));
-      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null)
-      {
+      PropertyKey key =
+          keyBuilder.currentState(instanceName, sessionId, resource,
+              bucketizer.getBucketName(partitionKey));
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
         // normal message
         accessor.updateProperty(key, _currentStateDelta);
-      }
-      else
-      {
+      } else {
         // sub-message of a batch message
-        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap
-          = (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext.get(MapKey.CURRENT_STATE_UPDATE.toString());
+        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap =
+            (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
+                .get(MapKey.CURRENT_STATE_UPDATE.toString());
         csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, _currentStateDelta));
       }
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       logger.error("Error when updating current-state ", e);
       StateTransitionError error =
           new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
       _stateModel.rollbackOnError(_message, _notificationContext, error);
-      _statusUpdateUtil.logError(_message,
-                                 HelixStateTransitionHandler.class,
-                                 e,
-                                 "Error when update current-state ",
-                                 accessor);
+      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, e,
+          "Error when update current-state ", accessor);
     }
   }
 
@@ -266,58 +224,45 @@ public class HelixStateTransitionHandler extends MessageHandler
     String partitionName = _message.getPartitionName();
     String clusterName = _manager.getClusterName();
     HelixAdmin admin = _manager.getClusterManagmentTool();
-    admin.enablePartition(false, clusterName, instanceName, resourceName, Arrays.asList(partitionName));
-    logger.info("error in transit from ERROR to " + _message.getToState()
-          + " for partition: " + partitionName + ". disable it on " + instanceName);
+    admin.enablePartition(false, clusterName, instanceName, resourceName,
+        Arrays.asList(partitionName));
+    logger.info("error in transit from ERROR to " + _message.getToState() + " for partition: "
+        + partitionName + ". disable it on " + instanceName);
 
   }
 
   @Override
-  public HelixTaskResult handleMessage()
-  {
-	NotificationContext context = _notificationContext;
-	Message message = _message;
+  public HelixTaskResult handleMessage() {
+    NotificationContext context = _notificationContext;
+    Message message = _message;
 
-    synchronized (_stateModel)
-    {
+    synchronized (_stateModel) {
       HelixTaskResult taskResult = new HelixTaskResult();
       HelixManager manager = context.getManager();
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
-      _statusUpdateUtil.logInfo(message,
-                                HelixStateTransitionHandler.class,
-                                "Message handling task begin execute",
-                                accessor);
+      _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
+          "Message handling task begin execute", accessor);
       message.setExecuteStartTimeStamp(new Date().getTime());
 
-      try
-      {
-    	preHandleMessage();
+      try {
+        preHandleMessage();
         invoke(accessor, context, taskResult, message);
-      }
-      catch (HelixStateMismatchException e)
-      {
+      } catch (HelixStateMismatchException e) {
         // Simply log error and return from here if State mismatch.
         // The current state of the state model is intact.
         taskResult.setSuccess(false);
         taskResult.setMessage(e.toString());
         taskResult.setException(e);
-      }
-      catch (Exception e)
-      {
+      } catch (Exception e) {
         String errorMessage =
-            "Exception while executing a state transition task "
-                + message.getPartitionName();
+            "Exception while executing a state transition task " + message.getPartitionName();
         logger.error(errorMessage, e);
-        if (e.getCause() != null && e.getCause() instanceof InterruptedException)
-        {
+        if (e.getCause() != null && e.getCause() instanceof InterruptedException) {
           e = (InterruptedException) e.getCause();
         }
-        _statusUpdateUtil.logError(message,
-                                   HelixStateTransitionHandler.class,
-                                   e,
-                                   errorMessage,
-                                   accessor);
+        _statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e, errorMessage,
+            accessor);
         taskResult.setSuccess(false);
         taskResult.setMessage(e.toString());
         taskResult.setException(e);
@@ -332,51 +277,40 @@ public class HelixStateTransitionHandler extends MessageHandler
     }
   }
 
-  private void invoke(HelixDataAccessor accessor,
-                      NotificationContext context,
-                      HelixTaskResult taskResult,
-                      Message message) throws IllegalAccessException,
-      InvocationTargetException,
-      InterruptedException
-  {
-    _statusUpdateUtil.logInfo(message,
-                              HelixStateTransitionHandler.class,
-                              "Message handling invoking",
-                              accessor);
+  private void invoke(HelixDataAccessor accessor, NotificationContext context,
+      HelixTaskResult taskResult, Message message) throws IllegalAccessException,
+      InvocationTargetException, InterruptedException {
+    _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
+        "Message handling invoking", accessor);
 
     // by default, we invoke state transition function in state model
     Method methodToInvoke = null;
     String fromState = message.getFromState();
     String toState = message.getToState();
     methodToInvoke =
-        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
-                                                       fromState,
-                                                       toState,
-                                                       new Class[] { Message.class,
-                                                           NotificationContext.class });
-    if (methodToInvoke != null)
-    {
-      methodToInvoke.invoke(_stateModel, new Object[] { message, context });
+        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(), fromState, toState,
+            new Class[] {
+                Message.class, NotificationContext.class
+            });
+    if (methodToInvoke != null) {
+      methodToInvoke.invoke(_stateModel, new Object[] {
+          message, context
+      });
       taskResult.setSuccess(true);
-    }
-    else
-    {
+    } else {
       String errorMessage =
-          "Unable to find method for transition from " + fromState + " to " + toState
-              + " in " + _stateModel.getClass();
+          "Unable to find method for transition from " + fromState + " to " + toState + " in "
+              + _stateModel.getClass();
       logger.error(errorMessage);
       taskResult.setSuccess(false);
 
-      _statusUpdateUtil.logError(message,
-                                 HelixStateTransitionHandler.class,
-                                 errorMessage,
-                                 accessor);
+      _statusUpdateUtil
+          .logError(message, HelixStateTransitionHandler.class, errorMessage, accessor);
     }
   }
 
   @Override
-  public void onError(Exception e, ErrorCode code, ErrorType type)
-  {
+  public void onError(Exception e, ErrorCode code, ErrorType type) {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
     String instanceName = _manager.getInstanceName();
@@ -384,8 +318,7 @@ public class HelixStateTransitionHandler extends MessageHandler
     String partition = _message.getPartitionName();
 
     // All internal error has been processed already, so we can skip them
-    if (type == ErrorType.INTERNAL)
-    {
+    if (type == ErrorType.INTERNAL) {
       logger.error("Skip internal error. errCode: " + code + ", errMsg: " + e.getMessage());
       return;
     }
@@ -402,11 +335,10 @@ public class HelixStateTransitionHandler extends MessageHandler
         if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
           disablePartition();
         }
-        accessor.updateProperty(keyBuilder.currentState(instanceName,
-                                                        _message.getTgtSessionId(),
-                                                        resourceName),
-                                currentStateDelta);
-    }
+        accessor.updateProperty(
+            keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName),
+            currentStateDelta);
+      }
     } finally {
       StateTransitionError error = new StateTransitionError(type, code, e);
       _stateModel.rollbackOnError(_message, _notificationContext, error);
@@ -415,8 +347,7 @@ public class HelixStateTransitionHandler extends MessageHandler
   }
 
   @Override
-  public void onTimeout()
-  {
+  public void onTimeout() {
     _isTimeout = true;
   }
 };

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2e91cd6..d9f7ae2 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -47,23 +47,18 @@ import org.apache.helix.monitoring.StateTransitionDataPoint;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
-
-public class HelixTask implements MessageTask
-{
-  private static Logger             logger     = Logger.getLogger(HelixTask.class);
-  private final Message             _message;
-  private final MessageHandler      _handler;
+public class HelixTask implements MessageTask {
+  private static Logger logger = Logger.getLogger(HelixTask.class);
+  private final Message _message;
+  private final MessageHandler _handler;
   private final NotificationContext _notificationContext;
-  private final HelixManager        _manager;
-  StatusUpdateUtil                  _statusUpdateUtil;
-  HelixTaskExecutor                 _executor;
-  volatile boolean                  _isTimeout = false;
+  private final HelixManager _manager;
+  StatusUpdateUtil _statusUpdateUtil;
+  HelixTaskExecutor _executor;
+  volatile boolean _isTimeout = false;
 
-  public HelixTask(Message message,
-                   NotificationContext notificationContext,
-                   MessageHandler handler,
-                   HelixTaskExecutor executor)
-  {
+  public HelixTask(Message message, NotificationContext notificationContext,
+      MessageHandler handler, HelixTaskExecutor executor) {
     this._notificationContext = notificationContext;
     this._message = message;
     this._handler = handler;
@@ -73,8 +68,7 @@ public class HelixTask implements MessageTask
   }
 
   @Override
-  public HelixTaskResult call()
-  {
+  public HelixTaskResult call() {
     HelixTaskResult taskResult = null;
 
     ErrorType type = null;
@@ -83,43 +77,33 @@ public class HelixTask implements MessageTask
     long start = System.currentTimeMillis();
     logger.info("handling task: " + getTaskId() + " begin, at: " + start);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    _statusUpdateUtil.logInfo(_message,
-                              HelixTask.class,
-                              "Message handling task begin execute",
-                              accessor);
+    _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling task begin execute",
+        accessor);
     _message.setExecuteStartTimeStamp(new Date().getTime());
 
     // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
     // partitionName -> csUpdate
     if (_message.getBatchMessageMode() == true) {
-  	  _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(), 
-  			  new ConcurrentHashMap<String, CurrentStateUpdate>());
+      _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
+          new ConcurrentHashMap<String, CurrentStateUpdate>());
     }
 
     // Handle the message
-    try
-    {
+    try {
       taskResult = _handler.handleMessage();
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
       taskResult.setInterrupted(true);
 
-      _statusUpdateUtil.logError(_message,
-                                 HelixTask.class,
-                                 e,
-                                 "State transition interrupted, timeout:" + _isTimeout,
-                                 accessor);
+      _statusUpdateUtil.logError(_message, HelixTask.class, e,
+          "State transition interrupted, timeout:" + _isTimeout, accessor);
       logger.info("Message " + _message.getMsgId() + " is interrupted");
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
       taskResult.setMessage(e.getMessage());
-        
+
       String errorMessage =
           "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
               + " type: " + _message.getMsgType();
@@ -129,212 +113,165 @@ public class HelixTask implements MessageTask
 
     // cancel timeout task
     _executor.cancelTimeoutTask(this);
-    
+
     Exception exception = null;
-    try
-    {
-      if (taskResult.isSuccess())
-      {
-        _statusUpdateUtil.logInfo(_message,
-                                _handler.getClass(),
-                                "Message handling task completed successfully",
-                                accessor);
+    try {
+      if (taskResult.isSuccess()) {
+        _statusUpdateUtil.logInfo(_message, _handler.getClass(),
+            "Message handling task completed successfully", accessor);
         logger.info("Message " + _message.getMsgId() + " completed.");
-      }
-      else {
-    	  type = ErrorType.INTERNAL;
-    	  
-    	  if (taskResult.isInterrupted())
-          {
-    		  logger.info("Message " + _message.getMsgId() + " is interrupted");
-    		  code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
-    		  if (_isTimeout)
-    		  {
-    			  int retryCount = _message.getRetryCount();
-    			  logger.info("Message timeout, retry count: " + retryCount + " msgId:"
-    					  + _message.getMsgId());
-    			  _statusUpdateUtil.logInfo(_message,
-                                  _handler.getClass(),
-                                  "Message handling task timeout, retryCount:"
-                                      + retryCount,
-                                  accessor);
-    			  // Notify the handler that timeout happens, and the number of retries left
-    			  // In case timeout happens (time out and also interrupted)
-    			  // we should retry the execution of the message by re-schedule it in
-    			  if (retryCount > 0)
-    			  {
-    				  _message.setRetryCount(retryCount - 1);
-                      HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
-                      _executor.scheduleTask(task);
-                      return taskResult;
-    			  }
-    		  }
+      } else {
+        type = ErrorType.INTERNAL;
+
+        if (taskResult.isInterrupted()) {
+          logger.info("Message " + _message.getMsgId() + " is interrupted");
+          code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
+          if (_isTimeout) {
+            int retryCount = _message.getRetryCount();
+            logger.info("Message timeout, retry count: " + retryCount + " msgId:"
+                + _message.getMsgId());
+            _statusUpdateUtil.logInfo(_message, _handler.getClass(),
+                "Message handling task timeout, retryCount:" + retryCount, accessor);
+            // Notify the handler that timeout happens, and the number of retries left
+            // In case timeout happens (time out and also interrupted)
+            // we should retry the execution of the message by re-schedule it in
+            if (retryCount > 0) {
+              _message.setRetryCount(retryCount - 1);
+              HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
+              _executor.scheduleTask(task);
+              return taskResult;
+            }
           }
-    	  else  // logging for errors
-    	  {
-    		  code = ErrorCode.ERROR;
-    		  String errorMsg =
-    			  "Message execution failed. msgId: " + getTaskId()
-    			  + ", errorMsg: " + taskResult.getMessage();
-    		  logger.error(errorMsg);
-    		  _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
-    	  }
+        } else // logging for errors
+        {
+          code = ErrorCode.ERROR;
+          String errorMsg =
+              "Message execution failed. msgId: " + getTaskId() + ", errorMsg: "
+                  + taskResult.getMessage();
+          logger.error(errorMsg);
+          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
+        }
       }
-      
+
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
-    	  // System.err.println("\t[dbg]remove msg: " + getTaskId());
-          removeMessageFromZk(accessor, _message);
-          reportMessageStat(_manager, _message, taskResult);
-          sendReply(accessor, _message, taskResult);
-          _executor.finishTask(this);
+        // System.err.println("\t[dbg]remove msg: " + getTaskId());
+        removeMessageFromZk(accessor, _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(accessor, _message, taskResult);
+        _executor.finishTask(this);
       }
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       exception = e;
       type = ErrorType.FRAMEWORK;
       code = ErrorCode.ERROR;
-        
+
       String errorMessage =
           "Exception after executing a message, msgId: " + _message.getMsgId() + e;
       logger.error(errorMessage, e);
       _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
-    }
-    finally
-    {
+    } finally {
       long end = System.currentTimeMillis();
       logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
           + taskResult.isSuccess() + ", at: " + end + ", took:" + (end - start));
 
       // Notify the handler about any error happened in the handling procedure, so that
       // the handler have chance to finally cleanup
-      if (type == ErrorType.INTERNAL)
-      {
+      if (type == ErrorType.INTERNAL) {
         _handler.onError(taskResult.getException(), code, type);
       } else if (type == ErrorType.FRAMEWORK) {
-    	  _handler.onError(exception, code, type);
+        _handler.onError(exception, code, type);
       }
     }
-    
+
     return taskResult;
   }
 
-  private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
-  {
+  private void removeMessageFromZk(HelixDataAccessor accessor, Message message) {
     Builder keyBuilder = accessor.keyBuilder();
-    if (message.getTgtName().equalsIgnoreCase("controller"))
-    {
+    if (message.getTgtName().equalsIgnoreCase("controller")) {
       // TODO: removeProperty returns boolean
       accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
-    }
-    else
-    {
-      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
-                                                 message.getMsgId()));
+    } else {
+      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), message.getMsgId()));
     }
   }
 
-  private void sendReply(HelixDataAccessor accessor,
-                         Message message,
-                         HelixTaskResult taskResult)
-  {
+  private void sendReply(HelixDataAccessor accessor, Message message, HelixTaskResult taskResult) {
     if (_message.getCorrelationId() != null
-        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
-    {
+        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString())) {
       logger.info("Sending reply for message " + message.getCorrelationId());
       _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
 
       taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSuccess());
       taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
-      if (!taskResult.isSuccess())
-      {
+      if (!taskResult.isSuccess()) {
         taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
       }
       Message replyMessage =
-          Message.createReplyMessage(_message,
-                                     _manager.getInstanceName(),
-                                     taskResult.getTaskResultMap());
+          Message.createReplyMessage(_message, _manager.getInstanceName(),
+              taskResult.getTaskResultMap());
       replyMessage.setSrcInstanceType(_manager.getInstanceType());
 
-      if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
-      {
+      if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) {
         Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
-                                                replyMessage.getMsgId()),
-                             replyMessage);
-      }
-      else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
-      {
+        accessor.setProperty(keyBuilder.message(message.getMsgSrc(), replyMessage.getMsgId()),
+            replyMessage);
+      } else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) {
         Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
-                             replyMessage);
+        accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), replyMessage);
       }
-      _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
-          + replyMessage.getTgtName(), accessor);
+      _statusUpdateUtil.logInfo(message, HelixTask.class,
+          "1 msg replied to " + replyMessage.getTgtName(), accessor);
     }
   }
 
-  private void reportMessageStat(HelixManager manager,
-                                 Message message,
-                                 HelixTaskResult taskResult)
-  {
+  private void reportMessageStat(HelixManager manager, Message message, HelixTaskResult taskResult) {
     // report stat
-    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
-    {
+    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
       return;
     }
     long now = new Date().getTime();
     long msgReadTime = message.getReadTimeStamp();
     long msgExecutionStartTime = message.getExecuteStartTimeStamp();
-    if (msgReadTime != 0 && msgExecutionStartTime != 0)
-    {
+    if (msgReadTime != 0 && msgExecutionStartTime != 0) {
       long totalDelay = now - msgReadTime;
       long executionDelay = now - msgExecutionStartTime;
-      if (totalDelay > 0 && executionDelay > 0)
-      {
+      if (totalDelay > 0 && executionDelay > 0) {
         String fromState = message.getFromState();
         String toState = message.getToState();
         String transition = fromState + "--" + toState;
 
         StateTransitionContext cxt =
-            new StateTransitionContext(manager.getClusterName(),
-                                       manager.getInstanceName(),
-                                       message.getResourceName(),
-                                       transition);
+            new StateTransitionContext(manager.getClusterName(), manager.getInstanceName(),
+                message.getResourceName(), transition);
 
         StateTransitionDataPoint data =
-            new StateTransitionDataPoint(totalDelay,
-                                         executionDelay,
-                                         taskResult.isSuccess());
+            new StateTransitionDataPoint(totalDelay, executionDelay, taskResult.isSuccess());
         _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
       }
-    }
-    else
-    {
+    } else {
       logger.warn("message read time and start execution time not recorded.");
     }
   }
 
   @Override
-  public String getTaskId()
-  {
-	  return _message.getId();
+  public String getTaskId() {
+    return _message.getId();
   }
-  
+
   @Override
   public Message getMessage() {
-		return _message;
+    return _message;
   }
 
   @Override
-  public 	NotificationContext getNotificationContext()
-  {
-	return _notificationContext;
+  public NotificationContext getNotificationContext() {
+    return _notificationContext;
   }
 
   @Override
   public void onTimeout() {
-	_isTimeout = true;
-	_handler.onTimeout();
+    _isTimeout = true;
+    _handler.onTimeout();
   }
 };


Mime
View raw message