helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/5] helix git commit: [HELIX-669] State Transition Cancellation Client Implementation
Date Tue, 03 Oct 2017 21:44:01 GMT
Repository: helix
Updated Branches:
  refs/heads/master 55b844657 -> 6775cd3ff


[HELIX-669] State Transition Cancellation Client Implementation

State transition takes a vital part of Helix managing clusters. There are different reasons can cause state transition is not necessary, for example, the node of partition running state transition is down. Thus state transition cancellation would be a useful feature to have in Helix. It not only helps cancel the state transition to avoid invalid state but also benefits for reducing redundant state transitions.

In this implementation following are included:
1. Add new StateTransitionCancellationHandler
2. Implement cancel logic of message received not handled, message handled task not started and message handled task started.
3. Add default implementation of cancel method in StateModel
4. Add new STATE_TRANSITION_CANCELLATION message type
5. Unit test for cancel logic


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aa2f6411
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aa2f6411
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aa2f6411

Branch: refs/heads/master
Commit: aa2f64111696764824b18b60ca6ce6a140b026fc
Parents: 55b8446
Author: Junkai Xue <jxue@linkedin.com>
Authored: Tue Oct 3 11:57:21 2017 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Tue Oct 3 12:24:43 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/HelixRollbackException.java    |  35 ++++++
 .../stages/CurrentStateComputationStage.java    |   2 +-
 .../apache/helix/examples/BootstrapProcess.java |   2 +-
 .../apache/helix/examples/ExampleProcess.java   |   2 +-
 .../helix/manager/zk/ParticipantManager.java    |   2 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |   2 +-
 ...HelixStateTransitionCancellationHandler.java |  65 ++++++++++
 .../helix/messaging/handling/HelixTask.java     |  18 ++-
 .../messaging/handling/HelixTaskExecutor.java   | 120 +++++++++++++++++--
 .../messaging/handling/HelixTaskResult.java     |   9 ++
 .../messaging/handling/MessageHandler.java      |   8 ++
 .../apache/helix/model/ClusterConstraints.java  |   2 +-
 .../java/org/apache/helix/model/Message.java    |   5 +-
 .../participant/HelixStateMachineEngine.java    |  42 ++++---
 .../participant/statemachine/StateModel.java    |   7 ++
 .../org/apache/helix/util/StatusUpdateUtil.java |   6 +-
 .../helix/integration/TestMessageThrottle.java  |   2 +-
 .../integration/TestResourceGroupEndtoEnd.java  |   2 +-
 .../helix/manager/zk/TestZkHelixAdmin.java      |   8 ++
 .../handling/TestHelixTaskExecutor.java         |  78 +++++++++++-
 .../helix/mock/participant/DummyProcess.java    |   2 +-
 21 files changed, 370 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java b/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java
new file mode 100644
index 0000000..e59e494
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java
@@ -0,0 +1,35 @@
+package org.apache.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class HelixRollbackException extends HelixException {
+
+  public HelixRollbackException(String message) {
+    super(message);
+  }
+
+  public HelixRollbackException(Throwable cause) {
+    super(cause);
+  }
+
+  public HelixRollbackException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 0dd4165..d548d3c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -54,7 +54,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       String instanceName = instance.getInstanceName();
       Map<String, Message> instanceMessages = cache.getMessages(instanceName);
       for (Message message : instanceMessages.values()) {
-        if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
+        if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) {
           continue;
         }
         if (!instance.getSessionId().equals(message.getTgtSessionId())) {

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
index d924fe5..ce4cf10 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -107,7 +107,7 @@ public class BootstrapProcess {
     stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
 
     manager.getMessagingService().registerMessageHandlerFactory(
-        MessageType.STATE_TRANSITION.toString(), stateMach);
+        MessageType.STATE_TRANSITION.name(), stateMach);
     manager.getMessagingService().registerMessageHandlerFactory(
         MessageType.USER_DEFINE_MSG.toString(), new CustomMessageHandlerFactory());
     manager.connect();

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
index 9328cf4..ef2d84f 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@ -91,7 +91,7 @@ public class ExampleProcess {
     stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
     manager.connect();
     manager.getMessagingService().registerMessageHandlerFactory(
-        MessageType.STATE_TRANSITION.toString(), stateMach);
+        MessageType.STATE_TRANSITION.name(), stateMach);
   }
 
   public void stop() {

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index bf7302b..209dbca 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -331,7 +331,7 @@ public class ParticipantManager {
   }
 
   private void setupMsgHandler() throws Exception {
-    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(),
         _stateMachineEngine);
     _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 19a4193..fb46379 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -386,7 +386,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     // check there is no pending messages for the partitions exist
     List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
     for (Message message : messages) {
-      if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
+      if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
           || !sessionId.equals(message.getTgtSessionId())
           || !resourceName.equals(message.getResourceName())
           || !resetPartitionNames.contains(message.getPartitionName())) {

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
new file mode 100644
index 0000000..3256d8d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java
@@ -0,0 +1,65 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.log4j.Logger;
+
+public class HelixStateTransitionCancellationHandler extends MessageHandler {
+  private final StateModel _stateModel;
+  private static final Logger logger =
+      Logger.getLogger(HelixStateTransitionCancellationHandler.class);
+
+
+  public HelixStateTransitionCancellationHandler(StateModel stateModel, Message message,
+      NotificationContext context) {
+    super(message, context);
+    _stateModel = stateModel;
+  }
+
+  @Override
+  public HelixTaskResult handleMessage() throws InterruptedException {
+    HelixTaskResult taskResult = new HelixTaskResult();
+    synchronized (_stateModel) {
+      try {
+        _stateModel.cancel();
+        taskResult.setSuccess(true);
+      } catch (Exception e) {
+        taskResult.setSuccess(false);
+        taskResult.setMessage(e.toString());
+        taskResult.setException(e);
+      }
+
+      if (taskResult.isSuccess()) {
+        throw new HelixRollbackException("Cancellation success for " + _message.getMsgId());
+      }
+    }
+    return taskResult;
+  }
+
+  @Override
+  public void onError(Exception e, ErrorCode code, ErrorType type) {
+    // Nothing need to do when it is error
+    logger.warn("No extra action needed when cancel failed.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 1aa932e..12193f0 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRollbackException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
@@ -91,6 +92,14 @@ public class HelixTask implements MessageTask {
       _statusUpdateUtil.logError(_message, HelixTask.class, e,
           "State transition interrupted, timeout:" + _isTimeout, accessor);
       logger.info("Message " + _message.getMsgId() + " is interrupted");
+    } catch (HelixRollbackException e) {
+      // TODO : Support cancel to any state
+      logger.info(
+          "Rollback happened of state transition on resource \"" + _message.getResourceName()
+              + "\" partition \"" + _message.getPartitionName() + "\" from \"" + _message
+              .getFromState() + "\" to \"" + _message.getToState() + "\"");
+      taskResult = new HelixTaskResult();
+      taskResult.setCancelled(true);
     } catch (Exception e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
@@ -136,6 +145,13 @@ public class HelixTask implements MessageTask {
               return taskResult;
             }
           }
+        } else if (taskResult.isCancelled()) {
+          // Cancellation success, report message complete
+          _statusUpdateUtil
+              .logInfo(_message, _handler.getClass(), "Cancellation completed successfully",
+                  accessor);
+          _executor.getParticipantMonitor().reportProcessedMessage(_message,
+              ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
         } else // logging for errors
         {
           code = ErrorCode.ERROR;
@@ -223,7 +239,7 @@ public class HelixTask implements MessageTask {
 
   private void reportMessageStat(HelixManager manager, Message message, HelixTaskResult taskResult) {
     // report stat
-    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
       return;
     }
     long now = new Date().getTime();

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 3f6a43d..41b5d0d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -22,6 +22,7 @@ package org.apache.helix.messaging.handling;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -102,6 +103,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   // From storage point of view, only bootstrap case is expensive
   // and we need to throttle, which is mostly IO / network bounded.
   public static final int DEFAULT_PARALLEL_TASKS = 40;
+  public static final int DEFAULT_CANCELLATION_THREADPOOL_SIZE = 40;
   // TODO: create per-task type threadpool with customizable pool size
   protected final Map<String, MessageTaskInfo> _taskMap;
   private final Object _lock;
@@ -121,6 +123,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
   final ConcurrentHashMap<String, ExecutorService> _executorMap;
 
+  final ConcurrentHashMap<String, Future<HelixTaskResult>> _messageFutureMap;
+  private ExecutorService _cancellationExcutorService;
+
   /**
    * separate executor for executing batch messages
    */
@@ -144,6 +149,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
     _executorMap = new ConcurrentHashMap<String, ExecutorService>();
+    _messageFutureMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
+    _cancellationExcutorService = Executors.newFixedThreadPool(DEFAULT_CANCELLATION_THREADPOOL_SIZE);
     _batchMessageExecutorService = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
     _batchMessageThreadpoolChecked = false;
     _resourcesThreadpoolChecked =
@@ -209,7 +216,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
    *  This method is to check it and update the thread pool if necessary.
    */
   private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
-    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
       return;
     }
 
@@ -245,7 +252,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
               "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e);
         }
       }
-      String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
+      String key = MessageType.STATE_TRANSITION.name() + "." + resourceName;
       if (threadpoolSize > 0) {
         _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
         LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: "
@@ -283,9 +290,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
    */
   ExecutorService findExecutorServiceForMsg(Message message) {
     ExecutorService executorService = _executorMap.get(message.getMsgType());
-    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
       if(message.getBatchMessageMode() == true) {
         executorService = _batchMessageExecutorService;
+      } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+        executorService = _cancellationExcutorService;
       } else {
         String resourceName = message.getResourceName();
         if (resourceName != null) {
@@ -333,6 +342,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       String taskId = task.getTaskId();
       if (_taskMap.containsKey(taskId)) {
         MessageTaskInfo info = _taskMap.get(taskId);
+        removeMessageFromFutureMap(task.getMessage());
         if (info._timerTask != null) {
           info._timerTask.cancel();
         }
@@ -369,6 +379,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
           LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
           Future<HelixTaskResult> future = exeSvc.submit(task);
+          _messageFutureMap
+              .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),
+                  future);
 
           TimerTask timerTask = null;
           if (message.getExecutionTimeout() > 0) {
@@ -416,7 +429,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
         // cancel task
         Future<HelixTaskResult> future = taskInfo.getFuture();
-
+        removeMessageFromFutureMap(message);
         _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId,
             notificationContext.getManager().getHelixDataAccessor());
 
@@ -450,6 +463,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     synchronized (_lock) {
       if (_taskMap.containsKey(taskId)) {
         MessageTaskInfo info = _taskMap.remove(taskId);
+        removeMessageFromFutureMap(message);
         if (info._timerTask != null) {
           // ok to cancel multiple times
           info._timerTask.cancel();
@@ -544,6 +558,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
     _taskMap.clear();
 
+    shutdownAndAwaitTermination(_cancellationExcutorService);
+    _messageFutureMap.clear();
+
     _lastSessionSyncTime = null;
   }
 
@@ -631,7 +648,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     Builder keyBuilder = accessor.keyBuilder();
 
     // message handlers created
-    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
+    Map<String, MessageHandler> stateTransitionHandlers = new HashMap<String, MessageHandler>();
+    List<MessageHandler> nonStateTransitionHandlers = new ArrayList<MessageHandler>();
 
     // message read
     List<Message> readMsgs = new ArrayList<Message>();
@@ -702,6 +720,44 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         continue;
       }
 
+      // State Transition Cancellation
+      // Three Types of Cancellation: 1. Message arrived with previous state transition
+      //                              2. Message handled but task not started
+      //                              3. Message handled and task already started
+      if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+        String messageTarget =
+            getMessageTarget(message.getResourceName(), message.getPartitionName());
+        // State transition message and cancel message are in same batch
+        if (stateTransitionHandlers.containsKey(messageTarget)) {
+          markReadMessage(message, changeContext, accessor);
+          readMsgs.add(message);
+          _monitor.reportProcessedMessage(message,
+              ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
+          removeMessageFromZk(accessor, message, instanceName);
+          removeMessageFromZk(accessor, stateTransitionHandlers.get(messageTarget).getMessage(),
+              instanceName);
+          stateTransitionHandlers.remove(messageTarget);
+          continue;
+        } else {
+          if (_messageFutureMap.containsKey(messageTarget)) {
+            Future<HelixTaskResult> future = _messageFutureMap.get(messageTarget);
+            // Cancel the from future without interrupt ->  Cancel the task future without
+            // interruptting the state transition that is already started.  If the state transition
+            // is already started, we should call cancel in the state model.
+            if (future.cancel(false) || future.isDone()) {
+              markReadMessage(message, changeContext, accessor);
+              readMsgs.add(message);
+              _monitor.reportProcessedMessage(message,
+                  ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
+              removeMessageFromZk(accessor, message, instanceName);
+              removeMessageFromZk(accessor, stateTransitionHandlers.get(messageTarget).getMessage(),
+                  instanceName);
+              continue;
+            }
+          }
+        }
+      }
+
       // create message handlers, if handlers not found, leave its state as NEW
       try {
         MessageHandler createHandler = createMessageHandler(message, changeContext);
@@ -709,7 +765,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
           continue;
         }
-        handlers.add(createHandler);
+        if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
+            .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+          stateTransitionHandlers
+              .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
+                  createHandler);
+        } else {
+          nonStateTransitionHandlers.add(createHandler);
+        }
       } catch (Exception e) {
         LOG.error("Failed to create message handler for " + message.getMsgId(), e);
         String error =
@@ -724,10 +787,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         continue;
       }
 
-      // update msgState to read
-      message.setMsgState(MessageState.READ);
-      message.setReadTimeStamp(new Date().getTime());
-      message.setExecuteSessionId(changeContext.getManager().getSessionId());
+      markReadMessage(message, changeContext, accessor);
 
       _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", accessor);
 
@@ -736,7 +796,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       // batch creation of all current state meta data
       // do it for non-controller and state transition messages only
       if (!message.isControlerMsg()
-          && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
+          && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
         String resourceName = message.getResourceName();
         if (!curResourceNames.contains(resourceName) && !createCurStateNames.contains(resourceName)) {
           createCurStateNames.add(resourceName);
@@ -772,13 +832,27 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      for (MessageHandler handler : handlers) {
+      for (MessageHandler handler : stateTransitionHandlers.values()) {
+        HelixTask task = new HelixTask(handler._message, changeContext, handler, this);
+        scheduleTask(task);
+      }
+
+      for (MessageHandler handler : nonStateTransitionHandlers) {
         HelixTask task = new HelixTask(handler._message, changeContext, handler, this);
         scheduleTask(task);
       }
     }
   }
 
+  private void markReadMessage(Message message, NotificationContext context,
+      HelixDataAccessor accessor) {
+    message.setMsgState(MessageState.READ);
+    message.setReadTimeStamp(new Date().getTime());
+    message.setExecuteSessionId(context.getManager().getSessionId());
+
+    _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", accessor);
+  }
+
   public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
     String msgType = message.getMsgType().toString();
 
@@ -800,6 +874,28 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     return handlerFactory.createHandler(message, changeContext);
   }
 
+  private void removeMessageFromFutureMap(Message message) {
+    String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName());
+    if (_messageFutureMap.containsKey(messageTarget)) {
+      _messageFutureMap.remove(messageTarget);
+    }
+  }
+
+  private String getMessageTarget(String resourceName, String partitionName) {
+    return String.format("%s_%s", resourceName, partitionName);
+  }
+
+  private void removeMessageFromZk(HelixDataAccessor accessor, Message message,
+      String instanceName) {
+    Builder keyBuilder = accessor.keyBuilder();
+    if (message.getTgtName().equalsIgnoreCase("controller")) {
+      // TODO: removeProperty returns boolean
+      accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
+    } else {
+      accessor.removeProperty(keyBuilder.message(instanceName, message.getMsgId()));
+    }
+  }
+
   @Override
   public void shutdown() {
     LOG.info("Shutting down HelixTaskExecutor");

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
index ced9c65..5ed6140 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -25,6 +25,7 @@ import java.util.Map;
 public class HelixTaskResult {
 
   private boolean _success;
+  private boolean _cancelled;
   private String _message = "";
   private String _info = "";
   private Map<String, String> _taskResultMap = new HashMap<String, String>();
@@ -43,6 +44,14 @@ public class HelixTaskResult {
     _interrupted = interrupted;
   }
 
+  public boolean isCancelled() {
+    return _cancelled;
+  }
+
+  public void setCancelled(boolean cancelled) {
+    _cancelled = cancelled;
+  }
+
   public void setSuccess(boolean success) {
     this._success = success;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
index 5715571..506886d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
@@ -80,4 +80,12 @@ public abstract class MessageHandler {
   public void onTimeout() {
 
   }
+
+  /**
+   * Get message for this message handler
+   * @return
+   */
+  public Message getMessage() {
+    return _message;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index eff68f7..8da5bc3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -164,7 +164,7 @@ public class ClusterConstraints extends HelixProperty {
     Map<ConstraintAttribute, String> attributes = new TreeMap<ConstraintAttribute, String>();
     String msgType = msg.getMsgType();
     attributes.put(ConstraintAttribute.MESSAGE_TYPE, msgType);
-    if (MessageType.STATE_TRANSITION.toString().equals(msgType)) {
+    if (MessageType.STATE_TRANSITION.name().equals(msgType)) {
       if (msg.getFromState() != null && msg.getToState() != null) {
         attributes.put(ConstraintAttribute.TRANSITION, msg.getFromState() + "-" + msg.getToState());
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 2901b1c..ae90829 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -43,6 +43,7 @@ public class Message extends HelixProperty {
    */
   public enum MessageType {
     STATE_TRANSITION,
+    STATE_TRANSITION_CANCELLATION,
     SCHEDULER_MSG,
     USER_DEFINE_MSG,
     CONTROLLER_MSG,
@@ -50,7 +51,7 @@ public class Message extends HelixProperty {
     NO_OP,
     PARTICIPANT_ERROR_REPORT,
     PARTICIPANT_SESSION_CHANGE
-  };
+  }
 
   /**
    * Properties attached to Messages
@@ -696,7 +697,7 @@ public class Message extends HelixProperty {
     // TODO: refactor message to state transition message and task-message and
     // implement this function separately
 
-    if (getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+    if (getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
       boolean isNotValid =
           isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionName())
               || isNullOrEmpty(getResourceName()) || isNullOrEmpty(getStateModelDef())

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 8143b2f..4c38fa4 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -33,13 +33,14 @@ import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.messaging.handling.BatchMessageHandler;
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
+import org.apache.helix.messaging.handling.HelixStateTransitionCancellationHandler;
 import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
@@ -161,7 +162,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
   public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
 
-    if (!type.equals(MessageType.STATE_TRANSITION.toString())) {
+    if (!type.equals(MessageType.STATE_TRANSITION.name())) {
       throw new HelixException("Expect state-transition message type, but was "
           + message.getMsgType() + ", msgId: " + message.getMsgId());
     }
@@ -207,7 +208,6 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     }
 
     if (message.getBatchMessageMode() == false) {
-      // create currentStateDelta for this partition
       String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
       StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
       if (stateModel == null) {
@@ -215,18 +215,23 @@ public class HelixStateMachineEngine implements StateMachineEngine {
         stateModel.updateState(initState);
       }
 
-      // TODO: move currentStateDelta to StateTransitionMsgHandler
-      CurrentState currentStateDelta = new CurrentState(resourceName);
-      currentStateDelta.setSessionId(sessionId);
-      currentStateDelta.setStateModelDefRef(stateModelName);
-      currentStateDelta.setStateModelFactoryName(factoryName);
-      currentStateDelta.setBucketSize(bucketSize);
-
-      currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null) ? initState
-          : stateModel.getCurrentState());
-
-      return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
-          currentStateDelta);
+      if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+        return new HelixStateTransitionCancellationHandler(stateModel, message, context);
+      } else {
+        // create currentStateDelta for this partition
+        // TODO: move currentStateDelta to StateTransitionMsgHandler
+        CurrentState currentStateDelta = new CurrentState(resourceName);
+        currentStateDelta.setSessionId(sessionId);
+        currentStateDelta.setStateModelDefRef(stateModelName);
+        currentStateDelta.setStateModelFactoryName(factoryName);
+        currentStateDelta.setBucketSize(bucketSize);
+
+        currentStateDelta.setState(partitionKey,
+            (stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState());
+
+        return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
+            currentStateDelta);
+      }
     } else {
       BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
       if (wrapper == null) {
@@ -236,8 +241,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
       // get executor-service for the message
       TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
       if (executor == null) {
-        logger.error("fail to get executor-service for batch message: " + message.getId()
-            + ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName());
+        logger.error(
+            "fail to get executor-service for batch message: " + message.getId() + ". msgType: "
+                + message.getMsgType() + ", resource: " + message.getResourceName());
         return null;
       }
       return new BatchMessageHandler(message, context, this, wrapper, executor);
@@ -246,7 +252,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
 
   @Override
   public String getMessageType() {
-    return MessageType.STATE_TRANSITION.toString();
+    return MessageType.STATE_TRANSITION.name();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index b88262b..56ea430 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
 
 public abstract class StateModel {
   static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+  private boolean _cancelled;
   Logger logger = Logger.getLogger(StateModel.class);
 
   // TODO Get default state from implementation or from state model annotation
@@ -80,4 +81,10 @@ public abstract class StateModel {
     logger.info("Default ERROR->DROPPED transition invoked.");
   }
 
+  /**
+   * Default implementation for cancelling state transition
+   */
+  public void cancel() {
+    _cancelled = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
index 02c39d1..b508c15 100644
--- a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -435,7 +435,7 @@ public class StatusUpdateUtil {
   }
 
   private String getStatusUpdateKey(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
       return message.getPartitionName();
     }
     return message.getMsgId();
@@ -445,7 +445,7 @@ public class StatusUpdateUtil {
    * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
    */
   String getStatusUpdateSubPath(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
       return message.getResourceName();
     } else {
       return message.getMsgType();
@@ -453,7 +453,7 @@ public class StatusUpdateUtil {
   }
 
   String getStatusUpdateRecordName(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
       return message.getTgtSessionId() + "__" + message.getResourceName();
     }
     return message.getMsgId();

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
index 615eab6..2058a62 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -98,7 +98,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
             int transitionMsgCount = 0;
             for (ZNRecord record : records) {
               Message msg = new Message(record);
-              if (msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
+              if (msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
                 transitionMsgCount++;
               }
             }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index f0eba23..4a986b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -277,7 +277,7 @@ public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase {
       stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
 
       manager.connect();
-      //manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+      //manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), genericStateMachineHandler);
       return manager;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 236a5b2..85433cd 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -56,9 +56,17 @@ import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestZkHelixAdmin extends ZkUnitTestBase {
+
+  @BeforeClass
+  public void beforeClass() {
+
+  }
+
+  @Test
   public void testZkHelixAdmin() {
     //TODO refactor this test into small test cases and use @before annotations
     System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 1ff6595..2f78007 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -29,10 +29,6 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.Mocks;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.HelixTaskExecutor;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.testng.Assert;
@@ -179,6 +175,41 @@ public class TestHelixTaskExecutor {
     }
   }
 
+  class TestStateTransitionHandlerFactory implements MessageHandlerFactory {
+    ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
+    private final String _msgType;
+    public TestStateTransitionHandlerFactory(String msgType) {
+      _msgType = msgType;
+    }
+    class TestStateTransitionMessageHandler extends MessageHandler {
+      public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+      }
+
+      @Override public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override public void onError(Exception e, ErrorCode code, ErrorType type) {
+      }
+    }
+
+    @Override public MessageHandler createHandler(Message message, NotificationContext context) {
+      return new TestStateTransitionMessageHandler(message, context);
+    }
+
+    @Override public String getMessageType() {
+      return _msgType;
+    }
+
+    @Override public void reset() {
+
+    }
+  }
+
   @Test()
   public void testNormalMsgExecution() throws InterruptedException {
     System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
@@ -547,4 +578,43 @@ public class TestHelixTaskExecutor {
     AssertJUnit.assertTrue(executor._taskMap.size() == 0);
 
   }
+
+  @Test
+  public void testStateTransitionCancellationMsg() throws InterruptedException {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory = new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name());
+    TestStateTransitionHandlerFactory cancelFactory = new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION
+        .name());
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), stateTransitionFactory);
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION.name(), cancelFactory);
+
+
+    NotificationContext changeContext = new NotificationContext(manager);
+
+    List<Message> msgList = new ArrayList<Message>();
+    Message msg1 = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+    msg1.setTgtSessionId("*");
+    msg1.setPartitionName("P1");
+    msg1.setResourceName("R1");
+    msg1.setTgtName("Localhost_1123");
+    msg1.setSrcName("127.101.1.23_2234");
+    msgList.add(msg1);
+
+    Message msg2 = new Message(Message.MessageType.STATE_TRANSITION_CANCELLATION, UUID.randomUUID().toString());
+    msg2.setTgtSessionId("*");
+    msg2.setPartitionName("P1");
+    msg2.setResourceName("R1");
+    msg2.setTgtName("Localhost_1123");
+    msg2.setSrcName("127.101.1.23_2234");
+    msgList.add(msg2);
+
+    executor.onMessage("someInstance", msgList, changeContext);
+
+    Thread.sleep(3000);
+    AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
+    AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index 085f822..cd30233 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -102,7 +102,7 @@ public class DummyProcess {
     stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
 
     manager.connect();
-    // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+    // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(),
     // genericStateMachineHandler);
     return manager;
   }


Mime
View raw message