From: jxue@apache.org
To: commits@helix.apache.org
Reply-To: dev@helix.apache.org
Date: Tue, 03 Oct 2017 21:44:01 -0000
Subject: [1/5] helix git commit: [HELIX-669] State Transition Cancellation Client Implementation

Repository: helix
Updated Branches:
  refs/heads/master 55b844657 -> 6775cd3ff

[HELIX-669] State Transition Cancellation Client Implementation

State transitions are a vital part of how Helix manages clusters. A state transition can become unnecessary for various reasons, for example when the node hosting the partition that is running the transition goes down.
State transition cancellation is therefore a useful feature for Helix. It not only cancels a state transition to avoid an invalid state, but also reduces redundant state transitions.

This implementation includes the following:
1. Add new HelixStateTransitionCancellationHandler
2. Implement cancel logic for three cases: message received but not yet handled, message handled but task not started, and message handled and task already started.
3. Add default implementation of cancel method in StateModel
4. Add new STATE_TRANSITION_CANCELLATION message type
5. Unit test for cancel logic

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aa2f6411
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aa2f6411
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aa2f6411

Branch: refs/heads/master
Commit: aa2f64111696764824b18b60ca6ce6a140b026fc
Parents: 55b8446
Author: Junkai Xue
Authored: Tue Oct 3 11:57:21 2017 -0700
Committer: Junkai Xue
Committed: Tue Oct 3 12:24:43 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/HelixRollbackException.java | 35 ++++++
 .../stages/CurrentStateComputationStage.java | 2 +-
 .../apache/helix/examples/BootstrapProcess.java | 2 +-
 .../apache/helix/examples/ExampleProcess.java | 2 +-
 .../helix/manager/zk/ParticipantManager.java | 2 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java | 2 +-
 ...HelixStateTransitionCancellationHandler.java | 65 ++++++++++
 .../helix/messaging/handling/HelixTask.java | 18 ++-
 .../messaging/handling/HelixTaskExecutor.java | 120 +++++++++++++++++--
 .../messaging/handling/HelixTaskResult.java | 9 ++
 .../messaging/handling/MessageHandler.java | 8 ++
 .../apache/helix/model/ClusterConstraints.java | 2 +-
 .../java/org/apache/helix/model/Message.java | 5 +-
 .../participant/HelixStateMachineEngine.java | 42 ++++---
 .../participant/statemachine/StateModel.java | 7 ++
 .../org/apache/helix/util/StatusUpdateUtil.java | 6 +-
.../helix/integration/TestMessageThrottle.java | 2 +- .../integration/TestResourceGroupEndtoEnd.java | 2 +- .../helix/manager/zk/TestZkHelixAdmin.java | 8 ++ .../handling/TestHelixTaskExecutor.java | 78 +++++++++++- .../helix/mock/participant/DummyProcess.java | 2 +- 21 files changed, 370 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java b/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java new file mode 100644 index 0000000..e59e494 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/HelixRollbackException.java @@ -0,0 +1,35 @@ +package org.apache.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +public class HelixRollbackException extends HelixException { + + public HelixRollbackException(String message) { + super(message); + } + + public HelixRollbackException(Throwable cause) { + super(cause); + } + + public HelixRollbackException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 0dd4165..d548d3c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -54,7 +54,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { String instanceName = instance.getInstanceName(); Map instanceMessages = cache.getMessages(instanceName); for (Message message : instanceMessages.values()) { - if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) { + if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())) { continue; } if (!instance.getSessionId().equals(message.getTgtSessionId())) { http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java index d924fe5..ce4cf10 100644 --- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java +++ 
b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java @@ -107,7 +107,7 @@ public class BootstrapProcess { stateMach.registerStateModelFactory("MasterSlave", stateModelFactory); manager.getMessagingService().registerMessageHandlerFactory( - MessageType.STATE_TRANSITION.toString(), stateMach); + MessageType.STATE_TRANSITION.name(), stateMach); manager.getMessagingService().registerMessageHandlerFactory( MessageType.USER_DEFINE_MSG.toString(), new CustomMessageHandlerFactory()); manager.connect(); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java index 9328cf4..ef2d84f 100644 --- a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java +++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java @@ -91,7 +91,7 @@ public class ExampleProcess { stateMach.registerStateModelFactory(stateModelType, stateModelFactory); manager.connect(); manager.getMessagingService().registerMessageHandlerFactory( - MessageType.STATE_TRANSITION.toString(), stateMach); + MessageType.STATE_TRANSITION.name(), stateMach); } public void stop() { http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index bf7302b..209dbca 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -331,7 +331,7 @@ public class 
ParticipantManager { } private void setupMsgHandler() throws Exception { - _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), + _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), _stateMachineEngine); _manager.addMessageListener(_messagingService.getExecutor(), _instanceName); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 19a4193..fb46379 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -386,7 +386,7 @@ public class ZKHelixAdmin implements HelixAdmin { // check there is no pending messages for the partitions exist List messages = accessor.getChildValues(keyBuilder.messages(instanceName)); for (Message message : messages) { - if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType()) + if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId.equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) || !resetPartitionNames.contains(message.getPartitionName())) { http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java new file mode 100644 index 0000000..3256d8d --- /dev/null +++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionCancellationHandler.java @@ -0,0 +1,65 @@ +package org.apache.helix.messaging.handling; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixRollbackException; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.log4j.Logger; + +public class HelixStateTransitionCancellationHandler extends MessageHandler { + private final StateModel _stateModel; + private static final Logger logger = + Logger.getLogger(HelixStateTransitionCancellationHandler.class); + + + public HelixStateTransitionCancellationHandler(StateModel stateModel, Message message, + NotificationContext context) { + super(message, context); + _stateModel = stateModel; + } + + @Override + public HelixTaskResult handleMessage() throws InterruptedException { + HelixTaskResult taskResult = new HelixTaskResult(); + synchronized (_stateModel) { + try { + _stateModel.cancel(); + taskResult.setSuccess(true); + } catch (Exception e) { + taskResult.setSuccess(false); + taskResult.setMessage(e.toString()); + taskResult.setException(e); + 
} + + if (taskResult.isSuccess()) { + throw new HelixRollbackException("Cancellation success for " + _message.getMsgId()); + } + } + return taskResult; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + // Nothing need to do when it is error + logger.warn("No extra action needed when cancel failed."); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 1aa932e..12193f0 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.HelixRollbackException; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.MapKey; @@ -91,6 +92,14 @@ public class HelixTask implements MessageTask { _statusUpdateUtil.logError(_message, HelixTask.class, e, "State transition interrupted, timeout:" + _isTimeout, accessor); logger.info("Message " + _message.getMsgId() + " is interrupted"); + } catch (HelixRollbackException e) { + // TODO : Support cancel to any state + logger.info( + "Rollback happened of state transition on resource \"" + _message.getResourceName() + + "\" partition \"" + _message.getPartitionName() + "\" from \"" + _message + .getFromState() + "\" to \"" + _message.getToState() + "\""); + taskResult = new HelixTaskResult(); + taskResult.setCancelled(true); } catch (Exception e) { taskResult = new HelixTaskResult(); taskResult.setException(e); @@ -136,6 
+145,13 @@ public class HelixTask implements MessageTask { return taskResult; } } + } else if (taskResult.isCancelled()) { + // Cancellation success, report message complete + _statusUpdateUtil + .logInfo(_message, _handler.getClass(), "Cancellation completed successfully", + accessor); + _executor.getParticipantMonitor().reportProcessedMessage(_message, + ParticipantMessageMonitor.ProcessedMessageState.COMPLETED); } else // logging for errors { code = ErrorCode.ERROR; @@ -223,7 +239,7 @@ public class HelixTask implements MessageTask { private void reportMessageStat(HelixManager manager, Message message, HelixTaskResult taskResult) { // report stat - if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) { + if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) { return; } long now = new Date().getTime(); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 3f6a43d..41b5d0d 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -22,6 +22,7 @@ package org.apache.helix.messaging.handling; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -102,6 +103,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // From storage point of view, only bootstrap case is expensive // and we need to throttle, which is mostly IO / network bounded. 
public static final int DEFAULT_PARALLEL_TASKS = 40; + public static final int DEFAULT_CANCELLATION_THREADPOOL_SIZE = 40; // TODO: create per-task type threadpool with customizable pool size protected final Map _taskMap; private final Object _lock; @@ -121,6 +123,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { final ConcurrentHashMap _executorMap; + final ConcurrentHashMap> _messageFutureMap; + private ExecutorService _cancellationExcutorService; + /** * separate executor for executing batch messages */ @@ -144,6 +149,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _hdlrFtyRegistry = new ConcurrentHashMap(); _executorMap = new ConcurrentHashMap(); + _messageFutureMap = new ConcurrentHashMap>(); + _cancellationExcutorService = Executors.newFixedThreadPool(DEFAULT_CANCELLATION_THREADPOOL_SIZE); _batchMessageExecutorService = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS); _batchMessageThreadpoolChecked = false; _resourcesThreadpoolChecked = @@ -209,7 +216,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { * This method is to check it and update the thread pool if necessary. */ private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) { - if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) { + if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) { return; } @@ -245,7 +252,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e); } } - String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName; + String key = MessageType.STATE_TRANSITION.name() + "." 
+ resourceName; if (threadpoolSize > 0) { _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize)); LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " @@ -283,9 +290,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { */ ExecutorService findExecutorServiceForMsg(Message message) { ExecutorService executorService = _executorMap.get(message.getMsgType()); - if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) { + if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) { if(message.getBatchMessageMode() == true) { executorService = _batchMessageExecutorService; + } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { + executorService = _cancellationExcutorService; } else { String resourceName = message.getResourceName(); if (resourceName != null) { @@ -333,6 +342,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { String taskId = task.getTaskId(); if (_taskMap.containsKey(taskId)) { MessageTaskInfo info = _taskMap.get(taskId); + removeMessageFromFutureMap(task.getMessage()); if (info._timerTask != null) { info._timerTask.cancel(); } @@ -369,6 +379,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { LOG.info("Submit task: " + taskId + " to pool: " + exeSvc); Future future = exeSvc.submit(task); + _messageFutureMap + .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()), + future); TimerTask timerTask = null; if (message.getExecutionTimeout() > 0) { @@ -416,7 +429,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // cancel task Future future = taskInfo.getFuture(); - + removeMessageFromFutureMap(message); _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId, notificationContext.getManager().getHelixDataAccessor()); @@ -450,6 +463,7 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor { synchronized (_lock) { if (_taskMap.containsKey(taskId)) { MessageTaskInfo info = _taskMap.remove(taskId); + removeMessageFromFutureMap(message); if (info._timerTask != null) { // ok to cancel multiple times info._timerTask.cancel(); @@ -544,6 +558,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } _taskMap.clear(); + shutdownAndAwaitTermination(_cancellationExcutorService); + _messageFutureMap.clear(); + _lastSessionSyncTime = null; } @@ -631,7 +648,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { Builder keyBuilder = accessor.keyBuilder(); // message handlers created - List handlers = new ArrayList(); + Map stateTransitionHandlers = new HashMap(); + List nonStateTransitionHandlers = new ArrayList(); // message read List readMsgs = new ArrayList(); @@ -702,6 +720,44 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } + // State Transition Cancellation + // Three Types of Cancellation: 1. Message arrived with previous state transition + // 2. Message handled but task not started + // 3. 
Message handled and task already started + if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { + String messageTarget = + getMessageTarget(message.getResourceName(), message.getPartitionName()); + // State transition message and cancel message are in same batch + if (stateTransitionHandlers.containsKey(messageTarget)) { + markReadMessage(message, changeContext, accessor); + readMsgs.add(message); + _monitor.reportProcessedMessage(message, + ParticipantMessageMonitor.ProcessedMessageState.COMPLETED); + removeMessageFromZk(accessor, message, instanceName); + removeMessageFromZk(accessor, stateTransitionHandlers.get(messageTarget).getMessage(), + instanceName); + stateTransitionHandlers.remove(messageTarget); + continue; + } else { + if (_messageFutureMap.containsKey(messageTarget)) { + Future future = _messageFutureMap.get(messageTarget); + // Cancel the from future without interrupt -> Cancel the task future without + // interruptting the state transition that is already started. If the state transition + // is already started, we should call cancel in the state model. 
+ if (future.cancel(false) || future.isDone()) { + markReadMessage(message, changeContext, accessor); + readMsgs.add(message); + _monitor.reportProcessedMessage(message, + ParticipantMessageMonitor.ProcessedMessageState.COMPLETED); + removeMessageFromZk(accessor, message, instanceName); + removeMessageFromZk(accessor, stateTransitionHandlers.get(messageTarget).getMessage(), + instanceName); + continue; + } + } + } + } + // create message handlers, if handlers not found, leave its state as NEW try { MessageHandler createHandler = createMessageHandler(message, changeContext); @@ -709,7 +765,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { _monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED); continue; } - handlers.add(createHandler); + if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType() + .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) { + stateTransitionHandlers + .put(getMessageTarget(message.getResourceName(), message.getPartitionName()), + createHandler); + } else { + nonStateTransitionHandlers.add(createHandler); + } } catch (Exception e) { LOG.error("Failed to create message handler for " + message.getMsgId(), e); String error = @@ -724,10 +787,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } - // update msgState to read - message.setMsgState(MessageState.READ); - message.setReadTimeStamp(new Date().getTime()); - message.setExecuteSessionId(changeContext.getManager().getSessionId()); + markReadMessage(message, changeContext, accessor); _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", accessor); @@ -736,7 +796,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // batch creation of all current state meta data // do it for non-controller and state transition messages only if (!message.isControlerMsg() - && 
message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) { + && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) { String resourceName = message.getResourceName(); if (!curResourceNames.contains(resourceName) && !createCurStateNames.contains(resourceName)) { createCurStateNames.add(resourceName); @@ -772,13 +832,27 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { if (readMsgs.size() > 0) { updateMessageState(readMsgs, accessor, instanceName); - for (MessageHandler handler : handlers) { + for (MessageHandler handler : stateTransitionHandlers.values()) { + HelixTask task = new HelixTask(handler._message, changeContext, handler, this); + scheduleTask(task); + } + + for (MessageHandler handler : nonStateTransitionHandlers) { HelixTask task = new HelixTask(handler._message, changeContext, handler, this); scheduleTask(task); } } } + private void markReadMessage(Message message, NotificationContext context, + HelixDataAccessor accessor) { + message.setMsgState(MessageState.READ); + message.setReadTimeStamp(new Date().getTime()); + message.setExecuteSessionId(context.getManager().getSessionId()); + + _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", accessor); + } + public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) { String msgType = message.getMsgType().toString(); @@ -800,6 +874,28 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { return handlerFactory.createHandler(message, changeContext); } + private void removeMessageFromFutureMap(Message message) { + String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName()); + if (_messageFutureMap.containsKey(messageTarget)) { + _messageFutureMap.remove(messageTarget); + } + } + + private String getMessageTarget(String resourceName, String partitionName) { + return String.format("%s_%s", resourceName, partitionName); + } + 
+ private void removeMessageFromZk(HelixDataAccessor accessor, Message message, + String instanceName) { + Builder keyBuilder = accessor.keyBuilder(); + if (message.getTgtName().equalsIgnoreCase("controller")) { + // TODO: removeProperty returns boolean + accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId())); + } else { + accessor.removeProperty(keyBuilder.message(instanceName, message.getMsgId())); + } + } + @Override public void shutdown() { LOG.info("Shutting down HelixTaskExecutor"); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java index ced9c65..5ed6140 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java @@ -25,6 +25,7 @@ import java.util.Map; public class HelixTaskResult { private boolean _success; + private boolean _cancelled; private String _message = ""; private String _info = ""; private Map _taskResultMap = new HashMap(); @@ -43,6 +44,14 @@ public class HelixTaskResult { _interrupted = interrupted; } + public boolean isCancelled() { + return _cancelled; + } + + public void setCancelled(boolean cancelled) { + _cancelled = cancelled; + } + public void setSuccess(boolean success) { this._success = success; } http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
index 5715571..506886d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
@@ -80,4 +80,12 @@ public abstract class MessageHandler {
   public void onTimeout() {
   }
+
+  /**
+   * Get message for this message handler
+   * @return
+   */
+  public Message getMessage() {
+    return _message;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index eff68f7..8da5bc3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -164,7 +164,7 @@ public class ClusterConstraints extends HelixProperty {
     Map attributes = new TreeMap();
     String msgType = msg.getMsgType();
     attributes.put(ConstraintAttribute.MESSAGE_TYPE, msgType);
-    if (MessageType.STATE_TRANSITION.toString().equals(msgType)) {
+    if (MessageType.STATE_TRANSITION.name().equals(msgType)) {
       if (msg.getFromState() != null && msg.getToState() != null) {
         attributes.put(ConstraintAttribute.TRANSITION, msg.getFromState() + "-" + msg.getToState());
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 2901b1c..ae90829 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -43,6 +43,7 @@ public class Message extends HelixProperty {
    */
   public enum MessageType {
     STATE_TRANSITION,
+    STATE_TRANSITION_CANCELLATION,
     SCHEDULER_MSG,
     USER_DEFINE_MSG,
     CONTROLLER_MSG,
@@ -50,7 +51,7 @@ public class Message extends HelixProperty {
     NO_OP,
     PARTICIPANT_ERROR_REPORT,
     PARTICIPANT_SESSION_CHANGE
-  };
+  }
   /**
    * Properties attached to Messages
@@ -696,7 +697,7 @@ public class Message extends HelixProperty {
     // TODO: refactor message to state transition message and task-message and
     // implement this function separately
-    if (getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+    if (getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
       boolean isNotValid =
           isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionName())
               || isNullOrEmpty(getResourceName()) || isNullOrEmpty(getStateModelDef())

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 8143b2f..4c38fa4 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -33,13 +33,14 @@ import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.messaging.handling.BatchMessageHandler;
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
+import org.apache.helix.messaging.handling.HelixStateTransitionCancellationHandler;
 import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
@@ -161,7 +162,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
   public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
-    if (!type.equals(MessageType.STATE_TRANSITION.toString())) {
+    if (!type.equals(MessageType.STATE_TRANSITION.name())) {
       throw new HelixException("Expect state-transition message type, but was "
           + message.getMsgType() + ", msgId: " + message.getMsgId());
     }
@@ -207,7 +208,6 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     }
     if (message.getBatchMessageMode() == false) {
-      // create currentStateDelta for this partition
       String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
       StateModel stateModel = stateModelFactory.getStateModel(resourceName, partitionKey);
       if (stateModel == null) {
@@ -215,18 +215,23 @@ public class HelixStateMachineEngine implements StateMachineEngine {
         stateModel.updateState(initState);
       }
-      // TODO: move currentStateDelta to StateTransitionMsgHandler
-      CurrentState currentStateDelta = new CurrentState(resourceName);
-      currentStateDelta.setSessionId(sessionId);
-      currentStateDelta.setStateModelDefRef(stateModelName);
-      currentStateDelta.setStateModelFactoryName(factoryName);
-      currentStateDelta.setBucketSize(bucketSize);
-
-      currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null) ? initState
-          : stateModel.getCurrentState());
-
-      return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
-          currentStateDelta);
+      if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+        return new HelixStateTransitionCancellationHandler(stateModel, message, context);
+      } else {
+        // create currentStateDelta for this partition
+        // TODO: move currentStateDelta to StateTransitionMsgHandler
+        CurrentState currentStateDelta = new CurrentState(resourceName);
+        currentStateDelta.setSessionId(sessionId);
+        currentStateDelta.setStateModelDefRef(stateModelName);
+        currentStateDelta.setStateModelFactoryName(factoryName);
+        currentStateDelta.setBucketSize(bucketSize);
+
+        currentStateDelta.setState(partitionKey,
+            (stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState());
+
+        return new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context,
+            currentStateDelta);
+      }
     } else {
       BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
       if (wrapper == null) {
@@ -236,8 +241,9 @@ public class HelixStateMachineEngine implements StateMachineEngine {
       // get executor-service for the message
       TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
       if (executor == null) {
-        logger.error("fail to get executor-service for batch message: " + message.getId()
-            + ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName());
+        logger.error(
+            "fail to get executor-service for batch message: " + message.getId() + ". msgType: "
+                + message.getMsgType() + ", resource: " + message.getResourceName());
         return null;
       }
       return new BatchMessageHandler(message, context, this, wrapper, executor);
@@ -246,7 +252,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
   @Override
   public String getMessageType() {
-    return MessageType.STATE_TRANSITION.toString();
+    return MessageType.STATE_TRANSITION.name();
   }
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index b88262b..56ea430 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
 public abstract class StateModel {
   static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+  private boolean _cancelled;
   Logger logger = Logger.getLogger(StateModel.class);
   // TODO Get default state from implementation or from state model annotation
@@ -80,4 +81,10 @@ public abstract class StateModel {
     logger.info("Default ERROR->DROPPED transition invoked.");
   }
+  /**
+   * Default implementation for cancelling state transition
+   */
+  public void cancel() {
+    _cancelled = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
index 02c39d1..b508c15 100644
--- a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -435,7 +435,7 @@ public class StatusUpdateUtil {
   }
   private String getStatusUpdateKey(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
       return message.getPartitionName();
     }
     return message.getMsgId();
@@ -445,7 +445,7 @@ public class StatusUpdateUtil {
    * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
    */
   String getStatusUpdateSubPath(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
       return message.getResourceName();
     } else {
       return message.getMsgType();
@@ -453,7 +453,7 @@ public class StatusUpdateUtil {
   }
   String getStatusUpdateRecordName(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString())) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
       return message.getTgtSessionId() + "__" + message.getResourceName();
     }
     return message.getMsgId();

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
index 615eab6..2058a62 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -98,7 +98,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
     int transitionMsgCount = 0;
     for (ZNRecord record : records) {
       Message msg = new Message(record);
-      if (msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
+      if (msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
         transitionMsgCount++;
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index f0eba23..4a986b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -277,7 +277,7 @@ public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase {
     stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
     manager.connect();
-    //manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
+    //manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), genericStateMachineHandler);
     return manager;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 236a5b2..85433cd 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -56,9 +56,17 @@ import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 public class TestZkHelixAdmin extends ZkUnitTestBase {
+
+  @BeforeClass
+  public void beforeClass() {
+
+  }
+
+
   @Test
   public void testZkHelixAdmin() {
     //TODO refactor this test into small test cases and use @before annotations
     System.out.println("START testZkHelixAdmin at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 1ff6595..2f78007 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -29,10 +29,6 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.Mocks;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.HelixTaskExecutor;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.testng.Assert;
@@ -179,6 +175,41 @@ public class TestHelixTaskExecutor {
     }
   }
+  class TestStateTransitionHandlerFactory implements MessageHandlerFactory {
+    ConcurrentHashMap _processedMsgIds = new ConcurrentHashMap();
+    private final String _msgType;
+    public TestStateTransitionHandlerFactory(String msgType) {
+      _msgType = msgType;
+    }
+    class TestStateTransitionMessageHandler extends MessageHandler {
+      public TestStateTransitionMessageHandler(Message message, NotificationContext context) {
+        super(message, context);
+      }
+
+      @Override public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override public void onError(Exception e, ErrorCode code, ErrorType type) {
+      }
+    }
+
+    @Override public MessageHandler createHandler(Message message, NotificationContext context) {
+      return new TestStateTransitionMessageHandler(message, context);
+    }
+
+    @Override public String getMessageType() {
+      return _msgType;
+    }
+
+    @Override public void reset() {
+
+    }
+  }
+
   @Test()
   public void testNormalMsgExecution() throws InterruptedException {
     System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
@@ -547,4 +578,43 @@ public class TestHelixTaskExecutor {
     AssertJUnit.assertTrue(executor._taskMap.size() == 0);
   }
+
+  @Test
+  public void testStateTransitionCancellationMsg() throws InterruptedException {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    TestStateTransitionHandlerFactory stateTransitionFactory = new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name());
+    TestStateTransitionHandlerFactory cancelFactory = new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION
+        .name());
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), stateTransitionFactory);
+    executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION_CANCELLATION.name(), cancelFactory);
+
+
+    NotificationContext changeContext = new NotificationContext(manager);
+
+    List msgList = new ArrayList();
+    Message msg1 = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+    msg1.setTgtSessionId("*");
+    msg1.setPartitionName("P1");
+    msg1.setResourceName("R1");
+    msg1.setTgtName("Localhost_1123");
+    msg1.setSrcName("127.101.1.23_2234");
+    msgList.add(msg1);
+
+    Message msg2 = new Message(Message.MessageType.STATE_TRANSITION_CANCELLATION, UUID.randomUUID().toString());
+    msg2.setTgtSessionId("*");
+    msg2.setPartitionName("P1");
+    msg2.setResourceName("R1");
+    msg2.setTgtName("Localhost_1123");
+    msg2.setSrcName("127.101.1.23_2234");
+    msgList.add(msg2);
+
+    executor.onMessage("someInstance", msgList, changeContext);
+
+    Thread.sleep(3000);
+    AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
+    AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/aa2f6411/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index 085f822..cd30233 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -102,7 +102,7 @@ public class DummyProcess {
     stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
     manager.connect();
-    // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+    // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(),
     // genericStateMachineHandler);
     return manager;
   }
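
Note on the toString() -> name() replacements made throughout this diff: the sketch below is a standalone illustration, not part of the commit. It shows why comparing a message-type string against Enum.name() is the more robust choice. name() is final and always returns the declared constant, while toString() can be overridden per constant. DemoMessageType, EnumNameVsToString and isType are hypothetical names invented for this example. Helix's own Message.MessageType does not override toString() in the code shown here, so the change appears to be defensive rather than a behavior fix.

```java
// Illustrative only: DemoMessageType is a hypothetical enum, not Helix's Message.MessageType.
public class EnumNameVsToString {
  enum DemoMessageType {
    STATE_TRANSITION,
    STATE_TRANSITION_CANCELLATION {
      // An enum constant may override toString(), for example to provide a display label.
      @Override
      public String toString() {
        return "state transition cancellation";
      }
    }
  }

  // Compares a serialized type string against the constant's declared name.
  // Enum.name() is final, so this check cannot be affected by a toString() override.
  static boolean isType(String msgType, DemoMessageType expected) {
    return expected.name().equals(msgType);
  }

  public static void main(String[] args) {
    DemoMessageType cancel = DemoMessageType.STATE_TRANSITION_CANCELLATION;
    System.out.println(cancel.name());      // declared constant name
    System.out.println(cancel.toString());  // overridden display label
    System.out.println(isType("STATE_TRANSITION_CANCELLATION", cancel));
  }
}
```

A check written against toString() would stop matching the serialized type string as soon as any constant customized its label, which is the failure mode name() avoids.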