Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA34F1091F for ; Tue, 15 Oct 2013 23:52:03 +0000 (UTC) Received: (qmail 79288 invoked by uid 500); 15 Oct 2013 23:52:03 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 79261 invoked by uid 500); 15 Oct 2013 23:52:03 -0000 Mailing-List: contact commits-help@helix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.incubator.apache.org Delivered-To: mailing list commits@helix.incubator.apache.org Received: (qmail 79254 invoked by uid 99); 15 Oct 2013 23:52:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 23:52:03 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 15 Oct 2013 23:51:58 +0000 Received: (qmail 77320 invoked by uid 99); 15 Oct 2013 23:51:38 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 23:51:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 801668B56D3; Tue, 15 Oct 2013 23:51:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.incubator.apache.org Date: Tue, 15 Oct 2013 23:51:39 -0000 Message-Id: <4894af99631248b0888042ef41038e49@git.apache.org> In-Reply-To: <1725185e2566462a8ee001269bc28633@git.apache.org> References: <1725185e2566462a8ee001269bc28633@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/9] [HELIX-209] Backward compatible function naming in the model package X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 75d564f..59bae9f 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -334,7 +334,7 @@ public class ZKHelixAdmin implements HelixAdmin { } // check partition is in ERROR state - SessionId sessionId = liveInstance.getSessionId(); + SessionId sessionId = liveInstance.getTypedSessionId(); CurrentState curState = accessor.getProperty(keyBuilder.currentState(instanceName, sessionId.stringify(), resourceName)); @@ -358,7 +358,7 @@ public class ZKHelixAdmin implements HelixAdmin { List messages = accessor.getChildValues(keyBuilder.messages(instanceName)); for (Message message : messages) { if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType()) - || !sessionId.equals(message.getTgtSessionId()) + || !sessionId.equals(message.getTypedTgtSessionId()) || !resourceName.equals(message.getResourceId().stringify()) || !resetPartitionNames.contains(message.getPartitionId().stringify())) { continue; @@ -391,7 +391,7 @@ public class ZKHelixAdmin implements HelixAdmin { message.setTgtSessionId(sessionId); message.setStateModelDef(stateModelDef); message.setFromState(State.from(HelixDefinedState.ERROR.toString())); - message.setToState(stateModel.getInitialState()); + message.setToState(stateModel.getTypedInitialState()); message.setStateModelFactoryId(idealState.getStateModelFactoryId()); resetMessages.add(message); @@ -993,7 +993,7 @@ public class ZKHelixAdmin implements HelixAdmin { } // StateModelDefinition def = new StateModelDefinition(stateModDef); - List statePriorityList = stateModDef.getStatesPriorityStringList(); + List statePriorityList = stateModDef.getStatesPriorityList(); String masterStateValue = null; String slaveStateValue = null; @@ -1151,7 +1151,7 @@ public class ZKHelixAdmin implements HelixAdmin { @Override public void rebalance(String clusterName, IdealState currentIdealState, List instanceNames) { Set activeInstances = new HashSet(); - for (PartitionId partition : currentIdealState.getPartitionSet()) { + for (PartitionId partition : currentIdealState.getPartitionIdSet()) { activeInstances.addAll(IdealState.stringListFromPreferenceList(currentIdealState .getPreferenceList(partition))); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 7d37b68..afd35e6 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -837,7 +837,7 @@ public class ZKHelixManager implements HelixManager { keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()) .getPath(); _helixAccessor.getBaseDataAccessor().update(curStatePath, - new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialStateString(), lastCurState), + new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), AccessOption.PERSISTENT); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java index f9743a4..9fea0c8 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java @@ -71,7 +71,7 @@ public abstract class AsyncCallback { } public synchronized final void onReply(Message message) { - _logger.info("OnReply msg " + message.getMsgId()); + _logger.info("OnReply msg " + message.getMessageId()); if (!isDone()) { _messageReplied.add(message); try { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java index a207b0c..73b69a8 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java @@ -161,7 +161,7 @@ public class DefaultMessagingService implements ClusterMessagingService { List liveInstances = accessor.getChildValues(keyBuilder.liveInstances()); for (LiveInstance liveInstance : liveInstances) { - sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId() + sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId() .stringify()); } } @@ -194,7 +194,7 @@ public class DefaultMessagingService implements ClusterMessagingService { List messages = new ArrayList(); MessageId id = MessageId.from(UUID.randomUUID().toString()); Message newMessage = new Message(message.getRecord(), id); - newMessage.setMsgId(id); + newMessage.setMessageId(id); newMessage.setSrcName(_manager.getInstanceName()); newMessage.setTgtName("Controller"); messages.add(newMessage); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java index 46c595d..17fc67d 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java @@ -47,14 +47,14 @@ public class AsyncCallbackService implements MessageHandlerFactory { void verifyMessage(Message message) { if (!message.getMsgType().toString().equalsIgnoreCase(MessageType.TASK_REPLY.toString())) { String errorMsg = - "Unexpected msg type for message " + message.getMsgId() + " type:" + message.getMsgType() + "Unexpected msg type for message " + message.getMessageId() + " type:" + message.getMsgType() + " Expected : " + MessageType.TASK_REPLY; _logger.error(errorMsg); throw new HelixException(errorMsg); } String correlationId = message.getCorrelationId(); if (correlationId == null) { - String errorMsg = "Message " + message.getMsgId() + " does not have correlation id"; + String errorMsg = "Message " + message.getMessageId() + " does not have correlation id"; _logger.error(errorMsg); throw new HelixException(errorMsg); } @@ -62,13 +62,13 @@ public class AsyncCallbackService implements MessageHandlerFactory { if (!_callbackMap.containsKey(correlationId)) { String errorMsg = "Message " - + message.getMsgId() + + message.getMessageId() + " does not have correponding callback. Probably timed out already. Correlation id: " + correlationId; _logger.error(errorMsg); throw new HelixException(errorMsg); } - _logger.info("Verified reply message " + message.getMsgId() + " correlation:" + correlationId); + _logger.info("Verified reply message " + message.getMessageId() + " correlation:" + correlationId); } @Override @@ -101,7 +101,7 @@ public class AsyncCallbackService implements MessageHandlerFactory { verifyMessage(_message); HelixTaskResult result = new HelixTaskResult(); assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId())); - _logger.info("invoking reply message " + _message.getMsgId() + ", correlationid:" + _logger.info("invoking reply message " + _message.getMessageId() + ", correlationid:" + _correlationId); AsyncCallback callback = _callbackMap.get(_correlationId); @@ -118,7 +118,7 @@ public class AsyncCallbackService implements MessageHandlerFactory { @Override public void onError(Exception e, ErrorCode code, ErrorType type) { - _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e); + _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMessageId(), e); } } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index c6eaa65..8381f4a 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -96,7 +96,7 @@ public class HelixStateTransitionHandler extends MessageHandler { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PartitionId partitionId = _message.getPartitionId(); - State fromState = _message.getFromState(); + State fromState = _message.getTypedFromState(); // Verify the fromState and current state of the stateModel String state = _currentStateDelta.getState(partitionId.stringify()); @@ -122,7 +122,7 @@ public class HelixStateTransitionHandler extends MessageHandler { PartitionId partitionId = _message.getPartitionId(); ResourceId resource = _message.getResourceId(); - SessionId sessionId = _message.getTgtSessionId(); + SessionId sessionId = _message.getTypedTgtSessionId(); String instanceName = _manager.getInstanceName(); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); @@ -135,15 +135,15 @@ public class HelixStateTransitionHandler extends MessageHandler { // new session // sessionId might change when we update the state model state. // for zk current state it is OK as we have the per-session current state node - if (!_message.getTgtSessionId().stringify().equals(_manager.getSessionId())) { + if (!_message.getTypedTgtSessionId().stringify().equals(_manager.getSessionId())) { logger.warn("Session id has changed. Skip postExecutionMessage. Old session " - + _message.getExecutionSessionId() + " , new session : " + _manager.getSessionId()); + + _message.getTypedExecutionSessionId() + " , new session : " + _manager.getSessionId()); return; } if (taskResult.isSuccess()) { // String fromState = message.getFromState(); - State toState = _message.getToState(); + State toState = _message.getTypedToState(); _currentStateDelta.setState(partitionId, toState); if (toState.toString().equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) { @@ -181,7 +181,7 @@ public class HelixStateTransitionHandler extends MessageHandler { // state in this case logger .error("State transition interrupted but not timeout. Not updating state. Partition : " - + _message.getPartitionId() + " MsgId : " + _message.getMsgId()); + + _message.getPartitionId() + " MsgId : " + _message.getMessageId()); return; } } @@ -190,7 +190,7 @@ public class HelixStateTransitionHandler extends MessageHandler { _stateModel.updateState(HelixDefinedState.ERROR.toString()); // if we have errors transit from ERROR state, disable the partition - if (_message.getFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) { + if (_message.getTypedFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) { disablePartition(); } } @@ -229,7 +229,7 @@ public class HelixStateTransitionHandler extends MessageHandler { HelixAdmin admin = _manager.getClusterManagmentTool(); admin.enablePartition(false, clusterName, instanceName, resourceId.stringify(), Arrays.asList(partitionId.stringify())); - logger.info("error in transit from ERROR to " + _message.getToState() + " for partition: " + logger.info("error in transit from ERROR to " + _message.getTypedToState() + " for partition: " + partitionId + ". disable it on " + instanceName); } @@ -288,8 +288,8 @@ public class HelixStateTransitionHandler extends MessageHandler { // by default, we invoke state transition function in state model Method methodToInvoke = null; - State fromState = message.getFromState(); - State toState = message.getToState(); + State fromState = message.getTypedFromState(); + State toState = message.getTypedToState(); methodToInvoke = _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(), fromState.toString(), toState.toString(), new Class[] { @@ -335,10 +335,10 @@ public class HelixStateTransitionHandler extends MessageHandler { _stateModel.updateState(HelixDefinedState.ERROR.toString()); // if transit from ERROR state, disable the partition - if (_message.getFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) { + if (_message.getTypedFromState().toString().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) { disablePartition(); } - accessor.updateProperty(keyBuilder.currentState(instanceName, _message.getTgtSessionId() + accessor.updateProperty(keyBuilder.currentState(instanceName, _message.getTypedTgtSessionId() .stringify(), resourceId.stringify()), currentStateDelta); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 39da1aa..3bcc260 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -90,14 +90,14 @@ public class HelixTask implements MessageTask { _statusUpdateUtil.logError(_message, HelixTask.class, e, "State transition interrupted, timeout:" + _isTimeout, accessor); - logger.info("Message " + _message.getMsgId() + " is interrupted"); + logger.info("Message " + _message.getMessageId() + " is interrupted"); } catch (Exception e) { taskResult = new HelixTaskResult(); taskResult.setException(e); taskResult.setMessage(e.getMessage()); String errorMessage = - "Exception while executing a message. " + e + " msgId: " + _message.getMsgId() + "Exception while executing a message. " + e + " msgId: " + _message.getMessageId() + " type: " + _message.getMsgType(); logger.error(errorMessage, e); _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor); @@ -111,17 +111,17 @@ public class HelixTask implements MessageTask { if (taskResult.isSuccess()) { _statusUpdateUtil.logInfo(_message, _handler.getClass(), "Message handling task completed successfully", accessor); - logger.info("Message " + _message.getMsgId() + " completed."); + logger.info("Message " + _message.getMessageId() + " completed."); } else { type = ErrorType.INTERNAL; if (taskResult.isInterrupted()) { - logger.info("Message " + _message.getMsgId() + " is interrupted"); + logger.info("Message " + _message.getMessageId() + " is interrupted"); code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL; if (_isTimeout) { int retryCount = _message.getRetryCount(); logger.info("Message timeout, retry count: " + retryCount + " msgId:" - + _message.getMsgId()); + + _message.getMessageId()); _statusUpdateUtil.logInfo(_message, _handler.getClass(), "Message handling task timeout, retryCount:" + retryCount, accessor); // Notify the handler that timeout happens, and the number of retries left @@ -158,12 +158,12 @@ public class HelixTask implements MessageTask { code = ErrorCode.ERROR; String errorMessage = - "Exception after executing a message, msgId: " + _message.getMsgId() + e; + "Exception after executing a message, msgId: " + _message.getMessageId() + e; logger.error(errorMessage, e); _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor); } finally { long end = System.currentTimeMillis(); - logger.info("msg: " + _message.getMsgId() + " handling task completed, results:" + logger.info("msg: " + _message.getMessageId() + " handling task completed, results:" + taskResult.isSuccess() + ", at: " + end + ", took:" + (end - start)); // Notify the handler about any error happened in the handling procedure, so that @@ -182,9 +182,9 @@ public class HelixTask implements MessageTask { Builder keyBuilder = accessor.keyBuilder(); if (message.getTgtName().equalsIgnoreCase("controller")) { // TODO: removeProperty returns boolean - accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId().stringify())); + accessor.removeProperty(keyBuilder.controllerMessage(message.getMessageId().stringify())); } else { - accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), message.getMsgId() + accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), message.getMessageId() .stringify())); } } @@ -208,11 +208,11 @@ public class HelixTask implements MessageTask { if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) { Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty( - keyBuilder.message(message.getMsgSrc(), replyMessage.getMsgId().stringify()), + keyBuilder.message(message.getMsgSrc(), replyMessage.getMessageId().stringify()), replyMessage); } else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) { Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId().stringify()), + accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMessageId().stringify()), replyMessage); } _statusUpdateUtil.logInfo(message, HelixTask.class, @@ -232,8 +232,8 @@ public class HelixTask implements MessageTask { long totalDelay = now - msgReadTime; long executionDelay = now - msgExecutionStartTime; if (totalDelay > 0 && executionDelay > 0) { - State fromState = message.getFromState(); - State toState = message.getToState(); + State fromState = message.getTypedFromState(); + State toState = message.getTypedToState(); String transition = fromState + "--" + toState; StateTransitionContext cxt = http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 8cf1aa7..8da53ea 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -432,14 +432,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } - SessionId tgtSessionId = message.getTgtSessionId(); + SessionId tgtSessionId = message.getTypedTgtSessionId(); // sessionId mismatch normally means message comes from expired session, just remove it if (!sessionId.equals(tgtSessionId.toString()) && !tgtSessionId.toString().equals("*")) { String warningMessage = "SessionId does NOT match. expected sessionId: " + sessionId + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " - + message.getMsgId(); + + message.getMessageId(); LOG.warn(warningMessage); accessor.removeProperty(message.getKey(keyBuilder, instanceName)); _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, @@ -454,7 +454,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // We will read the message again if there is a new message but we // check for the status and ignore if its already read if (LOG.isTraceEnabled()) { - LOG.trace("Message already read. msgId: " + message.getMsgId()); + LOG.trace("Message already read. msgId: " + message.getMessageId()); } continue; } @@ -467,9 +467,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } handlers.add(createHandler); } catch (Exception e) { - LOG.error("Failed to create message handler for " + message.getMsgId(), e); + LOG.error("Failed to create message handler for " + message.getMessageId(), e); String error = - "Failed to create message handler for " + message.getMsgId() + ", exception: " + e; + "Failed to create message handler for " + message.getMessageId() + ", exception: " + e; _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, accessor); @@ -547,7 +547,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // the corresponding MessageHandlerFactory is registered if (handlerFactory == null) { LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " - + message.getMsgId()); + + message.getMessageId()); return null; } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java index e1b4f0f..17fc041 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java @@ -40,7 +40,7 @@ public class MessageTimeoutTask extends TimerTask { Message message = _task.getMessage(); // NotificationContext context = _task.getNotificationContext(); // System.out.println("msg: " + message.getMsgId() + " timeouot."); - LOG.warn("Message time out, canceling. id:" + message.getMsgId() + " timeout : " + LOG.warn("Message time out, canceling. id:" + message.getMessageId() + " timeout : " + message.getExecutionTimeout()); _task.onTimeout(); _executor.cancelTask(_task); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java index d90ec1a..d5ee44c 100644 --- a/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java +++ b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; +import org.apache.helix.api.id.SessionId; import org.apache.helix.model.Alerts.AlertsProperty; /** @@ -67,6 +68,16 @@ public class AlertStatus extends HelixProperty { } /** + * Set the session that the alerts correspond to + * @param sessionId the session for which alerts occurred + */ + public void setSessionId(SessionId sessionId) { + if (sessionId != null) { + setSessionId(sessionId.stringify()); + } + } + + /** * Get the session that these alerts correspond to * @return session identifier */ @@ -75,6 +86,14 @@ public class AlertStatus extends HelixProperty { } /** + * Get the session that the alerts correspond to + * @return session identifier + */ + public SessionId getTypedSessionId() { + return SessionId.from(getSessionId()); + } + + /** * Get the instance that these alerts correspond to * @return name of the instance */ http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/Alerts.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Alerts.java b/helix-core/src/main/java/org/apache/helix/model/Alerts.java index 506e3d5..f30f39c 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Alerts.java +++ b/helix-core/src/main/java/org/apache/helix/model/Alerts.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; +import org.apache.helix.api.id.SessionId; /** * Describe alerts and corresponding metrics. An alert is triggered when cluster health @@ -78,6 +79,16 @@ public class Alerts extends HelixProperty { } /** + * Set the session that the alerts correspond to + * @param sessionId the session for which alerts occurred + */ + public void setSessionId(SessionId sessionId) { + if (sessionId != null) { + setSessionId(sessionId.stringify()); + } + } + + /** * Get the session that the alerts correspond to * @return session identifier */ @@ -86,6 +97,14 @@ public class Alerts extends HelixProperty { } /** + * Get the session that the alerts correspond to + * @return session identifier + */ + public SessionId getTypedSessionId() { + return SessionId.from(getSessionId()); + } + + /** * Get the instance that the alerts correspond to * @return the name of the instance */ http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java index 8e37b18..daefe6e 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java @@ -126,11 +126,20 @@ public class ClusterConstraints extends HelixProperty { } /** + * add the constraint, overwrite existing one if constraint with same constraint-id already exists + * @param constraintId unique constraint identifier + * @param item the constraint as a {@link ConstraintItem} + */ + public void addConstraintItem(String constraintId, ConstraintItem item) { + addConstraintItem(ConstraintId.from(constraintId), item); + } + + /** * Add multiple constraint items. * @param items (constraint identifier, {@link ConstrantItem}) pairs */ - public void addConstraintItems(Map items) { - for (ConstraintId constraintId : items.keySet()) { + public void addConstraintItems(Map items) { + for (String constraintId : items.keySet()) { addConstraintItem(constraintId, items.get(constraintId)); } } @@ -145,6 +154,14 @@ public class ClusterConstraints extends HelixProperty { } /** + * remove a constraint-item + * @param constraintId unique constraint identifier + */ + public void removeConstraintItem(String constraintId) { + removeConstraintItem(ConstraintId.from(constraintId)); + } + + /** * get a constraint-item * @param constraintId unique constraint identifier * @return {@link ConstraintItem} or null if not present @@ -154,6 +171,15 @@ public class ClusterConstraints extends HelixProperty { } /** + * get a constraint-item + * @param constraintId unique constraint identifier + * @return {@link ConstraintItem} or null if not present + */ + public ConstraintItem getConstraintItem(String constraintId) { + return getConstraintItem(ConstraintId.from(constraintId)); + } + + /** * return a set of constraints that match the attribute pairs * @param attributes (constraint scope, constraint string) pairs * @return a set of {@link ConstraintItem}s with matching attributes @@ -186,9 +212,9 @@ public class ClusterConstraints extends HelixProperty { String msgType = msg.getMsgType(); attributes.put(ConstraintAttribute.MESSAGE_TYPE, msgType); if (MessageType.STATE_TRANSITION.toString().equals(msgType)) { - if (msg.getFromState() != null && msg.getToState() != null) { + if (msg.getTypedFromState() != null && msg.getTypedToState() != null) { attributes.put(ConstraintAttribute.TRANSITION, - Transition.from(msg.getFromState(), msg.getToState()).toString()); + Transition.from(msg.getTypedFromState(), msg.getTypedToState()).toString()); } if (msg.getResourceId() != null) { attributes.put(ConstraintAttribute.RESOURCE, msg.getResourceId().stringify()); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java index 2fe37ce..5c9bcbc 100644 --- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java +++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java @@ -95,7 +95,7 @@ public class CurrentState extends HelixProperty { * Get the partitions on this instance and the state that each partition is currently in. * @return (partition, state) pairs */ - public Map getPartitionStateStringMap() { + public Map getPartitionStateMap() { Map map = new HashMap(); Map> mapFields = _record.getMapFields(); for (String partitionName : mapFields.keySet()) { @@ -111,7 +111,7 @@ public class CurrentState extends HelixProperty { * Get the partitions on this instance and the state that each partition is currently in * @return (partition id, state) pairs */ - public Map getPartitionStateMap() { + public Map getTypedPartitionStateMap() { Map map = new HashMap(); for (String partitionName : _record.getMapFields().keySet()) { Map stateMap = _record.getMapField(partitionName); @@ -127,8 +127,16 @@ public class CurrentState extends HelixProperty { * Get the session that this current state corresponds to * @return session identifier */ - public SessionId getSessionId() { - return SessionId.from(_record.getSimpleField(CurrentStateProperty.SESSION_ID.toString())); + public SessionId getTypedSessionId() { + return SessionId.from(getSessionId()); + } + + /** + * Get the session that this current state corresponds to + * @return session identifier + */ + public String getSessionId() { + return _record.getSimpleField(CurrentStateProperty.SESSION_ID.toString()); } /** @@ -136,7 +144,15 @@ public class CurrentState extends HelixProperty { * @param sessionId session identifier */ public void setSessionId(SessionId sessionId) { - _record.setSimpleField(CurrentStateProperty.SESSION_ID.toString(), sessionId.stringify()); + setSessionId(sessionId.stringify()); + } + + /** + * Set the session that this current state corresponds to + * @param sessionId session identifier + */ + public void setSessionId(String sessionId) { + _record.setSimpleField(CurrentStateProperty.SESSION_ID.toString(), sessionId); } /** @@ -197,7 +213,7 @@ public class CurrentState extends HelixProperty { /** * Set the state that a partition is currently in on this instance - * @param partitionName the name of the partition + * @param partitionId the id of the partition * @param state the state of the partition */ public void setState(PartitionId partitionId, State state) { @@ -210,6 +226,19 @@ public class CurrentState extends HelixProperty { } /** + * Set the state that a partition is currently in on this instance + * @param partitionName the name of the partition + * @param state the state of the partition + */ + public void setState(String partitionName, String state) { + Map> mapFields = _record.getMapFields(); + if (mapFields.get(partitionName) == null) { + mapFields.put(partitionName, new TreeMap()); + } + mapFields.get(partitionName).put(CurrentStateProperty.CURRENT_STATE.toString(), state); + } + + /** * Set the state model factory * @param factoryName the name of the factory */ http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java index 15a22ca..0f7b6db 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java +++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java @@ -114,7 +114,7 @@ public class ExternalView extends HelixProperty { * Get all the partitions of the resource * @return a set of partition names */ - public Set getPartitionStringSet() { + public Set getPartitionSet() { return _record.getMapFields().keySet(); } @@ -122,9 +122,9 @@ public class ExternalView extends HelixProperty { * Get all the partitions of the resource * @return a set of partition ids */ - public Set getPartitionSet() { + public Set getPartitionIdSet() { Set partitionSet = Sets.newHashSet(); - for (String partitionName : getPartitionStringSet()) { + for (String partitionName : getPartitionSet()) { partitionSet.add(PartitionId.from(partitionName)); } return partitionSet; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 8f579ec..7d84258 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -126,7 +126,6 @@ public class IdealState extends HelixProperty { * Get the associated resource * @return the name of the resource */ - @Deprecated public String getResourceName() { return _record.getId(); } @@ -173,7 +172,6 @@ public class IdealState extends HelixProperty { * Define a custom rebalancer that implements {@link Rebalancer} * @param rebalancerClassName the name of the custom rebalancing class */ - @Deprecated public void setRebalancerClassName(String rebalancerClassName) { _record .setSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString(), rebalancerClassName); @@ -183,7 +181,6 @@ public class IdealState extends HelixProperty { * Get the name of the user-defined rebalancer associated with this resource * @return the rebalancer class name, or null if none is being used */ - @Deprecated public String getRebalancerClassName() { return _record.getSimpleField(IdealStateProperty.REBALANCER_CLASS_NAME.toString()); } @@ -277,8 +274,7 @@ public class IdealState extends HelixProperty { * Get all of the partitions * @return a set of partition names */ - @Deprecated - public Set getPartitionStringSet() { + public Set getPartitionSet() { if (getRebalanceMode() == RebalanceMode.SEMI_AUTO || getRebalanceMode() == RebalanceMode.FULL_AUTO) { return _record.getListFields().keySet(); @@ -295,9 +291,9 @@ public class IdealState extends HelixProperty { * Get all of the partitions * @return a set of partitions */ - public Set getPartitionSet() { + public Set getPartitionIdSet() { Set partitionSet = Sets.newHashSet(); - for (String partitionName : getPartitionStringSet()) { + for (String partitionName : getPartitionSet()) { partitionSet.add(PartitionId.from(partitionName)); } return partitionSet; @@ -308,7 +304,6 @@ public class IdealState extends HelixProperty { * @param partitionName the name of the partition * @return the instances where the replicas live and the state of each */ - @Deprecated public Map getInstanceStateMap(String partitionName) { return _record.getMapField(partitionName); } @@ -350,7 +345,6 @@ public class IdealState extends HelixProperty { * @param partitionName the partition to look up * @return set of instance names */ - @Deprecated public Set getInstanceSet(String partitionName) { if (getRebalanceMode() == RebalanceMode.SEMI_AUTO || getRebalanceMode() == RebalanceMode.FULL_AUTO) { @@ -407,7 +401,6 @@ public class IdealState extends HelixProperty { * @param partitionName the name of the partition * @return a list of instances that can serve replicas of the partition */ - @Deprecated public List getPreferenceList(String partitionName) { List instanceStateList = _record.getListField(partitionName); @@ -439,7 +432,6 @@ public class IdealState extends HelixProperty { * Get the state model associated with this resource * @return an identifier of the state model */ - @Deprecated public String getStateModelDefRef() { return _record.getSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString()); } @@ -456,7 +448,6 @@ public class IdealState extends HelixProperty { * Set the state model associated with this resource * @param stateModel state model identifier */ - @Deprecated public void setStateModelDefRef(String stateModel) { _record.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), stateModel); } @@ -546,7 +537,6 @@ public class IdealState extends HelixProperty { * Set the state model factory associated with this resource * @param name state model factory name */ - @Deprecated public void setStateModelFactoryName(String name) { _record.setSimpleField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString(), name); } @@ -565,7 +555,6 @@ public class IdealState extends HelixProperty { * Get the state model factory associated with this resource * @return state model factory name */ - @Deprecated public String getStateModelFactoryName() { return _record.getStringField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString(), HelixConstants.DEFAULT_STATE_MODEL_FACTORY); @@ -609,7 +598,7 @@ public class IdealState extends HelixProperty { if (!replicaStr.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) { int replica = Integer.parseInt(replicaStr); - Set partitionSet = getPartitionStringSet(); + Set partitionSet = getPartitionSet(); for (String partition : partitionSet) { List preferenceList = getPreferenceList(partition); if (preferenceList == null || preferenceList.size() != replica) { @@ -655,7 +644,7 @@ public class IdealState extends HelixProperty { _record.getListFields().clear(); // assign a partition at a time - for (PartitionId partition : assignment.getMappedPartitions()) { + for (PartitionId partition : assignment.getMappedPartitionIds()) { List preferenceList = new ArrayList(); Map participantStateMap = new HashMap(); @@ -665,7 +654,7 @@ public class IdealState extends HelixProperty { Multimaps.invertFrom(Multimaps.forMap(replicaMap), inverseMap); // update the ideal state in order of state priorities - for (State state : stateModelDef.getStatesPriorityList()) { + for (State state : stateModelDef.getTypedStatesPriorityList()) { if (!state.equals(State.from(HelixDefinedState.DROPPED)) && !state.equals(State.from(HelixDefinedState.ERROR))) { List stateParticipants = inverseMap.get(state); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index d2e1187..8f776a0 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -228,7 +228,7 @@ public class InstanceConfig extends HelixProperty { * @param partitionId the partition to set * @param enabled true to enable, false to disable */ - public void setInstanceEnabledForPartition(PartitionId partitionId, boolean enabled) { + public void setParticipantEnabledForPartition(PartitionId partitionId, boolean enabled) { setInstanceEnabledForPartition(partitionId.stringify(), enabled); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java index fda144a..e9348ec 100644 --- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java +++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java @@ -79,8 +79,16 @@ public class LiveInstance extends HelixProperty { * Get the session that this participant corresponds to * @return session identifier */ - public SessionId getSessionId() { - return SessionId.from(_record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString())); + public SessionId getTypedSessionId() { + return SessionId.from(getSessionId()); + } + + /** + * Get the session that this participant corresponds to + * @return session identifier + */ + public String getSessionId() { + return _record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString()); } /** @@ -103,8 +111,16 @@ public class LiveInstance extends HelixProperty { * Get the version of Helix that this participant is running * @return the version */ - public HelixVersion getHelixVersion() { - return HelixVersion.from(_record.getSimpleField(LiveInstanceProperty.HELIX_VERSION.toString())); + public HelixVersion getTypedHelixVersion() { + return HelixVersion.from(getHelixVersion()); + } + + /** + * Get the version of Helix that this participant is running + * @return the version + */ + public String getHelixVersion() { + return _record.getSimpleField(LiveInstanceProperty.HELIX_VERSION.toString()); } /** @@ -165,11 +181,11 @@ public class LiveInstance extends HelixProperty { @Override public boolean isValid() { - if (getSessionId() == null) { + if (getTypedSessionId() == null) { _logger.error("liveInstance does not have session id. id:" + _record.getId()); return false; } - if (getHelixVersion() == null) { + if (getTypedHelixVersion() == null) { _logger.error("liveInstance does not have CLM verion. id:" + _record.getId()); return false; } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/Message.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index 86319e3..2bd313a 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -129,25 +129,52 @@ public class Message extends HelixProperty { /** * Instantiate a message + * @param type the message category + * @param msgId unique message identifier + */ + public Message(MessageType type, String msgId) { + this(type, MessageId.from(msgId)); + } + + /** + * Instantiate a message * @param type {@link MessageType} as a string or a custom message type * @param msgId unique message identifier */ public Message(String type, MessageId msgId) { super(new ZNRecord(msgId.stringify())); _record.setSimpleField(Attributes.MSG_TYPE.toString(), type); - setMsgId(msgId); + setMessageId(msgId); setMsgState(MessageState.NEW); _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), new Date().getTime()); } /** + * Instantiate a message + * @param type {@link MessageType} as a string or a custom message type + * @param msgId unique message identifier + */ + public Message(String type, String msgId) { + this(type, MessageId.from(msgId)); + } + + /** * Instantiate a message with a new id * @param record a ZNRecord corresponding to a message * @param id unique message identifier */ public Message(ZNRecord record, MessageId id) { super(new ZNRecord(record, id.stringify())); - setMsgId(id); + setMessageId(id); + } + + /** + * Instantiate a message with a new id + * @param record a ZNRecord corresponding to a message + * @param id unique message identifier + */ + public Message(ZNRecord record, String id) { + this(record, MessageId.from(id)); } /** @@ -224,8 +251,16 @@ public class Message extends HelixProperty { * Get the session identifier of the destination node * @return session identifier */ - public SessionId getTgtSessionId() { - return SessionId.from(_record.getSimpleField(Attributes.TGT_SESSION_ID.toString())); + public SessionId getTypedTgtSessionId() { + return SessionId.from(getTgtSessionId()); + } + + /** + * Get the session identifier of the destination node + * @return session identifier + */ + public String getTgtSessionId() { + return _record.getSimpleField(Attributes.TGT_SESSION_ID.toString()); } /** @@ -233,15 +268,33 @@ public class Message extends HelixProperty { * @param tgtSessionId session identifier */ public void setTgtSessionId(SessionId tgtSessionId) { - _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId.stringify()); + if (tgtSessionId != null) { + setTgtSessionId(tgtSessionId.stringify()); + } + } + + /** + * Set the session identifier of the destination node + * @param tgtSessionId session identifier + */ + public void setTgtSessionId(String tgtSessionId) { + _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId); } /** * Get the session identifier of the source node * @return session identifier */ - public SessionId getSrcSessionId() { - return SessionId.from(_record.getSimpleField(Attributes.SRC_SESSION_ID.toString())); + public SessionId getTypedSrcSessionId() { + return SessionId.from(getSrcSessionId()); + } + + /** + * Get the session identifier of the source node + * @return session identifier + */ + public String getSrcSessionId() { + return _record.getSimpleField(Attributes.SRC_SESSION_ID.toString()); } /** @@ -249,15 +302,33 @@ public class Message extends HelixProperty { * @param srcSessionId session identifier */ public void setSrcSessionId(SessionId srcSessionId) { - _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId.stringify()); + if (srcSessionId != null) { + setSrcSessionId(srcSessionId.stringify()); + } + } + + /** + * Set the session identifier of the source node + * @param srcSessionId session identifier + */ + public void setSrcSessionId(String srcSessionId) { + _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId); + } + + /** + * Get the session identifier of the node that executes the message + * @return session identifier + */ + public SessionId getTypedExecutionSessionId() { + return SessionId.from(getExecutionSessionId()); } /** * Get the session identifier of the node that executes the message * @return session identifier */ - public SessionId getExecutionSessionId() { - return SessionId.from(_record.getSimpleField(Attributes.EXE_SESSION_ID.toString())); + public String getExecutionSessionId() { + return _record.getSimpleField(Attributes.EXE_SESSION_ID.toString()); } /** @@ -265,7 +336,17 @@ public class Message extends HelixProperty { * @param exeSessionId session identifier */ public void setExecuteSessionId(SessionId exeSessionId) { - _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId.stringify()); + if (exeSessionId != null) { + setExecuteSessionId(exeSessionId.stringify()); + } + } + + /** + * Set the session identifier of the node that executes the message + * @param exeSessionId session identifier + */ + public void setExecuteSessionId(String exeSessionId) { + _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId); } /** @@ -333,23 +414,51 @@ public class Message extends HelixProperty { * @param partitionId */ public void setPartitionId(PartitionId partitionId) { - _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionId.stringify()); + if (partitionId != null) { + setPartitionName(partitionId.stringify()); + } + } + + /** + * Set the id of the partition this message concerns + * @param partitionId + */ + public void setPartitionName(String partitionName) { + _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionName); } /** * Get the unique identifier of this message * @return message identifier */ - public MessageId getMsgId() { - return MessageId.from(_record.getSimpleField(Attributes.MSG_ID.toString())); + public MessageId getMessageId() { + return MessageId.from(getMsgId()); + } + + /** + * Get the unique identifier of this message + * @return message identifier + */ + public String getMsgId() { + return _record.getSimpleField(Attributes.MSG_ID.toString()); } /** * Set the unique identifier of this message * @param msgId message identifier */ - public void setMsgId(MessageId msgId) { - _record.setSimpleField(Attributes.MSG_ID.toString(), msgId.toString()); + public void setMessageId(MessageId msgId) { + if (msgId != null) { + setMsgId(msgId.stringify()); + } + } + + /** + * Set the unique identifier of this message + * @param msgId message identifier + */ + public void setMsgId(String msgId) { + _record.setSimpleField(Attributes.MSG_ID.toString(), msgId); } /** @@ -357,6 +466,16 @@ public class Message extends HelixProperty { * @param state the state */ public void setFromState(State state) { + if (state != null) { + setFromState(state.toString()); + } + } + + /** + * Set the "from state" for transition-related messages + * @param state the state + */ + public void setFromState(String state) { _record.setSimpleField(Attributes.FROM_STATE.toString(), state.toString()); } @@ -364,8 +483,16 @@ public class Message extends HelixProperty { * Get the "from-state" for transition-related messages * @return state, or null for other message types */ - public State getFromState() { - return State.from(_record.getSimpleField(Attributes.FROM_STATE.toString())); + public State getTypedFromState() { + return State.from(getFromState()); + } + + /** + * Get the "from-state" for transition-related messages + * @return state, or null for other message types + */ + public String getFromState() { + return _record.getSimpleField(Attributes.FROM_STATE.toString()); } /** @@ -373,6 +500,16 @@ public class Message extends HelixProperty { * @param state the state */ public void setToState(State state) { + if (state != null) { + setToState(state.toString()); + } + } + + /** + * Set the "to state" for transition-related messages + * @param state the state + */ + public void setToState(String state) { _record.setSimpleField(Attributes.TO_STATE.toString(), state.toString()); } @@ -380,8 +517,16 @@ public class Message extends HelixProperty { * Get the "to state" for transition-related messages * @return state, or null for other message types */ - public State getToState() { - return State.from(_record.getSimpleField(Attributes.TO_STATE.toString())); + public State getTypedToState() { + return State.from(getToState()); + } + + /** + * Get the "to state" for transition-related messages + * @return state, or null for other message types + */ + public String getToState() { + return _record.getSimpleField(Attributes.TO_STATE.toString()); } /** @@ -413,7 +558,17 @@ public class Message extends HelixProperty { * @param resourceId resource name to set */ public void setResourceId(ResourceId resourceId) { - _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceId.stringify()); + if (resourceId != null) { + setResourceName(resourceId.stringify()); + } + } + + /** + * Set the resource associated with this message + * @param resourceName resource name to set + */ + public void setResourceName(String resourceName) { + _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName); } /** @@ -421,7 +576,15 @@ public class Message extends HelixProperty { * @return resource name */ public ResourceId getResourceId() { - return ResourceId.from(_record.getSimpleField(Attributes.RESOURCE_NAME.toString())); + return ResourceId.from(getResourceName()); + } + + /** + * Get the resource associated with this message + * @return resource name + */ + public String getResourceName() { + return _record.getSimpleField(Attributes.RESOURCE_NAME.toString()); } /** @@ -429,7 +592,15 @@ public class Message extends HelixProperty { * @return partition id */ public PartitionId getPartitionId() { - return PartitionId.from(_record.getSimpleField(Attributes.PARTITION_NAME.toString())); + return PartitionId.from(getPartitionName()); + } + + /** + * Get the resource partition associated with this message + * @return partition id + */ + public String getPartitionName() { + return _record.getSimpleField(Attributes.PARTITION_NAME.toString()); } /** @@ -453,7 +624,17 @@ public class Message extends HelixProperty { * @param stateModelDefName a reference to the state model definition, e.g. "MasterSlave" */ public void setStateModelDef(StateModelDefId stateModelDefId) { - _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefId.stringify()); + if (stateModelDefId != null) { + setStateModelDef(stateModelDefId.stringify()); + } + } + + /** + * Set the state model definition + * @param stateModelDefName a reference to the state model definition, e.g. "MasterSlave" + */ + public void setStateModelDef(String stateModelDefId) { + _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefId); } /** @@ -629,7 +810,7 @@ public class Message extends HelixProperty { public static Message createReplyMessage(Message srcMessage, String instanceName, Map taskResultMap) { if (srcMessage.getCorrelationId() == null) { - throw new HelixException("Message " + srcMessage.getMsgId() + throw new HelixException("Message " + srcMessage.getMessageId() + " does not contain correlation id"); } Message replyMessage = @@ -679,6 +860,14 @@ public class Message extends HelixProperty { } /** + * Get a list of partitions associated with this message + * @return list of partition ids + */ + public List getPartitionNames() { + return _record.getListField(Attributes.PARTITION_NAME.toString()); + } + + /** * Check if this message is targetted for a controller * @return true if this is a controller message, false otherwise */ @@ -762,9 +951,9 @@ public class Message extends HelixProperty { boolean isNotValid = isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionId().stringify()) || isNullOrEmpty(getResourceId().stringify()) || isNullOrEmpty(getStateModelDef()) - || isNullOrEmpty(getToState().toString()) + || isNullOrEmpty(getTypedToState().toString()) || isNullOrEmpty(getStateModelFactoryName()) - || isNullOrEmpty(getFromState().toString()); + || isNullOrEmpty(getTypedFromState().toString()); return !isNotValid; } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java index c91a655..96d0ca7 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java @@ -33,6 +33,8 @@ import org.apache.helix.api.id.ResourceId; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * Represents the assignments of replicas for an entire resource, keyed on partitions of the @@ -75,7 +77,7 @@ public class ResourceAssignment extends HelixProperty { * Get the currently mapped partitions * @return list of Partition objects (immutable) */ - public List getMappedPartitions() { + public List getMappedPartitionIds() { ImmutableList.Builder builder = new ImmutableList.Builder(); for (String partitionName : _record.getMapFields().keySet()) { builder.add(PartitionId.from(partitionName)); @@ -84,6 +86,14 @@ public class ResourceAssignment extends HelixProperty { } /** + * Get the currently mapped partitions + * @return list of Partition objects (immutable) + */ + public List getMappedPartitions() { + return Lists.newArrayList(_record.getMapFields().keySet()); + } + + /** * Get the entire map of a resource * @return map of partition to participant to state */ @@ -111,13 +121,25 @@ public class ResourceAssignment extends HelixProperty { } /** + * Get the participant, state pairs for a partition + * @param partition the Partition to look up + * @return map of (participant id, state) + */ + public Map getReplicaMap(String partitionId) { + Map rawReplicaMap = _record.getMapField(partitionId); + if (rawReplicaMap == null) { + return Collections.emptyMap(); + } + return rawReplicaMap; + } + + /** * Add participant, state pairs for a partition - * TODO: should be package-private, but builder can't see it * @param partitionId the partition to set * @param replicaMap map of (participant name, state) */ public void addReplicaMap(PartitionId partitionId, Map replicaMap) { - Map convertedMap = new HashMap(); + Map convertedMap = Maps.newHashMap(); for (ParticipantId participantId : replicaMap.keySet()) { convertedMap.put(participantId.stringify(), replicaMap.get(participantId).toString()); } @@ -125,6 +147,15 @@ public class ResourceAssignment extends HelixProperty { } /** + * Add participant, state pairs for a partition + * @param partitionId the partition to set + * @param replicaMap map of (participant name, state) + */ + public void addReplicaMap(String partitionId, Map replicaMap) { + _record.setMapField(partitionId, replicaMap); + } + + /** * Helper for converting a map of strings to a concrete replica map * @param rawMap map of participant name to state name * @return map of participant id to state @@ -133,7 +164,7 @@ public class ResourceAssignment extends HelixProperty { if (rawMap == null) { return Collections.emptyMap(); } - Map replicaMap = new HashMap(); + Map replicaMap = Maps.newHashMap(); for (String participantName : rawMap.keySet()) { replicaMap.put(ParticipantId.from(participantName), State.from(rawMap.get(participantName))); } @@ -150,8 +181,7 @@ public class ResourceAssignment extends HelixProperty { if (rawMaps == null) { return Collections.emptyMap(); } - Map> participantStateMaps = - new HashMap>(); + Map> participantStateMaps = Maps.newHashMap(); for (String partitionId : rawMaps.keySet()) { participantStateMaps.put(PartitionId.from(partitionId), replicaMapFromStringMap(rawMaps.get(partitionId))); @@ -185,7 +215,7 @@ public class ResourceAssignment extends HelixProperty { if (replicaMaps == null) { return Collections.emptyMap(); } - Map> rawMaps = new HashMap>(); + Map> rawMaps = Maps.newHashMap(); for (PartitionId partitionId : replicaMaps.keySet()) { rawMaps.put(partitionId.stringify(), stringMapFromReplicaMap(replicaMaps.get(partitionId))); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index 3a8542b..a9a6e49 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -175,7 +175,7 @@ public class StateModelDefinition extends HelixProperty { * Get an ordered priority list of states * @return state names, the first of which is highest priority */ - public List getStatesPriorityStringList() { + public List getStatesPriorityList() { return _statesPriorityList; } @@ -183,9 +183,9 @@ public class StateModelDefinition extends HelixProperty { * Get an ordered priority list of states * @return immutable list of states, the first of which is highest priority (immutable) */ - public List getStatesPriorityList() { + public List getTypedStatesPriorityList() { ImmutableList.Builder builder = new ImmutableList.Builder(); - for (String state : getStatesPriorityStringList()) { + for (String state : getStatesPriorityList()) { builder.add(State.from(state)); } return builder.build(); @@ -223,7 +223,7 @@ public class StateModelDefinition extends HelixProperty { * Get the starting state in the model * @return name of the initial state */ - public String getInitialStateString() { + public String getInitialState() { // return _record.getSimpleField(StateModelDefinitionProperty.INITIAL_STATE // .toString()); return _initialState; @@ -233,7 +233,7 @@ public class StateModelDefinition extends HelixProperty { * Get the starting state in the model * @return name of the initial state */ - public State getInitialState() { + public State getTypedInitialState() { // return _record.getSimpleField(StateModelDefinitionProperty.INITIAL_STATE // .toString()); return State.from(_initialState); @@ -259,7 +259,7 @@ public class StateModelDefinition extends HelixProperty { @Override public boolean isValid() { - if (getInitialStateString() == null) { + if (getInitialState() == null) { _logger.error("State model does not contain init state, statemodel:" + _record.getId()); return false; } @@ -299,7 +299,16 @@ public class StateModelDefinition extends HelixProperty { * @param state */ public Builder initialState(State initialState) { - this.initialState = initialState.toString(); + return initialState(initialState.toString()); + } + + /** + * initial state of a replica when it starts, most commonly used initial + * state is OFFLINE + * @param state + */ + public Builder initialState(String initialState) { + this.initialState = initialState; return this; } @@ -312,7 +321,19 @@ public class StateModelDefinition extends HelixProperty { * @param states */ public Builder addState(State state, int priority) { - statesMap.put(state.toString(), priority); + return addState(state.toString(), priority); + } + + /** + * Define all valid states using this method. Set the priority in which the + * constraints must be satisfied. Lets say STATE1 has a constraint of 1 and + * STATE2 has a constraint of 3 but only one node is up then Helix will uses + * the priority to see STATE constraint has to be given higher preference
+ * Use -1 to indicates states with no constraints, like OFFLINE + * @param states + */ + public Builder addState(String state, int priority) { + statesMap.put(state, priority); return this; } @@ -326,6 +347,15 @@ public class StateModelDefinition extends HelixProperty { } /** + * Sets the priority to Integer.MAX_VALUE + * @param state + */ + public Builder addState(String state) { + addState(state, Integer.MAX_VALUE); + return this; + } + + /** * Define all legal transitions between states using this method. Priority * is used to order the transitions. Helix tries to maximize the number of * transitions that can be fired in parallel without violating the @@ -343,6 +373,23 @@ public class StateModelDefinition extends HelixProperty { } /** + * Define all legal transitions between states using this method. Priority + * is used to order the transitions. Helix tries to maximize the number of + * transitions that can be fired in parallel without violating the + * constraint. The transitions are first sorted based on priority and + * transitions are selected in a greedy way until the constriants are not + * violated. + * @param fromState source + * @param toState destination + * @param priority priority, higher value is higher priority + * @return Builder + */ + public Builder addTransition(String fromState, String toState, int priority) { + transitionMap.put(new Transition(fromState, toState), priority); + return this; + } + + /** * Add a state transition with maximal priority value * @see #addTransition(String, String, int) * @param fromState @@ -355,13 +402,35 @@ public class StateModelDefinition extends HelixProperty { } /** + * Add a state transition with maximal priority value + * @see #addTransition(String, String, int) + * @param fromState + * @param toState + * @return Builder + */ + public Builder addTransition(String fromState, String toState) { + addTransition(fromState, toState, Integer.MAX_VALUE); + return this; + } + + /** * Set a maximum for replicas in this state * @param state state name * @param upperBound maximum * @return Builder */ public Builder upperBound(State state, int upperBound) { - stateConstraintMap.put(state.toString(), String.valueOf(upperBound)); + return upperBound(state.toString(), upperBound); + } + + /** + * Set a maximum for replicas in this state + * @param state state name + * @param upperBound maximum + * @return Builder + */ + public Builder upperBound(String state, int upperBound) { + stateConstraintMap.put(state, String.valueOf(upperBound)); return this; } @@ -380,7 +449,25 @@ public class StateModelDefinition extends HelixProperty { * @return Builder */ public Builder dynamicUpperBound(State state, String bound) { - stateConstraintMap.put(state.toString(), bound); + return dynamicUpperBound(state.toString(), bound); + } + + /** + * You can use this to have the bounds dynamically change based on other + * parameters.
+ * Currently support 2 values
+ * R --> Refers to the number of replicas specified during resource + * creation. This allows having different replication factor for each + * resource without having to create a different state machine.
+ * N --> Refers to all nodes in the cluster. Useful for resources that need + * to exist on all nodes. This way one can add/remove nodes without having + * the change the bounds. + * @param state + * @param bound + * @return Builder + */ + public Builder dynamicUpperBound(String state, String bound) { + stateConstraintMap.put(state, bound); return this; } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/Transition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Transition.java b/helix-core/src/main/java/org/apache/helix/model/Transition.java index 16fc937..70f8635 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Transition.java +++ b/helix-core/src/main/java/org/apache/helix/model/Transition.java @@ -38,6 +38,15 @@ public class Transition { _toState = toState; } + /** + * Instantiate with a source and destination state + * @param fromState source name + * @param toState destination name + */ + public Transition(String fromState, String toState) { + this(State.from(fromState), State.from(toState)); + } + @Override public String toString() { return _fromState + "-" + _toState; @@ -60,7 +69,7 @@ public class Transition { * Get the source state * @return source state name */ - public State getFromState() { + public State getTypedFromState() { return _fromState; } @@ -68,11 +77,27 @@ public class Transition { * Get the destination state * @return destination state name */ - public State getToState() { + public State getTypedToState() { return _toState; } /** + * Get the source state + * @return source state name + */ + public String getFromState() { + return _fromState.toString(); + } + + /** + * Get the destination state + * @return destination state name + */ + public String getToState() { + return _toState.toString(); + } + + /** * Create a new transition * @param fromState string source state * @param toState string destination state http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java index 4c6edf7..779f220 100644 --- a/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/model/builder/StateTransitionTableBuilder.java @@ -126,8 +126,8 @@ public class StateTransitionTableBuilder { } for (Transition transition : transitions) { - State fromState = transition.getFromState(); - State toState = transition.getToState(); + State fromState = transition.getTypedFromState(); + State toState = transition.getTypedToState(); setPathVal(path, fromState.toString(), toState.toString(), 1); setNext(next, fromState.toString(), toState.toString(), toState.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index 83b93d8..afd2886 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -89,7 +89,7 @@ public class ResourceMonitor implements ResourceMonitorMBean { // TODO fix this; IdealState shall have either map fields (CUSTOM mode) // or list fields (AUDO mode) - for (PartitionId partitionId : idealState.getPartitionSet()) { + for (PartitionId partitionId : idealState.getPartitionIdSet()) { Map idealRecord = idealState.getParticipantStateMap(partitionId); Map externalViewRecord = externalView.getStateMap(partitionId); @@ -113,7 +113,7 @@ public class ResourceMonitor implements ResourceMonitorMBean { } _numOfErrorPartitions = numOfErrorPartitions; _externalViewIdealStateDiff = numOfDiff; - _numOfPartitionsInExternalView = externalView.getPartitionSet().size(); + _numOfPartitionsInExternalView = externalView.getPartitionIdSet().size(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java index 6ae8a1b..0e8c6fd 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java @@ -142,7 +142,7 @@ public class DistClusterControllerElection implements ControllerChangeListener { leader = accessor.getProperty(keyBuilder.controllerLeader()); if (leader != null) { - String leaderSessionId = leader.getSessionId().stringify(); + String leaderSessionId = leader.getTypedSessionId().stringify(); LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: " + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java index 2258b95..4e4fdf6 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java +++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java @@ -166,13 +166,13 @@ public class HelixStateMachineEngine implements StateMachineEngine { if (!type.equals(MessageType.STATE_TRANSITION.toString())) { throw new HelixException("Expect state-transition message type, but was " - + message.getMsgType() + ", msgId: " + message.getMsgId()); + + message.getMsgType() + ", msgId: " + message.getMessageId()); } PartitionId partitionKey = message.getPartitionId(); StateModelDefId stateModelId = message.getStateModelDefId(); ResourceId resourceId = message.getResourceId(); - SessionId sessionId = message.getTgtSessionId(); + SessionId sessionId = message.getTypedTgtSessionId(); int bucketSize = message.getBucketSize(); if (stateModelId == null) { @@ -210,7 +210,7 @@ public class HelixStateMachineEngine implements StateMachineEngine { if (message.getBatchMessageMode() == false) { // create currentStateDelta for this partition - String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialStateString(); + String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState(); StateModel stateModel = stateModelFactory.getStateModel(partitionKey.stringify()); if (stateModel == null) { stateModel = stateModelFactory.createAndAddStateModel(partitionKey.stringify()); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index ed411d1..9bba660 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -133,7 +133,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC if (externalViewList != null) { for (ExternalView extView : externalViewList) { String resourceName = extView.getId(); - for (String partitionName : extView.getPartitionStringSet()) { + for (String partitionName : extView.getPartitionSet()) { Map stateMap = extView.getStateMap(partitionName); for (String instanceName : stateMap.keySet()) { String currentState = stateMap.get(instanceName); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index ea03d58..f591a24 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -260,7 +260,7 @@ public class ClusterStateVerifier { ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId); ResourceAssignmentBuilder raBuilder = new ResourceAssignmentBuilder(resourceId); - List mappedPartitions = resourceAssignment.getMappedPartitions(); + List mappedPartitions = resourceAssignment.getMappedPartitionIds(); for (PartitionId partitionId : mappedPartitions) { raBuilder.addAssignments(partitionId, resourceAssignment.getReplicaMap(partitionId)); } @@ -315,7 +315,7 @@ public class ClusterStateVerifier { int extViewSize = extView.getRecord().getMapFields().size(); int bestPossStateSize = bestPossOutput.getResourceAssignment(ResourceId.from(resourceName)) - .getMappedPartitions().size(); + .getMappedPartitionIds().size(); if (extViewSize != bestPossStateSize) { LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size (" + bestPossStateSize + ") for resource: " + resourceName); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java index f3ed88e..8120981 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java +++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java @@ -66,7 +66,7 @@ public class MessagePoster { MessageId msgId = MessageId.from("TestMessageId-2"); Message message = new Message(MessageType.STATE_TRANSITION, msgId); - message.setMsgId(msgId); + message.setMessageId(msgId); message.setSrcName(msgSrc); message.setTgtName(instanceName); message.setMsgState(MessageState.NEW); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java index 58186d5..8b32ddc 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java @@ -342,14 +342,14 @@ public class ZkLogAnalyzer { // sendMessageLines.add(inputLine); stats.msgSentCount++; - if (msg.getFromState().toString().equals("OFFLINE") - && msg.getToState().toString().equals("SLAVE")) { + if (msg.getTypedFromState().toString().equals("OFFLINE") + && msg.getTypedToState().toString().equals("SLAVE")) { stats.msgSentCount_O2S++; - } else if (msg.getFromState().toString().equals("SLAVE") - && msg.getToState().toString().equals("MASTER")) { + } else if (msg.getTypedFromState().toString().equals("SLAVE") + && msg.getTypedToState().toString().equals("MASTER")) { stats.msgSentCount_S2M++; - } else if (msg.getFromState().toString().equals("MASTER") - && msg.getToState().toString().equals("SLAVE")) { + } else if (msg.getTypedFromState().toString().equals("MASTER") + && msg.getTypedToState().toString().equals("SLAVE")) { stats.msgSentCount_M2S++; } // System.out.println("Message create:"+new http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6b57486b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java index e46ad13..d304a87 100644 --- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java @@ -41,7 +41,7 @@ public class RebalanceUtil { Map partitionIndex = new HashMap(); Map reversePartitionIndex = new HashMap(); boolean indexInPartitionName = true; - for (PartitionId partitionId : state.getPartitionSet()) { + for (PartitionId partitionId : state.getPartitionIdSet()) { String partitionName = partitionId.stringify(); int lastPos = partitionName.lastIndexOf("_"); if (lastPos < 0) { @@ -64,7 +64,7 @@ public class RebalanceUtil { if (indexInPartitionName == false) { List partitions = new ArrayList(); - partitions.addAll(Lists.transform(Lists.newArrayList(state.getPartitionSet()), + partitions.addAll(Lists.transform(Lists.newArrayList(state.getPartitionIdSet()), Functions.toStringFunction())); Collections.sort(partitions); for (int i = 0; i < partitions.size(); i++) { @@ -76,7 +76,7 @@ public class RebalanceUtil { Map> nodeMasterAssignmentMap = new TreeMap>(); Map>> combinedNodeSlaveAssignmentMap = new TreeMap>>(); - for (PartitionId partition : state.getPartitionSet()) { + for (PartitionId partition : state.getPartitionIdSet()) { List instances = state.getRecord().getListField(partition.stringify()); String master = instances.get(0); if (!nodeMasterAssignmentMap.containsKey(master)) { @@ -111,7 +111,7 @@ public class RebalanceUtil { // StateModelDefinition def = new StateModelDefinition(stateModDef); - List statePriorityList = stateModDef.getStatesPriorityStringList(); + List statePriorityList = stateModDef.getStatesPriorityList(); for (String state : statePriorityList) { String count = stateModDef.getNumInstancesPerState(state);