Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C3B58D95D for ; Wed, 24 Oct 2012 23:17:35 +0000 (UTC) Received: (qmail 78631 invoked by uid 500); 24 Oct 2012 23:17:35 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 78605 invoked by uid 500); 24 Oct 2012 23:17:35 -0000 Mailing-List: contact commits-help@helix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.incubator.apache.org Delivered-To: mailing list commits@helix.incubator.apache.org Received: (qmail 78596 invoked by uid 99); 24 Oct 2012 23:17:35 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2012 23:17:35 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 24 Oct 2012 23:15:21 +0000 Received: (qmail 72872 invoked by uid 99); 24 Oct 2012 23:14:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2012 23:14:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0410A4F24E; Wed, 24 Oct 2012 23:14:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kishoreg@apache.org To: commits@helix.incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [16/42] Refactoring the package names and removing jsql parser Message-Id: <20121024231458.0410A4F24E@tyr.zones.apache.org> Date: Wed, 24 Oct 2012 23:14:58 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org 
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java b/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java new file mode 100644 index 0000000..08516e8 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java @@ -0,0 +1,5 @@ +/** + * zookeeper-based implementation of Helix cluster manager + * + */ +package org.apache.helix.manager.zk; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java new file mode 100644 index 0000000..5a520a9 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.helix.messaging; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.helix.model.Message; +import org.apache.log4j.Logger; + + +public abstract class AsyncCallback +{ + + private static Logger _logger = Logger.getLogger(AsyncCallback.class); + long _startTimeStamp = 0; + protected long _timeout = -1; + Timer _timer = null; + List _messagesSent; + protected final List _messageReplied = new ArrayList(); + boolean _timedOut = false; + boolean _isInterrupted = false; + + /** + * Enforcing timeout to be set + * + * @param timeout + */ + public AsyncCallback(long timeout) + { + _logger.info("Setting time out to " + timeout + " ms"); + _timeout = timeout; + } + + public AsyncCallback() + { + this(-1); + } + + public final void setTimeout(long timeout) + { + _logger.info("Setting time out to " + timeout + " ms"); + _timeout = timeout; + + } + + public List getMessageReplied() + { + return _messageReplied; + } + + public boolean isInterrupted() + { + return _isInterrupted; + } + + public void setInterrupted(boolean b) + { + _isInterrupted = true; + } + + public synchronized final void onReply(Message message) + { + _logger.info("OnReply msg " + message.getMsgId()); + if (!isDone()) + { + _messageReplied.add(message); + try + { + onReplyMessage(message); + } + catch(Exception e) + { + _logger.error(e); + } + } + if (isDone()) + { + if(_timer != null) + { + _timer.cancel(); + } + notifyAll(); + } + } + + /** + * Default implementation will wait until every message sent gets a response + * + * @return + */ + public boolean isDone() + { + return _messageReplied.size() == _messagesSent.size(); + } + + public boolean isTimedOut() + { + return _timedOut; + } + + final void setMessagesSent(List generatedMessage) + { + _messagesSent = generatedMessage; + } + + final void startTimer() + { + if (_timer == null && _timeout > 0) + { + if (_startTimeStamp == 0) + 
{ + _startTimeStamp = new Date().getTime(); + } + _timer = new Timer(true); + _timer.schedule(new TimeoutTask(this), _timeout); + } + } + + public abstract void onTimeOut(); + + public abstract void onReplyMessage(Message message); + + class TimeoutTask extends TimerTask + { + AsyncCallback _callback; + + public TimeoutTask(AsyncCallback asyncCallback) + { + _callback = asyncCallback; + } + + @Override + public void run() + { + try + { + synchronized (_callback) + { + _callback._timedOut = true; + _callback.notifyAll(); + _callback.onTimeOut(); + } + } + catch (Exception e) + { + _logger.error(e); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java new file mode 100644 index 0000000..d4cc9b0 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.helix.messaging; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.Criteria; +import org.apache.helix.DataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyType; +import org.apache.helix.ZNRecord; +import org.apache.helix.Criteria.DataSource; +import org.apache.helix.josql.ClusterJosqlQueryProcessor; +import org.apache.helix.josql.ZNRecordRow; +import org.apache.log4j.Logger; +import org.josql.Query; +import org.josql.QueryExecutionException; +import org.josql.QueryParseException; +import org.josql.QueryResults; + + +public class CriteriaEvaluator +{ + private static Logger logger = Logger.getLogger(CriteriaEvaluator.class); + + public List> evaluateCriteria(Criteria recipientCriteria, HelixManager manager) + { + List> selected = new ArrayList>(); + + String queryFields = + (!recipientCriteria.getInstanceName().equals("") ? " " + ZNRecordRow.MAP_SUBKEY : " ''") +","+ + (!recipientCriteria.getResource().equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''") +","+ + (!recipientCriteria.getPartition().equals("") ? " " + ZNRecordRow.MAP_KEY : " ''") +","+ + (!recipientCriteria.getPartitionState().equals("") ? " " + ZNRecordRow.MAP_VALUE : " '' "); + + String matchCondition = + ZNRecordRow.MAP_SUBKEY + " LIKE '" + (!recipientCriteria.getInstanceName().equals("") ? (recipientCriteria.getInstanceName() +"'") : "%' ") + " AND "+ + ZNRecordRow.ZNRECORD_ID+ " LIKE '" + (!recipientCriteria.getResource().equals("") ? (recipientCriteria.getResource() +"'") : "%' ") + " AND "+ + ZNRecordRow.MAP_KEY + " LIKE '" + (!recipientCriteria.getPartition().equals("") ? (recipientCriteria.getPartition() +"'") : "%' ") + " AND "+ + ZNRecordRow.MAP_VALUE + " LIKE '" + (!recipientCriteria.getPartitionState().equals("") ? 
(recipientCriteria.getPartitionState()+"'") : "%' ") + " AND "+ + ZNRecordRow.MAP_SUBKEY + " IN ((SELECT [*]id FROM :LIVEINSTANCES))"; + + + String queryTarget = recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE; + + String josql = "SELECT DISTINCT " + queryFields + + " FROM " + queryTarget + " WHERE " + + matchCondition; + ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager); + List result = new ArrayList(); + try + { + logger.info("JOSQL query: " + josql); + result = p.runJoSqlQuery(josql, null, null); + } + catch (Exception e) + { + logger.error("", e); + return selected; + } + + for(Object o : result) + { + Map resultRow = new HashMap(); + List row = (List)o; + resultRow.put("instanceName", (String)(row.get(0))); + resultRow.put("resourceName", (String)(row.get(1))); + resultRow.put("partitionName", (String)(row.get(2))); + resultRow.put("partitionState", (String)(row.get(3))); + selected.add(resultRow); + } + logger.info("JOSQL query return " + selected.size() + " rows"); + return selected; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java new file mode 100644 index 0000000..b589874 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java @@ -0,0 +1,392 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.helix.messaging; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.helix.ClusterMessagingService; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.ConfigScope; +import org.apache.helix.ConfigScopeBuilder; +import org.apache.helix.Criteria; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.messaging.handling.AsyncCallbackService; +import org.apache.helix.messaging.handling.HelixTaskExecutor; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.log4j.Logger; + + +public class DefaultMessagingService implements ClusterMessagingService +{ + private final HelixManager _manager; + private final CriteriaEvaluator _evaluator; + private final HelixTaskExecutor _taskExecutor; + // TODO:rename to factory, this is not a service + private final AsyncCallbackService _asyncCallbackService; + private static Logger _logger = + Logger.getLogger(DefaultMessagingService.class); + ConcurrentHashMap _messageHandlerFactoriestobeAdded + = new ConcurrentHashMap(); + + public DefaultMessagingService(HelixManager manager) + { + _manager = manager; + _evaluator = new 
CriteriaEvaluator(); + _taskExecutor = new HelixTaskExecutor(); + _asyncCallbackService = new AsyncCallbackService(); + _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(), + _asyncCallbackService); + } + + @Override + public int send(Criteria recipientCriteria, final Message messageTemplate) + { + return send(recipientCriteria, messageTemplate, null, -1); + } + + @Override + public int send(final Criteria recipientCriteria, + final Message message, + AsyncCallback callbackOnReply, + int timeOut) + { + return send(recipientCriteria, message, callbackOnReply, timeOut, 0); + } + + @Override + public int send(final Criteria recipientCriteria, + final Message message, + AsyncCallback callbackOnReply, + int timeOut, + int retryCount) + { + Map> generateMessage = + generateMessage(recipientCriteria, message); + int totalMessageCount = 0; + for (List messages : generateMessage.values()) + { + totalMessageCount += messages.size(); + } + _logger.info("Send " + totalMessageCount + " messages with criteria " + + recipientCriteria); + if (totalMessageCount == 0) + { + return 0; + } + String correlationId = null; + if (callbackOnReply != null) + { + int totalTimeout = timeOut * (retryCount + 1); + if (totalTimeout < 0) + { + totalTimeout = -1; + } + callbackOnReply.setTimeout(totalTimeout); + correlationId = UUID.randomUUID().toString(); + for (List messages : generateMessage.values()) + { + callbackOnReply.setMessagesSent(messages); + } + _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply); + } + + for (InstanceType receiverType : generateMessage.keySet()) + { + List list = generateMessage.get(receiverType); + for (Message tempMessage : list) + { + tempMessage.setRetryCount(retryCount); + tempMessage.setExecutionTimeout(timeOut); + tempMessage.setSrcInstanceType(_manager.getInstanceType()); + if (correlationId != null) + { + tempMessage.setCorrelationId(correlationId); + } + + HelixDataAccessor accessor = 
_manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + if (receiverType == InstanceType.CONTROLLER) + { + // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER, + // tempMessage, + // tempMessage.getId()); + accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()), + tempMessage); + } + + if (receiverType == InstanceType.PARTICIPANT) + { + accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(), + tempMessage.getId()), + tempMessage); + } + } + } + + if (callbackOnReply != null) + { + // start timer if timeout is set + callbackOnReply.startTimer(); + } + return totalMessageCount; + } + + private Map> generateMessage(final Criteria recipientCriteria, + final Message message) + { + Map> messagesToSendMap = + new HashMap>(); + InstanceType instanceType = recipientCriteria.getRecipientInstanceType(); + + if (instanceType == InstanceType.CONTROLLER) + { + List messages = generateMessagesForController(message); + messagesToSendMap.put(InstanceType.CONTROLLER, messages); + // _dataAccessor.setControllerProperty(PropertyType.MESSAGES, + // newMessage.getRecord(), CreateMode.PERSISTENT); + } + else if (instanceType == InstanceType.PARTICIPANT) + { + List messages = new ArrayList(); + List> matchedList = + _evaluator.evaluateCriteria(recipientCriteria, _manager); + + if (!matchedList.isEmpty()) + { + Map sessionIdMap = new HashMap(); + if (recipientCriteria.isSessionSpecific()) + { + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + List liveInstances = + accessor.getChildValues(keyBuilder.liveInstances()); + + for (LiveInstance liveInstance : liveInstances) + { + sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId()); + } + } + for (Map map : matchedList) + { + String id = UUID.randomUUID().toString(); + Message newMessage = new Message(message.getRecord(), id); + String srcInstanceName = _manager.getInstanceName(); + 
String tgtInstanceName = map.get("instanceName"); + // Don't send message to self + if (recipientCriteria.isSelfExcluded() + && srcInstanceName.equalsIgnoreCase(tgtInstanceName)) + { + continue; + } + newMessage.setSrcName(srcInstanceName); + newMessage.setTgtName(tgtInstanceName); + newMessage.setResourceName(map.get("resourceName")); + newMessage.setPartitionName(map.get("partitionName")); + if (recipientCriteria.isSessionSpecific()) + { + newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName)); + } + messages.add(newMessage); + } + messagesToSendMap.put(InstanceType.PARTICIPANT, messages); + } + } + return messagesToSendMap; + } + + private List generateMessagesForController(Message message) + { + List messages = new ArrayList(); + String id = UUID.randomUUID().toString(); + Message newMessage = new Message(message.getRecord(), id); + newMessage.setMsgId(id); + newMessage.setSrcName(_manager.getInstanceName()); + newMessage.setTgtName("Controller"); + messages.add(newMessage); + return messages; + } + + @Override + public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) + { + if (_manager.isConnected()) + { + registerMessageHandlerFactoryInternal(type, factory); + } + else + { + _messageHandlerFactoriestobeAdded.put(type, factory); + } + } + + public synchronized void onConnected() + { + for(String type : _messageHandlerFactoriestobeAdded.keySet()) + { + registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type)); + } + _messageHandlerFactoriestobeAdded.clear(); + } + + void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory) + { + _logger.info("registering msg factory for type " + type); + int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS; + String threadpoolSizeStr = null; + String key = type + "." 
+ HelixTaskExecutor.MAX_THREADS; + + ConfigAccessor configAccessor = _manager.getConfigAccessor(); + if(configAccessor != null) + { + ConfigScope scope = null; + + // Read the participant config and cluster config for the per-message type thread pool size. + // participant config will override the cluster config. + + if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) + { + scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build(); + threadpoolSizeStr = configAccessor.get(scope, key); + } + + if(threadpoolSizeStr == null) + { + scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build(); + threadpoolSizeStr = configAccessor.get(scope, key); + } + } + + if(threadpoolSizeStr != null) + { + try + { + threadpoolSize = Integer.parseInt(threadpoolSizeStr); + if(threadpoolSize <= 0) + { + threadpoolSize = 1; + } + } + catch(Exception e) + { + _logger.error("", e); + } + } + + _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize); + // Self-send a no-op message, so that the onMessage() call will be invoked + // again, and + // we have a chance to process the message that we received with the new + // added MessageHandlerFactory + // before the factory is added. 
+ sendNopMessage(); + } + + public void sendNopMessage() + { + if (_manager.isConnected()) + { + try + { + Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString()); + nopMsg.setSrcName(_manager.getInstanceName()); + + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + if (_manager.getInstanceType() == InstanceType.CONTROLLER + || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) + { + nopMsg.setTgtName("Controller"); + accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg); + } + + if (_manager.getInstanceType() == InstanceType.PARTICIPANT + || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) + { + nopMsg.setTgtName(_manager.getInstanceName()); + accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()), + nopMsg); + } + } + catch (Exception e) + { + _logger.error(e); + } + } + } + + public HelixTaskExecutor getExecutor() + { + return _taskExecutor; + } + + @Override + public int sendAndWait(Criteria receipientCriteria, + Message message, + AsyncCallback asyncCallback, + int timeOut, + int retryCount) + { + int messagesSent = + send(receipientCriteria, message, asyncCallback, timeOut, retryCount); + if (messagesSent > 0) + { + while (!asyncCallback.isDone() && !asyncCallback.isTimedOut()) + { + synchronized (asyncCallback) + { + try + { + asyncCallback.wait(); + } + catch (InterruptedException e) + { + _logger.error(e); + asyncCallback.setInterrupted(true); + break; + } + } + } + } + else + { + _logger.warn("No messages sent. 
For Criteria:" + receipientCriteria); + } + return messagesSent; + } + + @Override + public int sendAndWait(Criteria recipientCriteria, + Message message, + AsyncCallback asyncCallback, + int timeOut) + { + return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java new file mode 100644 index 0000000..6ad31d9 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.helix.messaging.handling; + +import java.util.Collection; +import java.util.Date; +import java.util.Map; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.helix.HelixException; +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.AsyncCallback; +import org.apache.helix.messaging.handling.MessageHandler.ErrorCode; +import org.apache.helix.messaging.handling.MessageHandler.ErrorType; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.helix.participant.HelixStateMachineEngine; +import org.apache.log4j.Logger; + + +public class AsyncCallbackService implements MessageHandlerFactory +{ + private final ConcurrentHashMap _callbackMap = new ConcurrentHashMap(); + private static Logger _logger = Logger.getLogger(AsyncCallbackService.class); + + public AsyncCallbackService() + { + } + + public void registerAsyncCallback(String correlationId, AsyncCallback callback) + { + if (_callbackMap.containsKey(correlationId)) + { + _logger.warn("correlation id " + correlationId + " already registered"); + } + _logger.info("registering correlation id " + correlationId); + _callbackMap.put(correlationId, callback); + } + + void verifyMessage(Message message) + { + if (!message.getMsgType().toString() + .equalsIgnoreCase(MessageType.TASK_REPLY.toString())) + { + String errorMsg = "Unexpected msg type for message " + message.getMsgId() + + " type:" + message.getMsgType() + " Expected : " + + MessageType.TASK_REPLY; + _logger.error(errorMsg); + throw new HelixException(errorMsg); + } + String correlationId = message.getCorrelationId(); + if (correlationId == null) + { + String errorMsg = "Message " + message.getMsgId() + + " does not have correlation id"; + _logger.error(errorMsg); + throw new HelixException(errorMsg); + } + + if (!_callbackMap.containsKey(correlationId)) + { + 
String errorMsg = "Message " + + message.getMsgId() + + " does not have correponding callback. Probably timed out already. Correlation id: " + + correlationId; + _logger.error(errorMsg); + throw new HelixException(errorMsg); + } + _logger.info("Verified reply message " + message.getMsgId() + + " correlation:" + correlationId); + } + + @Override + public MessageHandler createHandler(Message message, + NotificationContext context) + { + verifyMessage(message); + return new AsyncCallbackMessageHandler(message.getCorrelationId(),message, context); + } + + @Override + public String getMessageType() + { + return MessageType.TASK_REPLY.toString(); + } + + @Override + public void reset() + { + + } + + public class AsyncCallbackMessageHandler extends MessageHandler + { + private final String _correlationId; + + public AsyncCallbackMessageHandler(String correlationId, Message message, NotificationContext context) + { + super(message, context); + _correlationId = correlationId; + } + + @Override + public HelixTaskResult handleMessage() throws InterruptedException + { + verifyMessage(_message); + HelixTaskResult result = new HelixTaskResult(); + assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId())); + _logger.info("invoking reply message " + _message.getMsgId() + + ", correlationid:" + _correlationId); + + AsyncCallback callback = _callbackMap.get(_correlationId); + synchronized (callback) + { + callback.onReply(_message); + if (callback.isDone()) + { + _logger.info("Removing finished callback, correlationid:" + + _correlationId); + _callbackMap.remove(_correlationId); + } + } + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) + { + _logger.error("Message handling pipeline get an exception. 
MsgId:" + _message.getMsgId(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java new file mode 100644 index 0000000..a6ee6f0 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java @@ -0,0 +1,116 @@ +package org.apache.helix.messaging.handling; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.helix.PropertyKey; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.Attributes; + + +public class GroupMessageHandler +{ + class CurrentStateUpdate + { + final PropertyKey _key; + final CurrentState _curStateDelta; + + public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta) + { + _key = key; + _curStateDelta = curStateDelta; + } + + public void merge(CurrentState curState) + { + _curStateDelta.getRecord().merge(curState.getRecord()); + } + } + + static class GroupMessageInfo + { + final Message _message; + final AtomicInteger _countDown; + final ConcurrentLinkedQueue _curStateUpdateList; + + public GroupMessageInfo(Message message) + { + _message = message; + List partitionNames = message.getPartitionNames(); + _countDown = new AtomicInteger(partitionNames.size()); + _curStateUpdateList = new ConcurrentLinkedQueue(); + } + + public Map merge() + { + Map curStateUpdateMap = + new HashMap(); + for (CurrentStateUpdate update : _curStateUpdateList) + { + String path = 
update._key.getPath(); + if (!curStateUpdateMap.containsKey(path)) + { + curStateUpdateMap.put(path, update); + } + else + { + curStateUpdateMap.get(path).merge(update._curStateDelta); + } + } + + Map ret = new HashMap(); + for (CurrentStateUpdate update : curStateUpdateMap.values()) + { + ret.put(update._key, update._curStateDelta); + } + + return ret; + } + + } + + final ConcurrentHashMap _groupMsgMap; + + public GroupMessageHandler() + { + _groupMsgMap = new ConcurrentHashMap(); + } + + public void put(Message message) + { + _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message)); + } + + // return non-null if all sub-messages are completed + public GroupMessageInfo onCompleteSubMessage(Message subMessage) + { + String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID); + GroupMessageInfo info = _groupMsgMap.get(parentMid); + if (info != null) + { + int val = info._countDown.decrementAndGet(); + if (val <= 0) + { + return _groupMsgMap.remove(parentMid); + } + } + + return null; + } + + void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta) + { + String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID); + GroupMessageInfo info = _groupMsgMap.get(parentMid); + if (info != null) + { + info._curStateUpdateList.add(new CurrentStateUpdate(key, delta)); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java new file mode 100644 index 0000000..102a984 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -0,0 +1,388 @@ +/** + * Copyright (C) 2012 LinkedIn 
Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.helix.messaging.handling; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecordBucketizer; +import org.apache.helix.ZNRecordDelta; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.ZNRecordDelta.MergeOperation; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelParser; +import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.util.StatusUpdateUtil; +import org.apache.log4j.Logger; + + +public class HelixStateTransitionHandler extends MessageHandler +{ + public static class HelixStateMismatchException extends Exception + { + public HelixStateMismatchException(String info) + { + super(info); + } + } + private static Logger logger = + Logger.getLogger(HelixStateTransitionHandler.class); + private final StateModel _stateModel; + StatusUpdateUtil _statusUpdateUtil; + private final StateModelParser 
_transitionMethodFinder; + private final CurrentState _currentStateDelta; + volatile boolean _isTimeout = false; + private final HelixTaskExecutor _executor; + + public HelixStateTransitionHandler(StateModel stateModel, + Message message, + NotificationContext context, + CurrentState currentStateDelta, + HelixTaskExecutor executor) + { + super(message, context); + _stateModel = stateModel; + _statusUpdateUtil = new StatusUpdateUtil(); + _transitionMethodFinder = new StateModelParser(); + _currentStateDelta = currentStateDelta; + _executor = executor; + } + + private void prepareMessageExecution(HelixManager manager, Message message) throws HelixException, + HelixStateMismatchException + { + if (!message.isValid()) + { + String errorMessage = + "Invalid Message, ensure that message: " + message + + " has all the required fields: " + + Arrays.toString(Message.Attributes.values()); + + _statusUpdateUtil.logError(message, + HelixStateTransitionHandler.class, + errorMessage, + manager.getHelixDataAccessor()); + logger.error(errorMessage); + throw new HelixException(errorMessage); + } + // DataAccessor accessor = manager.getDataAccessor(); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + + String partitionName = message.getPartitionName(); + String fromState = message.getFromState(); + + // Verify the fromState and current state of the stateModel + String state = _currentStateDelta.getState(partitionName); + + if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) + { + String errorMessage = + "Current state of stateModel does not match the fromState in Message" + + ", Current State:" + state + ", message expected:" + fromState + + ", partition: " + partitionName + ", from: " + message.getMsgSrc() + + ", to: " + message.getTgtName(); + + _statusUpdateUtil.logError(message, + HelixStateTransitionHandler.class, + errorMessage, + accessor); + logger.error(errorMessage); + throw new HelixStateMismatchException(errorMessage); + 
} + } + + void postExecutionMessage(HelixManager manager, + Message message, + NotificationContext context, + HelixTaskResult taskResult, + Exception exception) + { + String partitionKey = message.getPartitionName(); + String resource = message.getResourceName(); + String sessionId = message.getTgtSessionId(); + String instanceName = manager.getInstanceName(); + + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + int bucketSize = message.getBucketSize(); + ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize); + + // Lock the helix manager so that the session id will not change when we update + // the state model state. for zk current state it is OK as we have the per-session + // current state node + synchronized (manager) + { + if (!message.getTgtSessionId().equals(manager.getSessionId())) + { + logger.warn("Session id has changed. Skip postExecutionMessage. Old session " + + message.getExecutionSessionId() + " , new session : " + + manager.getSessionId()); + return; + } + + if (taskResult.isSucess()) + { + // String fromState = message.getFromState(); + String toState = message.getToState(); + _currentStateDelta.setState(partitionKey, toState); + + if (toState.equalsIgnoreCase("DROPPED")) + { + // for "OnOfflineToDROPPED" message, we need to remove the resource key record + // from the current state of the instance because the resource key is dropped. + // In the state model it will be stayed as "OFFLINE", which is OK. 
+ ZNRecordDelta delta = + new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT); + // Don't subtract simple fields since they contain stateModelDefRef + delta._record.getSimpleFields().clear(); + + List deltaList = new ArrayList(); + deltaList.add(delta); + _currentStateDelta.setDeltaList(deltaList); + } + else + { + // if the partition is not to be dropped, update _stateModel to the TO_STATE + _stateModel.updateState(toState); + } + } + else + { + if (exception instanceof HelixStateMismatchException) + { + // if fromState mismatch, set current state on zk to stateModel's current state + logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: " + + partitionKey + + ", currentState: " + + _stateModel.getCurrentState() + + ", message: " + message); + _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState()); + } + else + { + StateTransitionError error = + new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception); + if (exception instanceof InterruptedException) + { + if (_isTimeout) + { + error = + new StateTransitionError(ErrorType.INTERNAL, + ErrorCode.TIMEOUT, + exception); + } + else + { + // State transition interrupted but not caused by timeout. Keep the current + // state in this case + logger.error("State transition interrupted but not timeout. Not updating state. 
Partition : " + + message.getPartitionName() + " MsgId : " + message.getMsgId()); + return; + } + } + _stateModel.rollbackOnError(message, context, error); + _currentStateDelta.setState(partitionKey, "ERROR"); + _stateModel.updateState("ERROR"); + } + } + } + try + { + // Update the ZK current state of the node + PropertyKey key = keyBuilder.currentState(instanceName, + sessionId, + resource, + bucketizer.getBucketName(partitionKey)); + if (!_message.getGroupMessageMode()) + { + accessor.updateProperty(key, _currentStateDelta); + } + else + { + _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta); + } + } + catch (Exception e) + { + logger.error("Error when updating the state ", e); + StateTransitionError error = + new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e); + _stateModel.rollbackOnError(message, context, error); + _statusUpdateUtil.logError(message, + HelixStateTransitionHandler.class, + e, + "Error when update the state ", + accessor); + } + } + + public HelixTaskResult handleMessageInternal(Message message, + NotificationContext context) + { + synchronized (_stateModel) + { + HelixTaskResult taskResult = new HelixTaskResult(); + HelixManager manager = context.getManager(); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + + _statusUpdateUtil.logInfo(message, + HelixStateTransitionHandler.class, + "Message handling task begin execute", + accessor); + message.setExecuteStartTimeStamp(new Date().getTime()); + + Exception exception = null; + try + { + prepareMessageExecution(manager, message); + invoke(accessor, context, taskResult, message); + } + catch (HelixStateMismatchException e) + { + // Simply log error and return from here if State mismatch. + // The current state of the state model is intact. 
+ taskResult.setSuccess(false); + taskResult.setMessage(e.toString()); + taskResult.setException(e); + exception = e; + // return taskResult; + } + catch (Exception e) + { + String errorMessage = + "Exception while executing a state transition task " + + message.getPartitionName(); + logger.error(errorMessage, e); + if (e.getCause() != null && e.getCause() instanceof InterruptedException) + { + e = (InterruptedException) e.getCause(); + } + _statusUpdateUtil.logError(message, + HelixStateTransitionHandler.class, + e, + errorMessage, + accessor); + taskResult.setSuccess(false); + taskResult.setMessage(e.toString()); + taskResult.setException(e); + taskResult.setInterrupted(e instanceof InterruptedException); + exception = e; + } + postExecutionMessage(manager, message, context, taskResult, exception); + + return taskResult; + } + } + + private void invoke(HelixDataAccessor accessor, + NotificationContext context, + HelixTaskResult taskResult, + Message message) throws IllegalAccessException, + InvocationTargetException, + InterruptedException + { + _statusUpdateUtil.logInfo(message, + HelixStateTransitionHandler.class, + "Message handling invoking", + accessor); + + // by default, we invoke state transition function in state model + Method methodToInvoke = null; + String fromState = message.getFromState(); + String toState = message.getToState(); + methodToInvoke = + _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(), + fromState, + toState, + new Class[] { Message.class, + NotificationContext.class }); + if (methodToInvoke != null) + { + methodToInvoke.invoke(_stateModel, new Object[] { message, context }); + taskResult.setSuccess(true); + } + else + { + String errorMessage = + "Unable to find method for transition from " + fromState + " to " + toState + + "in " + _stateModel.getClass(); + logger.error(errorMessage); + taskResult.setSuccess(false); + + _statusUpdateUtil.logError(message, + HelixStateTransitionHandler.class, + errorMessage, + 
accessor); + } + } + + @Override + public HelixTaskResult handleMessage() + { + return handleMessageInternal(_message, _notificationContext); + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) + { + // All internal error has been processed already, so we can skip them + if (type == ErrorType.INTERNAL) + { + logger.error("Skip internal error " + e.getMessage() + " " + code); + return; + } + HelixManager manager = _notificationContext.getManager(); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + String instanceName = manager.getInstanceName(); + String partition = _message.getPartitionName(); + String resourceName = _message.getResourceName(); + CurrentState currentStateDelta = new CurrentState(resourceName); + + StateTransitionError error = new StateTransitionError(type, code, e); + _stateModel.rollbackOnError(_message, _notificationContext, error); + // if the transition is not canceled, it should go into error state + if (code == ErrorCode.ERROR) + { + currentStateDelta.setState(partition, "ERROR"); + _stateModel.updateState("ERROR"); + + accessor.updateProperty(keyBuilder.currentState(instanceName, + _message.getTgtSessionId(), + resourceName), + currentStateDelta); + } + } + + @Override + public void onTimeout() + { + _isTimeout = true; + } +}; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java new file mode 100644 index 0000000..d14cfaa --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -0,0 +1,369 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 
2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.helix.messaging.handling; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Callable; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo; +import org.apache.helix.messaging.handling.MessageHandler.ErrorCode; +import org.apache.helix.messaging.handling.MessageHandler.ErrorType; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.helix.monitoring.StateTransitionContext; +import org.apache.helix.monitoring.StateTransitionDataPoint; +import org.apache.helix.util.StatusUpdateUtil; +import org.apache.log4j.Logger; + + +public class HelixTask implements Callable +{ + private static Logger logger = Logger.getLogger(HelixTask.class); + private final Message _message; + private final MessageHandler _handler; + private final NotificationContext _notificationContext; + private final HelixManager _manager; + StatusUpdateUtil _statusUpdateUtil; + HelixTaskExecutor _executor; + volatile boolean _isTimeout = false; + + public class 
TimeoutCancelTask extends TimerTask + { + HelixTaskExecutor _executor; + Message _message; + NotificationContext _context; + + public TimeoutCancelTask(HelixTaskExecutor executor, + Message message, + NotificationContext context) + { + _executor = executor; + _message = message; + _context = context; + } + + @Override + public void run() + { + _isTimeout = true; + logger.warn("Message time out, canceling. id:" + _message.getMsgId() + + " timeout : " + _message.getExecutionTimeout()); + _handler.onTimeout(); + _executor.cancelTask(_message, _context); + } + + } + + public HelixTask(Message message, + NotificationContext notificationContext, + MessageHandler handler, + HelixTaskExecutor executor) throws Exception + { + this._notificationContext = notificationContext; + this._message = message; + this._handler = handler; + this._manager = notificationContext.getManager(); + _statusUpdateUtil = new StatusUpdateUtil(); + _executor = executor; + } + + @Override + public HelixTaskResult call() + { + // Start the timeout TimerTask, if necessary + Timer timer = null; + if (_message.getExecutionTimeout() > 0) + { + timer = new Timer(true); + timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext), + _message.getExecutionTimeout()); + logger.info("Message starts with timeout " + _message.getExecutionTimeout() + + " MsgId:" + _message.getMsgId()); + } + else + { + logger.info("Message does not have timeout. 
MsgId:" + _message.getMsgId() + "/" + + _message.getPartitionName()); + } + + HelixTaskResult taskResult = new HelixTaskResult(); + + Exception exception = null; + ErrorType type = ErrorType.INTERNAL; + ErrorCode code = ErrorCode.ERROR; + + long start = System.currentTimeMillis(); + logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start); + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + _statusUpdateUtil.logInfo(_message, + HelixTask.class, + "Message handling task begin execute", + accessor); + _message.setExecuteStartTimeStamp(new Date().getTime()); + + // Handle the message + try + { + taskResult = _handler.handleMessage(); + exception = taskResult.getException(); + } + catch (InterruptedException e) + { + _statusUpdateUtil.logError(_message, + HelixTask.class, + e, + "State transition interrupted, timeout:" + _isTimeout, + accessor); + logger.info("Message " + _message.getMsgId() + " is interrupted"); + taskResult.setInterrupted(true); + taskResult.setException(e); + exception = e; + } + catch (Exception e) + { + String errorMessage = + "Exception while executing a message. " + e + " msgId: " + _message.getMsgId() + + " type: " + _message.getMsgType(); + logger.error(errorMessage, e); + _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor); + taskResult.setSuccess(false); + taskResult.setException(e); + taskResult.setMessage(e.getMessage()); + exception = e; + } + + // Cancel the timer since the handling is done + // it is fine if the TimerTask for canceling is called already + if (timer != null) + { + timer.cancel(); + } + + if (taskResult.isSucess()) + { + _statusUpdateUtil.logInfo(_message, + _handler.getClass(), + "Message handling task completed successfully", + accessor); + logger.info("Message " + _message.getMsgId() + " completed."); + } + else if (taskResult.isInterrupted()) + { + logger.info("Message " + _message.getMsgId() + " is interrupted"); + code = _isTimeout ? 
ErrorCode.TIMEOUT : ErrorCode.CANCEL; + if (_isTimeout) + { + int retryCount = _message.getRetryCount(); + logger.info("Message timeout, retry count: " + retryCount + " MSGID:" + + _message.getMsgId()); + _statusUpdateUtil.logInfo(_message, + _handler.getClass(), + "Message handling task timeout, retryCount:" + + retryCount, + accessor); + // Notify the handler that timeout happens, and the number of retries left + // In case timeout happens (time out and also interrupted) + // we should retry the execution of the message by re-schedule it in + if (retryCount > 0) + { + _message.setRetryCount(retryCount - 1); + _executor.scheduleTask(_message, _handler, _notificationContext); + return taskResult; + } + } + } + else + // logging for errors + { + String errorMsg = + "Message execution failed. msgId: " + _message.getMsgId() + + taskResult.getMessage(); + if (exception != null) + { + errorMsg += exception; + } + logger.error(errorMsg, exception); + _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor); + } + + // Post-processing for the finished task + try + { + if (!_message.getGroupMessageMode()) + { + removeMessageFromZk(accessor, _message); + reportMessageStat(_manager, _message, taskResult); + sendReply(accessor, _message, taskResult); + } + else + { + GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message); + if (info != null) + { + // TODO: changed to async update + // group update current state + Map curStateMap = info.merge(); + for (PropertyKey key : curStateMap.keySet()) + { + accessor.updateProperty(key, curStateMap.get(key)); + } + + // remove group message + removeMessageFromZk(accessor, _message); + reportMessageStat(_manager, _message, taskResult); + sendReply(accessor, _message, taskResult); + } + } + _executor.reportCompletion(_message); + } + + // TODO: capture errors and log here + catch (Exception e) + { + String errorMessage = + "Exception after executing a message, msgId: " + _message.getMsgId() + 
e; + logger.error(errorMessage, e); + _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor); + exception = e; + type = ErrorType.FRAMEWORK; + code = ErrorCode.ERROR; + } + // + finally + { + long end = System.currentTimeMillis(); + logger.info("msg:" + _message.getMsgId() + " handling task completed, results:" + + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start)); + + // Notify the handler about any error happened in the handling procedure, so that + // the handler have chance to finally cleanup + if (exception != null) + { + _handler.onError(exception, code, type); + } + } + return taskResult; + } + + private void removeMessageFromZk(HelixDataAccessor accessor, Message message) + { + Builder keyBuilder = accessor.keyBuilder(); + if (message.getTgtName().equalsIgnoreCase("controller")) + { + // TODO: removeProperty returns boolean + accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId())); + } + else + { + accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), + message.getMsgId())); + } + } + + private void sendReply(HelixDataAccessor accessor, + Message message, + HelixTaskResult taskResult) + { + if (_message.getCorrelationId() != null + && !message.getMsgType().equals(MessageType.TASK_REPLY.toString())) + { + logger.info("Sending reply for message " + message.getCorrelationId()); + _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor); + + taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess()); + taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted()); + if (!taskResult.isSucess()) + { + taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage()); + } + Message replyMessage = + Message.createReplyMessage(_message, + _manager.getInstanceName(), + taskResult.getTaskResultMap()); + replyMessage.setSrcInstanceType(_manager.getInstanceType()); + + if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) + { 
+ Builder keyBuilder = accessor.keyBuilder(); + accessor.setProperty(keyBuilder.message(message.getMsgSrc(), + replyMessage.getMsgId()), + replyMessage); + } + else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) + { + Builder keyBuilder = accessor.keyBuilder(); + accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), + replyMessage); + } + _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to " + + replyMessage.getTgtName(), accessor); + } + } + + private void reportMessageStat(HelixManager manager, + Message message, + HelixTaskResult taskResult) + { + // report stat + if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) + { + return; + } + long now = new Date().getTime(); + long msgReadTime = message.getReadTimeStamp(); + long msgExecutionStartTime = message.getExecuteStartTimeStamp(); + if (msgReadTime != 0 && msgExecutionStartTime != 0) + { + long totalDelay = now - msgReadTime; + long executionDelay = now - msgExecutionStartTime; + if (totalDelay > 0 && executionDelay > 0) + { + String fromState = message.getFromState(); + String toState = message.getToState(); + String transition = fromState + "--" + toState; + + StateTransitionContext cxt = + new StateTransitionContext(manager.getClusterName(), + manager.getInstanceName(), + message.getResourceName(), + transition); + + StateTransitionDataPoint data = + new StateTransitionDataPoint(totalDelay, + executionDelay, + taskResult.isSucess()); + _executor.getParticipantMonitor().reportTransitionStat(cxt, data); + } + } + else + { + logger.warn("message read time and start execution time not recorded."); + } + } + +}; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java new file mode 100644 index 0000000..daff92b --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -0,0 +1,638 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.helix.messaging.handling; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.ConfigScope; +import org.apache.helix.ConfigScopeBuilder; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.MessageListener; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.NotificationContext.Type; +import 
org.apache.helix.PropertyKey.Builder; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.Attributes; +import org.apache.helix.model.Message.MessageState; +import org.apache.helix.model.Message.MessageType; +import org.apache.helix.monitoring.ParticipantMonitor; +import org.apache.helix.participant.HelixStateMachineEngine; +import org.apache.helix.util.StatusUpdateUtil; +import org.apache.log4j.Logger; + + +public class HelixTaskExecutor implements MessageListener +{ + // TODO: we need to further design how to throttle this. + // From storage point of view, only bootstrap case is expensive + // and we need to throttle, which is mostly IO / network bounded. + public static final int DEFAULT_PARALLEL_TASKS = 40; + // TODO: create per-task type threadpool with customizable pool size + protected final Map> _taskMap; + private final Object _lock; + private final StatusUpdateUtil _statusUpdateUtil; + private final ParticipantMonitor _monitor; + public static final String MAX_THREADS = + "maxThreads"; + + final ConcurrentHashMap _handlerFactoryMap = + new ConcurrentHashMap(); + + final ConcurrentHashMap _threadpoolMap = + new ConcurrentHashMap(); + + private static Logger LOG = + Logger.getLogger(HelixTaskExecutor.class); + + Map _resourceThreadpoolSizeMap = + new ConcurrentHashMap(); + + final GroupMessageHandler _groupMsgHandler; + + public HelixTaskExecutor() + { + _taskMap = new ConcurrentHashMap>(); + _groupMsgHandler = new GroupMessageHandler(); + + _lock = new Object(); + _statusUpdateUtil = new StatusUpdateUtil(); + _monitor = new ParticipantMonitor(); + startMonitorThread(); + } + + public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) + { + registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS); + } + + public void registerMessageHandlerFactory(String type, + MessageHandlerFactory factory, + int threadpoolSize) + { + if 
(!_handlerFactoryMap.containsKey(type)) + { + if (!type.equalsIgnoreCase(factory.getMessageType())) + { + throw new HelixException("Message factory type mismatch. Type: " + type + + " factory : " + factory.getMessageType()); + + } + _handlerFactoryMap.put(type, factory); + _threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize)); + LOG.info("Adding msg factory for type " + type + " threadpool size " + + threadpoolSize); + } + else + { + LOG.error("Ignoring duplicate msg handler factory for type " + type); + } + } + + public ParticipantMonitor getParticipantMonitor() + { + return _monitor; + } + + private void startMonitorThread() + { + // start a thread which monitors the completions of task + } + + void checkResourceConfig(String resourceName, HelixManager manager) + { + if (!_resourceThreadpoolSizeMap.containsKey(resourceName)) + { + int threadpoolSize = -1; + ConfigAccessor configAccessor = manager.getConfigAccessor(); + if (configAccessor != null) + { + ConfigScope scope = + new ConfigScopeBuilder().forCluster(manager.getClusterName()) + .forResource(resourceName) + .build(); + + String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS); + try + { + if (threadpoolSizeStr != null) + { + threadpoolSize = Integer.parseInt(threadpoolSizeStr); + } + } + catch (Exception e) + { + LOG.error("", e); + } + } + if (threadpoolSize > 0) + { + String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName; + _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize)); + LOG.info("Adding per resource threadpool for resource " + resourceName + + " with size " + threadpoolSize); + } + _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize); + } + } + + /** + * Find the executor service for the message. A message can have a per-statemodelfactory + * executor service, or per-message type executor service. 
+ * + **/ + ExecutorService findExecutorServiceForMsg(Message message) + { + ExecutorService executorService = _threadpoolMap.get(message.getMsgType()); + if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) + { + String resourceName = message.getResourceName(); + if (resourceName != null) + { + String key = message.getMsgType() + "." + resourceName; + if (_threadpoolMap.containsKey(key)) + { + LOG.info("Find per-resource thread pool with key " + key); + executorService = _threadpoolMap.get(key); + } + } + } + return executorService; + } + + public void scheduleTask(Message message, + MessageHandler handler, + NotificationContext notificationContext) + { + assert (handler != null); + synchronized (_lock) + { + try + { + String taskId = message.getMsgId() + "/" + message.getPartitionName(); + + if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) + { + checkResourceConfig(message.getResourceName(), notificationContext.getManager()); + } + LOG.info("Scheduling message: " + taskId); + // System.out.println("sched msg: " + message.getPartitionName() + "-" + // + message.getTgtName() + "-" + message.getFromState() + "-" + // + message.getToState()); + + _statusUpdateUtil.logInfo(message, + HelixTaskExecutor.class, + "Message handling task scheduled", + notificationContext.getManager().getHelixDataAccessor()); + + HelixTask task = new HelixTask(message, notificationContext, handler, this); + if (!_taskMap.containsKey(taskId)) + { + LOG.info("Message:" + taskId + " handling task scheduled"); + Future future = + findExecutorServiceForMsg(message).submit(task); + _taskMap.put(taskId, future); + } + else + { + _statusUpdateUtil.logWarning(message, + HelixTaskExecutor.class, + "Message handling task already sheduled for " + + taskId, + notificationContext.getManager() + .getHelixDataAccessor()); + } + } + catch (Exception e) + { + LOG.error("Error while executing task." 
+ message, e); + + _statusUpdateUtil.logError(message, + HelixTaskExecutor.class, + e, + "Error while executing task " + e, + notificationContext.getManager() + .getHelixDataAccessor()); + } + } + } + /** Tries to cancel the task registered for {@code message}. The task id is msgId + "/" + partitionName; under {@code _lock}, the matching Future in {@code _taskMap} is cancelled with cancel(true) and removed from the map only if cancel returns true. A missing Future is logged as a warning. Each outcome is recorded through {@code _statusUpdateUtil}. */ + public void cancelTask(Message message, NotificationContext notificationContext) + { + synchronized (_lock) + { + String taskId = message.getMsgId() + "/" + message.getPartitionName(); + + if (_taskMap.containsKey(taskId)) + { + _statusUpdateUtil.logInfo(message, + HelixTaskExecutor.class, + "Trying to cancel the future for " + taskId, + notificationContext.getManager().getHelixDataAccessor()); + Future future = _taskMap.get(taskId); + + // If the thread is still running it will be interrupted if cancel(true) + // is called. So state transition callbacks should implement logic to + // return + // if it is interrupted. + if (future.cancel(true)) + { + _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled " + + taskId, notificationContext.getManager().getHelixDataAccessor()); + _taskMap.remove(taskId); + } + else + { + _statusUpdateUtil.logInfo(message, + HelixTaskExecutor.class, + "false when trying to cancel the message " + taskId, + notificationContext.getManager() + .getHelixDataAccessor()); + } + } + else + { + _statusUpdateUtil.logWarning(message, + HelixTaskExecutor.class, + "Future not found when trying to cancel " + taskId, + notificationContext.getManager() + .getHelixDataAccessor()); + } + } + } + /** Logs the elapsed time since {@code message.getExecuteStartTimeStamp()} and, under {@code _lock}, removes the message's task id from {@code _taskMap}; logs a warning when the id is not in the map. */ + protected void reportCompletion(Message message) // String msgId) + { + synchronized (_lock) + { + String taskId = message.getMsgId() + "/" + message.getPartitionName(); + LOG.info("message finished: " + taskId + ", took " + + (new Date().getTime() - message.getExecuteStartTimeStamp())); + if (_taskMap.containsKey(taskId)) + { + _taskMap.remove(taskId); + } + else + { + LOG.warn("message " + taskId + "not found in task map"); + } + } + } + /** Collects the property key of every message in {@code readMsgs} for {@code instanceName} and writes all messages back with one {@code accessor.setChildren} call. */ + private void updateMessageState(List readMsgs, + HelixDataAccessor accessor, + String instanceName) + { + 
Builder keyBuilder = accessor.keyBuilder(); + List readMsgKeys = new ArrayList(); + for (Message msg : readMsgs) + { + readMsgKeys.add(msg.getKey(keyBuilder, instanceName)); + } + accessor.setChildren(readMsgKeys, readMsgs); + } + /** Processes the messages delivered for {@code instanceName}. A FINALIZE notification resets every handler factory, cancels and clears all futures in {@code _taskMap}, and returns. Otherwise messages are sorted by {@code Message.CREATE_TIME_COMPARATOR} and handled one by one: NO_OP messages and messages whose target session id is neither this session's id nor "*" are removed; messages not in state NEW are skipped; a message for which no handler was created stays NEW; a message whose handler creation throws is marked UNPROCESSABLE; every other message is marked READ. Afterwards missing current-state metadata is created in one batch and, when at least one message was marked READ, those messages are written back in one batch and each created handler is passed to {@code scheduleTask}. NOTE(review): the FINALIZE branch iterates {@code _taskMap} with its {@code synchronized (_lock)} commented out, unlike the other methods here - confirm the map is safe for that. NOTE(review): {@code tgtSessionId.equals("*")} throws NullPointerException if {@code getTgtSessionId()} returns null - confirm it cannot. */ + @Override + public void onMessage(String instanceName, + List messages, + NotificationContext changeContext) + { + // If FINALIZE notification comes, reset all handler factories + // and terminate all the thread pools + // TODO: see if we should have a separate notification call for resetting + if (changeContext.getType() == Type.FINALIZE) + { + LOG.info("Get FINALIZE notification"); + for (MessageHandlerFactory factory : _handlerFactoryMap.values()) + { + factory.reset(); + } + // Cancel all scheduled future + // synchronized (_lock) + { + for (Future f : _taskMap.values()) + { + f.cancel(true); + } + _taskMap.clear(); + } + return; + } + + HelixManager manager = changeContext.getManager(); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + + if (messages == null || messages.size() == 0) + { + LOG.info("No Messages to process"); + return; + } + + // sort message by creation timestamp, so message created earlier is processed first + Collections.sort(messages, Message.CREATE_TIME_COMPARATOR); + + // message handlers created + List handlers = new ArrayList(); + + // message read + List readMsgs = new ArrayList(); + + String sessionId = manager.getSessionId(); + List curResourceNames = + accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId)); + List createCurStateKeys = new ArrayList(); + List metaCurStates = new ArrayList(); + Set createCurStateNames = new HashSet(); + + changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this); + for (Message message : messages) + { + // nop messages are simply removed. 
It is used to trigger onMessage() in + // situations such as register a new message handler factory + if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) + { + LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: " + + message.getMsgSrc()); + accessor.removeProperty(message.getKey(keyBuilder, instanceName)); + continue; + } + + String tgtSessionId = message.getTgtSessionId(); + + // if sessionId not match, remove it + if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) + { + String warningMessage = + "SessionId does NOT match. expected sessionId: " + sessionId + + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + + message.getMsgId(); + LOG.warn(warningMessage); + accessor.removeProperty(message.getKey(keyBuilder, instanceName)); + _statusUpdateUtil.logWarning(message, + HelixStateMachineEngine.class, + warningMessage, + accessor); + continue; + } + + // don't process message that is of READ or UNPROCESSABLE state + if (MessageState.NEW != message.getMsgState()) + { + // It happens because we don't delete message right after + // read. Instead we keep it until the current state is updated. + // We will read the message again if there is a new message but we + // check for the status and ignore if its already read + LOG.trace("Message already read. 
mid: " + message.getMsgId()); + continue; + } + + // create message handlers, if handlers not found, leave its state as NEW + try + { + List createHandlers = + createMessageHandlers(message, changeContext); + if (createHandlers.isEmpty()) + { + continue; + } + handlers.addAll(createHandlers); + } + catch (Exception e) + { + LOG.error("Failed to create message handler for " + message.getMsgId(), e); + String error = + "Failed to create message handler for " + message.getMsgId() + + ", exception: " + e; + + _statusUpdateUtil.logError(message, + HelixStateMachineEngine.class, + e, + error, + accessor); + + // Mark message state UNPROCESSABLE if we hit an exception in creating + // message handler. The message will stay on zookeeper but will not be processed + message.setMsgState(MessageState.UNPROCESSABLE); + accessor.updateProperty(message.getKey(keyBuilder, instanceName), message); + continue; + } + + // update msgState to read + message.setMsgState(MessageState.READ); + message.setReadTimeStamp(new Date().getTime()); + message.setExecuteSessionId(changeContext.getManager().getSessionId()); + + _statusUpdateUtil.logInfo(message, + HelixStateMachineEngine.class, + "New Message", + accessor); + + readMsgs.add(message); + + // batch creation of all current state meta data + // do it for non-controller and state transition messages only + if (!message.isControlerMsg() + && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) + { + String resourceName = message.getResourceName(); + if (!curResourceNames.contains(resourceName) + && !createCurStateNames.contains(resourceName)) + { + createCurStateNames.add(resourceName); + createCurStateKeys.add(keyBuilder.currentState(instanceName, + sessionId, + resourceName)); + + CurrentState metaCurState = new CurrentState(resourceName); + metaCurState.setBucketSize(message.getBucketSize()); + metaCurState.setStateModelDefRef(message.getStateModelDef()); + metaCurState.setSessionId(sessionId); + 
metaCurState.setGroupMessageMode(message.getGroupMessageMode()); + String ftyName = message.getStateModelFactoryName(); + if (ftyName != null) + { + metaCurState.setStateModelFactoryName(ftyName); + } + else + { + metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); + } + + metaCurStates.add(metaCurState); + } + } + } + + // batch create curState meta + if (createCurStateKeys.size() > 0) + { + try + { + accessor.createChildren(createCurStateKeys, metaCurStates); + } + catch (Exception e) + { + LOG.error(e); + } + } + + // update message state to READ in batch and schedule all read messages + if (readMsgs.size() > 0) + { + updateMessageState(readMsgs, accessor, instanceName); + + for (MessageHandler handler : handlers) + { + scheduleTask(handler._message, handler, changeContext); + } + } + } + /** Looks up the {@code MessageHandlerFactory} stored in {@code _handlerFactoryMap} under the message's type and returns the handler it creates. Returns null, after logging a warning, when no factory is registered for that type. */ + private MessageHandler createMessageHandler(Message message, + NotificationContext changeContext) + { + String msgType = message.getMsgType().toString(); + + MessageHandlerFactory handlerFactory = _handlerFactoryMap.get(msgType); + + // Fail to find a MessageHandlerFactory for the message + // we will keep the message and the message will be handled when + // the corresponding MessageHandlerFactory is registered + if (handlerFactory == null) + { + LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:" + + message.getMsgId()); + return null; + } + + return handlerFactory.createHandler(message, changeContext); + } + /** Builds the handler list for {@code message}. Outside group-message mode the list holds the single handler for the message, if one could be created. In group-message mode the message is put into {@code _groupMsgHandler} and one sub-message per partition name is created from the message's record, tagged with PARENT_MSG_ID, and given its own handler. Null handlers are left out, so the list may be empty. */ + private List createMessageHandlers(Message message, + NotificationContext changeContext) + { + List handlers = new ArrayList(); + if (!message.getGroupMessageMode()) + { + LOG.info("Creating handler for message " + message.getMsgId() + "/" + + message.getPartitionName()); + + MessageHandler handler = createMessageHandler(message, changeContext); + + if (handler != null) + { + handlers.add(handler); + } + } + else + { + _groupMsgHandler.put(message); + + List partitionNames = message.getPartitionNames(); + for 
(String partitionName : partitionNames) + { + Message subMsg = new Message(message.getRecord()); + subMsg.setPartitionName(partitionName); + subMsg.setAttribute(Attributes.PARENT_MSG_ID, message.getId()); + + LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/" + + partitionName); + MessageHandler handler = createMessageHandler(subMsg, changeContext); + if (handler != null) + { + handlers.add(handler); + } + } + } + + return handlers; + } + /** Shuts the executor down: under {@code _lock}, calls shutdownNow() on every pool in {@code _threadpoolMap}, then waits up to 200 ms per pool for termination, warning (log and stdout) about pools that did not finish; finally shuts down {@code _monitor}. NOTE(review): an InterruptedException during the wait is only logged; the thread's interrupt status is not restored. */ + public void shutDown() + { + LOG.info("shutting down TaskExecutor"); + synchronized (_lock) + { + for (String msgType : _threadpoolMap.keySet()) + { + List tasksLeft = _threadpoolMap.get(msgType).shutdownNow(); + LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType " + + msgType); + } + for (String msgType : _threadpoolMap.keySet()) + { + try + { + if (!_threadpoolMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS)) + { + LOG.warn(msgType + " is not fully termimated in 200 MS"); + System.out.println(msgType + " is not fully termimated in 200 MS"); + } + } + catch (InterruptedException e) + { + LOG.error("Interrupted", e); + } + } + } + _monitor.shutDown(); + LOG.info("shutdown finished"); + } + /** Ad-hoc manual driver: submits two tasks to a fixed-size pool and then joins the current thread. NOTE(review): Thread.currentThread().join() waits on the calling thread itself, so the final println looks unreachable - confirm before relying on it. */ + // TODO: remove this + public static void main(String[] args) throws Exception + { + ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS); + Future future; + future = pool.submit(new Callable() + { + + @Override + public HelixTaskResult call() throws Exception + { + System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()"); + return null; + } + + }); + future = pool.submit(new HelixTask(null, null, null, null)); + Thread.currentThread().join(); + System.out.println(future.isDone()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java new file mode 100644 index 0000000..ee6919e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2012 LinkedIn Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.helix.messaging.handling; + +import java.util.HashMap; +import java.util.Map; + /** Mutable holder for the outcome of a task: a success flag, a message, a result map, an interrupted flag and an exception. */ +public class HelixTaskResult +{ + /* State of the result. Note that _exception is declared without an access modifier, unlike the other fields. */ + private boolean _success; + private String _message = ""; + private Map _taskResultMap = new HashMap(); + private boolean _interrupted = false; + Exception _exception = null; + /** Returns the success flag set through setSuccess (false until set). The method name is spelled "isSucess" in this API. */ + public boolean isSucess() + { + return _success; + } + /** Returns the interrupted flag (initially false). */ + public boolean isInterrupted() + { + return _interrupted; + } + /** Sets the interrupted flag. */ + public void setInterrupted(boolean interrupted) + { + _interrupted = interrupted; + } + /** Sets the success flag. */ + public void setSuccess(boolean success) + { + this._success = success; + } + /** Returns the message (initially the empty string). */ + public String getMessage() + { + return _message; + } + /** Replaces the message; the argument is stored as given, with no null check. */ + public void setMessage(String message) + { + this._message = message; + } + /** Returns the internal result map itself, not a copy, so callers' changes are visible in this object. */ + public Map getTaskResultMap() + { + return _taskResultMap; + } + /** Stores the exception associated with this result. */ + public void setException(Exception e) + { + _exception = e; + } + /** Returns the stored exception, or null if none was set. */ + public Exception getException() + { + return _exception; + } +}