helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [3/4] [HELIX-389] Unify accessor classes into a single class
Date Wed, 23 Jul 2014 21:07:45 GMT
http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
deleted file mode 100644
index cb52e91..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ /dev/null
@@ -1,809 +0,0 @@
-package org.apache.helix.api.accessor;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.RunningInstance;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ContainerConfig;
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.config.UserConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.provisioner.ContainerId;
-import org.apache.helix.controller.provisioner.ContainerSpec;
-import org.apache.helix.controller.provisioner.ContainerState;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class ParticipantAccessor {
-  private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
-
-  private final HelixDataAccessor _accessor;
-  private final ClusterId _clusterId;
-  private final PropertyKey.Builder _keyBuilder;
-
-  public ParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
-    _clusterId = clusterId;
-    _accessor = accessor;
-    _keyBuilder = accessor.keyBuilder();
-  }
-
-  /**
-   * enable/disable a participant
-   * @param participantId
-   * @param isEnabled
-   * @return true if enable state succeeded, false otherwise
-   */
-  boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
-    String participantName = participantId.stringify();
-    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
-      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-      return false;
-    }
-
-    InstanceConfig config = new InstanceConfig(participantName);
-    config.setInstanceEnabled(isEnabled);
-    return _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
-  }
-
-  /**
-   * disable participant
-   * @param participantId
-   * @return true if disabled successfully, false otherwise
-   */
-  public boolean disableParticipant(ParticipantId participantId) {
-    return enableParticipant(participantId, false);
-  }
-
-  /**
-   * enable participant
-   * @param participantId
-   * @return true if enabled successfully, false otherwise
-   */
-  public boolean enableParticipant(ParticipantId participantId) {
-    return enableParticipant(participantId, true);
-  }
-
-  /**
-   * create messages for participant
-   * @param participantId
-   * @param msgMap map of message-id to message
-   */
-  public void insertMessagesToParticipant(ParticipantId participantId,
-      Map<MessageId, Message> msgMap) {
-    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
-    List<Message> msgs = new ArrayList<Message>();
-    for (MessageId msgId : msgMap.keySet()) {
-      msgKeys.add(_keyBuilder.message(participantId.stringify(), msgId.stringify()));
-      msgs.add(msgMap.get(msgId));
-    }
-
-    _accessor.createChildren(msgKeys, msgs);
-  }
-
-  /**
-   * set messages of participant
-   * @param participantId
-   * @param msgMap map of message-id to message
-   */
-  public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
-    String participantName = participantId.stringify();
-    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
-    List<Message> msgs = new ArrayList<Message>();
-    for (MessageId msgId : msgMap.keySet()) {
-      msgKeys.add(_keyBuilder.message(participantName, msgId.stringify()));
-      msgs.add(msgMap.get(msgId));
-    }
-    _accessor.setChildren(msgKeys, msgs);
-  }
-
-  /**
-   * delete messages from participant
-   * @param participantId
-   * @param msgIdSet
-   */
-  public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
-    String participantName = participantId.stringify();
-    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
-    for (MessageId msgId : msgIdSet) {
-      msgKeys.add(_keyBuilder.message(participantName, msgId.stringify()));
-    }
-
-    // TODO impl batch remove
-    for (PropertyKey msgKey : msgKeys) {
-      _accessor.removeProperty(msgKey);
-    }
-  }
-
-  /**
-   * enable/disable partitions on a participant
-   * @param enabled
-   * @param participantId
-   * @param resourceId
-   * @param partitionIdSet
-   * @return true if enable state changed successfully, false otherwise
-   */
-  boolean enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
-      final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
-    String participantName = participantId.stringify();
-    String resourceName = resourceId.stringify();
-
-    // check instanceConfig exists
-    PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
-    if (_accessor.getProperty(instanceConfigKey) == null) {
-      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-      return false;
-    }
-
-    // check resource exist. warn if not
-    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceName));
-    if (idealState == null) {
-      LOG.warn("Disable partitions: " + partitionIdSet + ", resource: " + resourceId
-          + " does NOT exist. probably disable it during ERROR->DROPPED transtition");
-
-    } else {
-      // check partitions exist. warn if not
-      for (PartitionId partitionId : partitionIdSet) {
-        if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
-            .getPreferenceList(partitionId) == null)
-            || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
-                .getParticipantStateMap(partitionId) == null)) {
-          LOG.warn("Resource: " + resourceId + ", partition: " + partitionId
-              + ", partition does NOT exist in ideal state");
-        }
-      }
-    }
-
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    final List<String> partitionNames = new ArrayList<String>();
-    for (PartitionId partitionId : partitionIdSet) {
-      partitionNames.add(partitionId.stringify());
-    }
-
-    return baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          throw new HelixException("Instance: " + participantId + ", participant config is null");
-        }
-
-        // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
-        List<String> list =
-            currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
-        Set<String> disabledPartitions = new HashSet<String>();
-        if (list != null) {
-          disabledPartitions.addAll(list);
-        }
-
-        if (enabled) {
-          disabledPartitions.removeAll(partitionNames);
-        } else {
-          disabledPartitions.addAll(partitionNames);
-        }
-
-        list = new ArrayList<String>(disabledPartitions);
-        Collections.sort(list);
-        currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
-        return currentData;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * disable partitions on a participant
-   * @param participantId
-   * @param resourceId
-   * @param disablePartitionIdSet
-   * @return true if disabled successfully, false otherwise
-   */
-  public boolean disablePartitionsForParticipant(ParticipantId participantId,
-      ResourceId resourceId, Set<PartitionId> disablePartitionIdSet) {
-    return enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
-  }
-
-  /**
-   * enable partitions on a participant
-   * @param participantId
-   * @param resourceId
-   * @param enablePartitionIdSet
-   * @return true if enabled successfully, false otherwise
-   */
-  public boolean enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
-      Set<PartitionId> enablePartitionIdSet) {
-    return enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
-  }
-
-  /**
-   * Reset partitions assigned to a set of participants
-   * @param resetParticipantIdSet the participants to reset
-   * @return true if reset, false otherwise
-   */
-  public boolean resetParticipants(Set<ParticipantId> resetParticipantIdSet) {
-    List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
-    for (ParticipantId participantId : resetParticipantIdSet) {
-      for (ExternalView extView : extViews) {
-        Set<PartitionId> resetPartitionIdSet = Sets.newHashSet();
-        for (PartitionId partitionId : extView.getPartitionIdSet()) {
-          Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
-          if (stateMap.containsKey(participantId)
-              && stateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR))) {
-            resetPartitionIdSet.add(partitionId);
-          }
-        }
-        resetPartitionsForParticipant(participantId, extView.getResourceId(), resetPartitionIdSet);
-      }
-    }
-    return true;
-  }
-
-  /**
-   * reset partitions on a participant
-   * @param participantId
-   * @param resourceId
-   * @param resetPartitionIdSet
-   * @return true if partitions reset, false otherwise
-   */
-  public boolean resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
-      Set<PartitionId> resetPartitionIdSet) {
-    // make sure the participant is running
-    Participant participant = readParticipant(participantId);
-    if (!participant.isAlive()) {
-      LOG.error("Cannot reset partitions because the participant is not running");
-      return false;
-    }
-    RunningInstance runningInstance = participant.getRunningInstance();
-
-    // check that the resource exists
-    ResourceAccessor resourceAccessor = resourceAccessor();
-    Resource resource = resourceAccessor.readResource(resourceId);
-    if (resource == null || resource.getRebalancerConfig() == null) {
-      LOG.error("Cannot reset partitions because the resource is not present");
-      return false;
-    }
-
-    // need the rebalancer config for the resource
-    RebalancerConfig config = resource.getRebalancerConfig();
-    if (config == null) {
-      LOG.error("Rebalancer config for resource does not exist");
-      return false;
-    }
-
-    // ensure that all partitions to reset exist
-    Set<PartitionId> partitionSet = ImmutableSet.copyOf(config.getSubUnitIdSet());
-    if (!partitionSet.containsAll(resetPartitionIdSet)) {
-      LOG.error("Not all of the specified partitions to reset exist for the resource");
-      return false;
-    }
-
-    // check for a valid current state that has all specified partitions in ERROR state
-    CurrentState currentState = participant.getCurrentStateMap().get(resourceId);
-    if (currentState == null) {
-      LOG.error("The participant does not have a current state for the resource");
-      return false;
-    }
-    for (PartitionId partitionId : resetPartitionIdSet) {
-      if (!currentState.getState(partitionId).equals(State.from(HelixDefinedState.ERROR))) {
-        LOG.error("Partition " + partitionId + " is not in error state, aborting reset");
-        return false;
-      }
-    }
-
-    // make sure that there are no pending transition messages
-    for (Message message : participant.getMessageMap().values()) {
-      if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
-          || !runningInstance.getSessionId().equals(message.getTypedTgtSessionId())
-          || !resourceId.equals(message.getResourceId())
-          || !resetPartitionIdSet.contains(message.getPartitionId())) {
-        continue;
-      }
-      LOG.error("Cannot reset partitions because of the following pending message: " + message);
-      return false;
-    }
-
-    // set up the source id
-    String adminName = null;
-    try {
-      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
-    } catch (UnknownHostException e) {
-      // can ignore it
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
-      }
-      adminName = "UNKNOWN";
-    }
-
-    // build messages to signal the transition
-    StateModelDefId stateModelDefId = config.getStateModelDefId();
-    StateModelDefinition stateModelDef =
-        _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
-    Map<MessageId, Message> messageMap = Maps.newHashMap();
-    for (PartitionId partitionId : resetPartitionIdSet) {
-      // send ERROR to initialState message
-      MessageId msgId = MessageId.from(UUID.randomUUID().toString());
-      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
-      message.setSrcName(adminName);
-      message.setTgtName(participantId.stringify());
-      message.setMsgState(MessageState.NEW);
-      message.setPartitionId(partitionId);
-      message.setResourceId(resourceId);
-      message.setTgtSessionId(runningInstance.getSessionId());
-      message.setStateModelDef(stateModelDefId);
-      message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
-      message.setToState(stateModelDef.getTypedInitialState());
-      message.setStateModelFactoryId(config.getStateModelFactoryId());
-
-      messageMap.put(message.getMessageId(), message);
-    }
-
-    // send the messages
-    insertMessagesToParticipant(participantId, messageMap);
-    return true;
-  }
-
-  /**
-   * Read the user config of the participant
-   * @param participantId the participant to to look up
-   * @return UserConfig, or null
-   */
-  public UserConfig readUserConfig(ParticipantId participantId) {
-    InstanceConfig instanceConfig =
-        _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
-    return instanceConfig != null ? instanceConfig.getUserConfig() : null;
-  }
-
-  /**
-   * Set the user config of the participant, overwriting existing user configs
-   * @param participantId the participant to update
-   * @param userConfig the new user config
-   * @return true if the user config was set, false otherwise
-   */
-  public boolean setUserConfig(ParticipantId participantId, UserConfig userConfig) {
-    ParticipantConfig.Delta delta =
-        new ParticipantConfig.Delta(participantId).setUserConfig(userConfig);
-    return updateParticipant(participantId, delta) != null;
-  }
-
-  /**
-   * Add user configuration to the existing participant user configuration. Overwrites properties
-   * with
-   * the same key
-   * @param participant the participant to update
-   * @param userConfig the user config key-value pairs to add
-   * @return true if the user config was updated, false otherwise
-   */
-  public boolean updateUserConfig(ParticipantId participantId, UserConfig userConfig) {
-    InstanceConfig instanceConfig = new InstanceConfig(participantId);
-    instanceConfig.addNamespacedConfig(userConfig);
-    return _accessor.updateProperty(_keyBuilder.instanceConfig(participantId.stringify()),
-        instanceConfig);
-  }
-
-  /**
-   * Clear any user-specified configuration from the participant
-   * @param participantId the participant to update
-   * @return true if the config was cleared, false otherwise
-   */
-  public boolean dropUserConfig(ParticipantId participantId) {
-    return setUserConfig(participantId, new UserConfig(Scope.participant(participantId)));
-  }
-
-  /**
-   * Update a participant configuration
-   * @param participantId the participant to update
-   * @param participantDelta changes to the participant
-   * @return ParticipantConfig, or null if participant is not persisted
-   */
-  public ParticipantConfig updateParticipant(ParticipantId participantId,
-      ParticipantConfig.Delta participantDelta) {
-    Participant participant = readParticipant(participantId);
-    if (participant == null) {
-      LOG.error("Participant " + participantId + " does not exist, cannot be updated");
-      return null;
-    }
-    ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
-    setParticipant(config);
-    return config;
-  }
-
-  /**
-   * Set the configuration of an existing participant
-   * @param participantConfig participant configuration
-   * @return true if config was set, false if there was an error
-   */
-  public boolean setParticipant(ParticipantConfig participantConfig) {
-    if (participantConfig == null) {
-      LOG.error("Participant config not initialized");
-      return false;
-    }
-    InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId());
-    instanceConfig.setHostName(participantConfig.getHostName());
-    instanceConfig.setPort(Integer.toString(participantConfig.getPort()));
-    for (String tag : participantConfig.getTags()) {
-      instanceConfig.addTag(tag);
-    }
-    for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
-      instanceConfig.setParticipantEnabledForPartition(partitionId, false);
-    }
-    instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
-    instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
-    _accessor.setProperty(_keyBuilder.instanceConfig(participantConfig.getId().stringify()),
-        instanceConfig);
-    return true;
-  }
-
-  /**
-   * create a participant based on physical model
-   * @param participantId
-   * @param instanceConfig
-   * @param userConfig
-   * @param liveInstance
-   * @param instanceMsgMap map of message-id to message
-   * @param instanceCurStateMap map of resource-id to current-state
-   * @return participant
-   */
-  static Participant createParticipant(ParticipantId participantId, InstanceConfig instanceConfig,
-      UserConfig userConfig, LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
-      Map<String, CurrentState> instanceCurStateMap) {
-
-    String hostName = instanceConfig.getHostName();
-
-    int port = -1;
-    try {
-      port = Integer.parseInt(instanceConfig.getPort());
-    } catch (IllegalArgumentException e) {
-      // keep as -1
-    }
-    if (port < 0 || port > 65535) {
-      port = -1;
-    }
-    boolean isEnabled = instanceConfig.getInstanceEnabled();
-
-    List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
-    Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
-    if (disabledPartitions != null) {
-      disabledPartitionIdSet = new HashSet<PartitionId>();
-      for (String partitionId : disabledPartitions) {
-        disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
-            PartitionId.stripResourceId(partitionId)));
-      }
-    }
-
-    Set<String> tags = new HashSet<String>(instanceConfig.getTags());
-
-    RunningInstance runningInstance = null;
-    if (liveInstance != null) {
-      runningInstance =
-          new RunningInstance(liveInstance.getTypedSessionId(),
-              liveInstance.getTypedHelixVersion(), liveInstance.getProcessId());
-    }
-
-    Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
-    if (instanceMsgMap != null) {
-      for (String msgId : instanceMsgMap.keySet()) {
-        Message message = instanceMsgMap.get(msgId);
-        msgMap.put(MessageId.from(msgId), message);
-      }
-    }
-
-    Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
-    if (instanceCurStateMap != null) {
-
-      for (String resourceName : instanceCurStateMap.keySet()) {
-        curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName));
-      }
-    }
-
-    // set up the container config if it exists
-    ContainerConfig containerConfig = null;
-    ContainerSpec containerSpec = instanceConfig.getContainerSpec();
-    ContainerState containerState = instanceConfig.getContainerState();
-    ContainerId containerId = instanceConfig.getContainerId();
-    if (containerSpec != null || containerState != null || containerId != null) {
-      containerConfig = new ContainerConfig(containerId, containerSpec, containerState);
-    }
-
-    return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
-        runningInstance, curStateMap, msgMap, userConfig, containerConfig);
-  }
-
-  /**
-   * read participant related data
-   * @param participantId
-   * @return participant, or null if participant not available
-   */
-  public Participant readParticipant(ParticipantId participantId) {
-    // read physical model
-    String participantName = participantId.stringify();
-    InstanceConfig instanceConfig =
-        _accessor.getProperty(_keyBuilder.instanceConfig(participantName));
-
-    if (instanceConfig == null) {
-      LOG.error("Participant " + participantId + " is not present on the cluster");
-      return null;
-    }
-
-    UserConfig userConfig = instanceConfig.getUserConfig();
-    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName));
-
-    Map<String, Message> instanceMsgMap = Collections.emptyMap();
-    Map<String, CurrentState> instanceCurStateMap = Collections.emptyMap();
-    if (liveInstance != null) {
-      SessionId sessionId = liveInstance.getTypedSessionId();
-
-      instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
-      instanceCurStateMap =
-          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
-              sessionId.stringify()));
-    }
-
-    return createParticipant(participantId, instanceConfig, userConfig, liveInstance,
-        instanceMsgMap, instanceCurStateMap);
-  }
-
-  /**
-   * update resource current state of a participant
-   * @param resourceId resource id
-   * @param participantId participant id
-   * @param sessionId session id
-   * @param curStateUpdate current state change delta
-   */
-  public void updateCurrentState(ResourceId resourceId, ParticipantId participantId,
-      SessionId sessionId, CurrentState curStateUpdate) {
-    _accessor.updateProperty(
-        _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
-            resourceId.stringify()), curStateUpdate);
-  }
-
-  /**
-   * drop resource current state of a participant
-   * @param resourceId resource id
-   * @param participantId participant id
-   * @param sessionId session id
-   * @return true if dropped, false otherwise
-   */
-  public boolean dropCurrentState(ResourceId resourceId, ParticipantId participantId,
-      SessionId sessionId) {
-    return _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
-        sessionId.stringify(), resourceId.stringify()));
-  }
-
-  /**
-   * drop a participant from cluster
-   * @param participantId
-   * @return true if participant dropped, false if there was an error
-   */
-  boolean dropParticipant(ParticipantId participantId) {
-    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
-      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-    }
-
-    if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
-      LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
-    }
-
-    // delete participant config path
-    _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
-
-    // delete participant path
-    _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
-    return true;
-  }
-
-  /**
-   * Let a new participant take the place of an existing participant
-   * @param oldParticipantId the participant to drop
-   * @param newParticipantId the participant that takes its place
-   * @return true if swap successful, false otherwise
-   */
-  public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
-    Participant oldParticipant = readParticipant(oldParticipantId);
-    if (oldParticipant == null) {
-      LOG.error("Could not swap participants because the old participant does not exist");
-      return false;
-    }
-    if (oldParticipant.isEnabled()) {
-      LOG.error("Could not swap participants because the old participant is still enabled");
-      return false;
-    }
-    if (oldParticipant.isAlive()) {
-      LOG.error("Could not swap participants because the old participant is still live");
-      return false;
-    }
-    Participant newParticipant = readParticipant(newParticipantId);
-    if (newParticipant == null) {
-      LOG.error("Could not swap participants because the new participant does not exist");
-      return false;
-    }
-    dropParticipant(oldParticipantId);
-    ResourceAccessor resourceAccessor = resourceAccessor();
-    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
-    for (String resourceName : idealStateMap.keySet()) {
-      IdealState idealState = idealStateMap.get(resourceName);
-      swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
-      PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(idealState);
-      resourceAccessor.setRebalancerConfig(ResourceId.from(resourceName), config);
-      _accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
-    }
-    return true;
-  }
-
-  /**
-   * Replace occurrences of participants in preference lists and maps
-   * @param idealState the current ideal state
-   * @param oldParticipantId the participant to drop
-   * @param newParticipantId the participant that replaces it
-   */
-  protected void swapParticipantsInIdealState(IdealState idealState,
-      ParticipantId oldParticipantId, ParticipantId newParticipantId) {
-    for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-      List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
-      if (oldPreferenceList != null) {
-        List<ParticipantId> newPreferenceList = Lists.newArrayList();
-        for (ParticipantId participantId : oldPreferenceList) {
-          if (participantId.equals(oldParticipantId)) {
-            newPreferenceList.add(newParticipantId);
-          } else if (!participantId.equals(newParticipantId)) {
-            newPreferenceList.add(participantId);
-          }
-        }
-        idealState.setPreferenceList(partitionId, newPreferenceList);
-      }
-      Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
-      if (preferenceMap != null) {
-        if (preferenceMap.containsKey(oldParticipantId)) {
-          State state = preferenceMap.get(oldParticipantId);
-          preferenceMap.remove(oldParticipantId);
-          preferenceMap.put(newParticipantId, state);
-        }
-        idealState.setParticipantStateMap(partitionId, preferenceMap);
-      }
-    }
-  }
-
-  /**
-   * Create empty persistent properties to ensure that there is a valid participant structure
-   * @param participantId the identifier under which to initialize the structure
-   * @return true if the participant structure exists at the end of this call, false otherwise
-   */
-  public boolean initParticipantStructure(ParticipantId participantId) {
-    if (participantId == null) {
-      LOG.error("Participant ID cannot be null when clearing the participant in cluster "
-          + _clusterId + "!");
-      return false;
-    }
-    List<String> paths =
-        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
-    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    for (String path : paths) {
-      boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
-      if (!status) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(path + " already exists");
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Clear properties for the participant
-   * @param participantId the participant for which to clear
-   * @return true if all paths removed, false otherwise
-   */
-  protected boolean clearParticipantStructure(ParticipantId participantId) {
-    List<String> paths =
-        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
-    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    boolean[] removeResults = baseAccessor.remove(paths, 0);
-    boolean result = true;
-    for (boolean removeResult : removeResults) {
-      result = result && removeResult;
-    }
-    return result;
-  }
-
-  /**
-   * check if participant structure is valid
-   * @return true if valid or false otherwise
-   */
-  public boolean isParticipantStructureValid(ParticipantId participantId) {
-    List<String> paths =
-        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
-    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    if (baseAccessor != null) {
-      boolean[] existsResults = baseAccessor.exists(paths, 0);
-      for (boolean exists : existsResults) {
-        if (!exists) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Get a ResourceAccessor instance
-   * @return ResourceAccessor
-   */
-  protected ResourceAccessor resourceAccessor() {
-    return new ResourceAccessor(_clusterId, _accessor);
-  }
-
-  /**
-   * Get the cluster ID this accessor is connected to
-   * @return ClusterId
-   */
-  protected ClusterId clusterId() {
-    return _clusterId;
-  }
-
-  /**
-   * Get the accessor for the properties stored for this cluster
-   * @return HelixDataAccessor
-   */
-  protected HelixDataAccessor dataAccessor() {
-    return _accessor;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
deleted file mode 100644
index 7dde6ee..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ /dev/null
@@ -1,541 +0,0 @@
-package org.apache.helix.api.accessor;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.ResourceConfig.ResourceType;
-import org.apache.helix.api.config.UserConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
-import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.ProvisionerConfigHolder;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class ResourceAccessor {
-  private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
-  private final ClusterId _clusterId;
-  private final HelixDataAccessor _accessor;
-  private final PropertyKey.Builder _keyBuilder;
-
-  public ResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
-    _clusterId = clusterId;
-    _accessor = accessor;
-    _keyBuilder = accessor.keyBuilder();
-  }
-
-  /**
-   * Read a single snapshot of a resource
-   * @param resourceId the resource id to read
-   * @return Resource or null if not present
-   */
-  public Resource readResource(ResourceId resourceId) {
-    ResourceConfiguration config =
-        _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
-    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
-
-    if (config == null && idealState == null) {
-      LOG.error("Resource " + resourceId + " not present on the cluster");
-      return null;
-    }
-
-    ExternalView externalView =
-        _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
-    ResourceAssignment resourceAssignment =
-        _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
-    return createResource(resourceId, config, idealState, externalView, resourceAssignment);
-  }
-
-  /**
-   * Update a resource configuration
-   * @param resourceId the resource id to update
-   * @param resourceDelta changes to the resource
-   * @return ResourceConfig, or null if the resource is not persisted
-   */
-  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
-    Resource resource = readResource(resourceId);
-    if (resource == null) {
-      LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
-      return null;
-    }
-    ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
-    setResource(config);
-    return config;
-  }
-
-  /**
-   * save resource assignment
-   * @param resourceId
-   * @param resourceAssignment
-   * @return true if set, false otherwise
-   */
-  public boolean setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
-    return _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
-        resourceAssignment);
-  }
-
-  /**
-   * get resource assignment
-   * @param resourceId
-   * @return resource assignment or null
-   */
-  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
-    return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
-  }
-
-  /**
-   * Set a physical resource configuration, which may include user-defined configuration, as well as
-   * rebalancer configuration
-   * @param resourceId
-   * @param configuration
-   * @return true if set, false otherwise
-   */
-  private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
-      RebalancerConfig rebalancerConfig) {
-    boolean status = true;
-    if (configuration != null) {
-      status =
-          _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
-    }
-    // set an ideal state if the resource supports it
-    IdealState idealState =
-        rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
-            configuration.getBatchMessageMode());
-    if (idealState != null) {
-      status =
-          status
-              && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
-    }
-    return status;
-  }
-
-  /**
-   * Set the config of the rebalancer. This includes all properties required for rebalancing this
-   * resource
-   * @param resourceId the resource to update
-   * @param config the new rebalancer config
-   * @return true if the config was set, false otherwise
-   */
-  public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
-    ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
-    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-    if (partitionedConfig == null
-        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
-      // only persist if this is not easily convertible to an ideal state
-      resourceConfig.addNamespacedConfig(new RebalancerConfigHolder(config).toNamespacedConfig());
-    }
-
-    // update the ideal state if applicable
-    IdealState oldIdealState =
-        _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
-    if (oldIdealState != null) {
-      IdealState idealState =
-          rebalancerConfigToIdealState(config, oldIdealState.getBucketSize(),
-              oldIdealState.getBatchMessageMode());
-      if (idealState != null) {
-        _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
-      }
-    }
-
-    return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
-        resourceConfig);
-  }
-
-  /**
-   * Read the user config of the resource
-   * @param resourceId the resource to to look up
-   * @return UserConfig, or null
-   */
-  public UserConfig readUserConfig(ResourceId resourceId) {
-    ResourceConfiguration resourceConfig =
-        _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
-    return resourceConfig != null ? UserConfig.from(resourceConfig) : null;
-  }
-
-  /**
-   * Read the rebalancer config of the resource
-   * @param resourceId the resource to to look up
-   * @return RebalancerConfig, or null
-   */
-  public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
-    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
-    if (idealState != null && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
-      return PartitionedRebalancerConfig.from(idealState);
-    }
-    ResourceConfiguration resourceConfig =
-        _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
-    return resourceConfig.getRebalancerConfig(RebalancerConfig.class);
-  }
-
-  /**
-   * Set the user config of the resource, overwriting existing user configs
-   * @param resourceId the resource to update
-   * @param userConfig the new user config
-   * @return true if the user config was set, false otherwise
-   */
-  public boolean setUserConfig(ResourceId resourceId, UserConfig userConfig) {
-    ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId).setUserConfig(userConfig);
-    return updateResource(resourceId, delta) != null;
-  }
-
-  /**
-   * Add user configuration to the existing resource user configuration. Overwrites properties with
-   * the same key
-   * @param resourceId the resource to update
-   * @param userConfig the user config key-value pairs to add
-   * @return true if the user config was updated, false otherwise
-   */
-  public boolean updateUserConfig(ResourceId resourceId, UserConfig userConfig) {
-    ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
-    resourceConfig.addNamespacedConfig(userConfig);
-    return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
-        resourceConfig);
-  }
-
-  /**
-   * Clear any user-specified configuration from the resource
-   * @param resourceId the resource to update
-   * @return true if the config was cleared, false otherwise
-   */
-  public boolean dropUserConfig(ResourceId resourceId) {
-    return setUserConfig(resourceId, new UserConfig(Scope.resource(resourceId)));
-  }
-
-  /**
-   * Persist an existing resource's logical configuration
-   * @param resourceConfig logical resource configuration
-   * @return true if resource is set, false otherwise
-   */
-  public boolean setResource(ResourceConfig resourceConfig) {
-    if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) {
-      LOG.error("Resource not fully defined with a rebalancer context");
-      return false;
-    }
-    ResourceId resourceId = resourceConfig.getId();
-    ResourceConfiguration config = new ResourceConfiguration(resourceId);
-    UserConfig userConfig = resourceConfig.getUserConfig();
-    if (userConfig != null
-        && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig
-            .getMapFields().isEmpty())) {
-      config.addNamespacedConfig(userConfig);
-    } else {
-      userConfig = null;
-    }
-    PartitionedRebalancerConfig partitionedConfig =
-        PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
-    if (partitionedConfig == null
-        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
-      // only persist if this is not easily convertible to an ideal state
-      config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
-          .toNamespacedConfig());
-      config.setBucketSize(resourceConfig.getBucketSize());
-      config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
-    } else if (userConfig == null) {
-      config = null;
-    }
-    if (resourceConfig.getProvisionerConfig() != null) {
-      config.addNamespacedConfig(new ProvisionerConfigHolder(resourceConfig.getProvisionerConfig())
-          .toNamespacedConfig());
-    }
-    config.setBucketSize(resourceConfig.getBucketSize());
-    config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
-    setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
-    return true;
-  }
-
-  /**
-   * Get a resource configuration, which may include user-defined configuration, as well as
-   * rebalancer configuration
-   * @param resourceId
-   * @return configuration or null
-   */
-  public ResourceConfiguration getConfiguration(ResourceId resourceId) {
-    return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
-  }
-
-  /**
-   * set external view of a resource
-   * @param resourceId
-   * @param extView
-   * @return true if set, false otherwise
-   */
-  public boolean setExternalView(ResourceId resourceId, ExternalView extView) {
-    return _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
-  }
-
-  /**
-   * get the external view of a resource
-   * @param resourceId the resource to look up
-   * @return external view or null
-   */
-  public ExternalView readExternalView(ResourceId resourceId) {
-    return _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
-  }
-
-  /**
-   * drop external view of a resource
-   * @param resourceId
-   * @return true if dropped, false otherwise
-   */
-  public boolean dropExternalView(ResourceId resourceId) {
-    return _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
-  }
-
-  /**
-   * reset resources for all participants
-   * @param resetResourceIdSet the resources to reset
-   * @return true if they were reset, false otherwise
-   */
-  public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
-    ParticipantAccessor accessor = participantAccessor();
-    List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
-    for (ExternalView extView : extViews) {
-      if (!resetResourceIdSet.contains(extView.getResourceId())) {
-        continue;
-      }
-
-      Map<ParticipantId, Set<PartitionId>> resetPartitionIds = Maps.newHashMap();
-      for (PartitionId partitionId : extView.getPartitionIdSet()) {
-        Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
-        for (ParticipantId participantId : stateMap.keySet()) {
-          State state = stateMap.get(participantId);
-          if (state.equals(State.from(HelixDefinedState.ERROR))) {
-            if (!resetPartitionIds.containsKey(participantId)) {
-              resetPartitionIds.put(participantId, new HashSet<PartitionId>());
-            }
-            resetPartitionIds.get(participantId).add(partitionId);
-          }
-        }
-      }
-      for (ParticipantId participantId : resetPartitionIds.keySet()) {
-        accessor.resetPartitionsForParticipant(participantId, extView.getResourceId(),
-            resetPartitionIds.get(participantId));
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Generate a default assignment for partitioned resources
-   * @param resourceId the resource to update
-   * @param replicaCount the new replica count (or -1 to use the existing one)
-   * @param participantGroupTag the new participant group tag (or null to use the existing one)
-   * @return true if assignment successful, false otherwise
-   */
-  public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
-      String participantGroupTag) {
-    Resource resource = readResource(resourceId);
-    RebalancerConfig config = resource.getRebalancerConfig();
-    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-    if (partitionedConfig == null) {
-      LOG.error("Only partitioned resource types are supported");
-      return false;
-    }
-    if (replicaCount != -1) {
-      partitionedConfig.setReplicaCount(replicaCount);
-    }
-    if (participantGroupTag != null) {
-      partitionedConfig.setParticipantGroupTag(participantGroupTag);
-    }
-    StateModelDefinition stateModelDef =
-        _accessor.getProperty(_keyBuilder.stateModelDef(partitionedConfig.getStateModelDefId()
-            .stringify()));
-    List<InstanceConfig> participantConfigs =
-        _accessor.getChildValues(_keyBuilder.instanceConfigs());
-    Set<ParticipantId> participantSet = Sets.newHashSet();
-    for (InstanceConfig participantConfig : participantConfigs) {
-      participantSet.add(participantConfig.getParticipantId());
-    }
-    partitionedConfig.generateDefaultConfiguration(stateModelDef, participantSet);
-    setRebalancerConfig(resourceId, partitionedConfig);
-    return true;
-  }
-
-  /**
-   * Get an ideal state from a rebalancer config if the resource is partitioned
-   * @param config RebalancerConfig instance
-   * @param bucketSize bucket size to use
-   * @param batchMessageMode true if batch messaging allowed, false otherwise
-   * @return IdealState, or null
-   */
-  public static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
-      boolean batchMessageMode) {
-    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-    if (partitionedConfig != null) {
-      if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
-        // don't proceed if this resource cannot be described by an ideal state
-        return null;
-      }
-      IdealState idealState = new IdealState(partitionedConfig.getResourceId());
-      idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
-
-      RebalancerRef ref = partitionedConfig.getRebalancerRef();
-      if (ref != null) {
-        idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
-      }
-      String replicas = null;
-      if (partitionedConfig.anyLiveParticipant()) {
-        replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
-      } else {
-        replicas = Integer.toString(partitionedConfig.getReplicaCount());
-      }
-      idealState.setReplicas(replicas);
-      idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
-      idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
-      idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
-      idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
-      idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
-      idealState.setBucketSize(bucketSize);
-      idealState.setBatchMessageMode(batchMessageMode);
-      idealState.setRebalancerConfigClass(config.getClass());
-      if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
-        SemiAutoRebalancerConfig semiAutoConfig =
-            BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
-        for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
-          idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
-        }
-      } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
-        CustomRebalancerConfig customConfig =
-            BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
-        for (PartitionId partitionId : customConfig.getPartitionSet()) {
-          idealState
-              .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
-        }
-      } else {
-        for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
-          List<ParticipantId> preferenceList = Collections.emptyList();
-          idealState.setPreferenceList(partitionId, preferenceList);
-          Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
-          idealState.setParticipantStateMap(partitionId, participantStateMap);
-        }
-      }
-      return idealState;
-    }
-    return null;
-  }
-
-  /**
-   * Create a resource snapshot instance from the physical model
-   * @param resourceId the resource id
-   * @param resourceConfiguration physical resource configuration
-   * @param idealState ideal state of the resource
-   * @param externalView external view of the resource
-   * @param resourceAssignment current resource assignment
-   * @return Resource
-   */
-  static Resource createResource(ResourceId resourceId,
-      ResourceConfiguration resourceConfiguration, IdealState idealState,
-      ExternalView externalView, ResourceAssignment resourceAssignment) {
-    UserConfig userConfig;
-    ProvisionerConfig provisionerConfig = null;
-    RebalancerConfig rebalancerConfig = null;
-    ResourceType type = ResourceType.DATA;
-    if (resourceConfiguration != null) {
-      userConfig = resourceConfiguration.getUserConfig();
-      type = resourceConfiguration.getType();
-    } else {
-      userConfig = new UserConfig(Scope.resource(resourceId));
-    }
-    int bucketSize = 0;
-    boolean batchMessageMode = false;
-    if (idealState != null) {
-      if (resourceConfiguration != null
-          && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
-        // prefer rebalancer config for user_defined data rebalancing
-        rebalancerConfig =
-            resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
-      }
-      if (rebalancerConfig == null) {
-        // prefer ideal state for non-user_defined data rebalancing
-        rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
-      }
-      bucketSize = idealState.getBucketSize();
-      batchMessageMode = idealState.getBatchMessageMode();
-      idealState.updateUserConfig(userConfig);
-    } else if (resourceConfiguration != null) {
-      bucketSize = resourceConfiguration.getBucketSize();
-      batchMessageMode = resourceConfiguration.getBatchMessageMode();
-      rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
-    }
-    if (rebalancerConfig == null) {
-      rebalancerConfig = new PartitionedRebalancerConfig();
-    }
-    if (resourceConfiguration != null) {
-      provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class);
-    }
-    return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
-        rebalancerConfig, provisionerConfig, userConfig, bucketSize, batchMessageMode);
-  }
-
-  /**
-   * Get a ParticipantAccessor instance
-   * @return ParticipantAccessor
-   */
-  protected ParticipantAccessor participantAccessor() {
-    return new ParticipantAccessor(_clusterId, _accessor);
-  }
-
-  /**
-   * Get the cluster ID this accessor is connected to
-   * @return ClusterId
-   */
-  protected ClusterId clusterId() {
-    return _clusterId;
-  }
-
-  /**
-   * Get the accessor for the properties stored for this cluster
-   * @return HelixDataAccessor
-   */
-  protected HelixDataAccessor dataAccessor() {
-    return _accessor;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 6185383..26df5d7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -1,17 +1,5 @@
 package org.apache.helix.api.config;
 
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-
-import com.google.common.collect.Sets;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -31,6 +19,17 @@ import com.google.common.collect.Sets;
  * under the License.
  */
 
+import java.util.Set;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.model.IdealState;
+
+import com.google.common.collect.Sets;
+
 /**
  * Full configuration of a Helix resource. Typically used to add or modify resources on a cluster
  */
@@ -47,6 +46,7 @@ public class ResourceConfig {
 
   private final ResourceId _id;
   private final RebalancerConfig _rebalancerConfig;
+  private final IdealState _idealState;
   private final SchedulerTaskConfig _schedulerTaskConfig;
   private final ProvisionerConfig _provisionerConfig;
   private final UserConfig _userConfig;
@@ -65,13 +65,14 @@ public class ResourceConfig {
    * @param bucketSize bucket size for this resource
    * @param batchMessageMode whether or not batch messaging is allowed
    */
-  public ResourceConfig(ResourceId id, ResourceType resourceType,
+  public ResourceConfig(ResourceId id, ResourceType resourceType, IdealState idealState,
       SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig,
       ProvisionerConfig provisionerConfig, UserConfig userConfig, int bucketSize,
       boolean batchMessageMode) {
     _id = id;
     _resourceType = resourceType;
     _schedulerTaskConfig = schedulerTaskConfig;
+    _idealState = idealState;
     _rebalancerConfig = rebalancerConfig;
     _provisionerConfig = provisionerConfig;
     _userConfig = userConfig;
@@ -80,28 +81,11 @@ public class ResourceConfig {
   }
 
   /**
-   * Get the subunits of the resource
-   * @return map of subunit id to subunit or empty map if none
-   */
-  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap() {
-    return _rebalancerConfig.getSubUnitMap();
-  }
-
-  /**
-   * Get a subunit that the resource contains
-   * @param subUnitId the subunit id to look up
-   * @return Partition or null if none is present with the given id
-   */
-  public Partition getSubUnit(PartitionId subUnitId) {
-    return getSubUnitMap().get(subUnitId);
-  }
-
-  /**
    * Get the set of subunit ids that the resource contains
    * @return subunit id set, or empty if none
    */
   public Set<? extends PartitionId> getSubUnitSet() {
-    return getSubUnitMap().keySet();
+    return _idealState.getPartitionIdSet();
   }
 
   /**
@@ -113,6 +97,14 @@ public class ResourceConfig {
   }
 
   /**
+   * Get the ideal state for this resource
+   * @return IdealState instance
+   */
+  public IdealState getIdealState() {
+    return _idealState;
+  }
+
+  /**
    * Get the resource id
    * @return ResourceId
    */
@@ -170,7 +162,7 @@ public class ResourceConfig {
 
   @Override
   public String toString() {
-    return getSubUnitMap().toString();
+    return _idealState.toString();
   }
 
   /**
@@ -309,6 +301,7 @@ public class ResourceConfig {
   public static class Builder {
     private final ResourceId _id;
     private ResourceType _type;
+    private IdealState _idealState;
     private RebalancerConfig _rebalancerConfig;
     private SchedulerTaskConfig _schedulerTaskConfig;
     private ProvisionerConfig _provisionerConfig;
@@ -349,6 +342,16 @@ public class ResourceConfig {
     }
 
     /**
+     * Set the ideal state
+     * @param idealState a description of a resource
+     * @return Builder
+     */
+    public Builder idealState(IdealState idealState) {
+      _idealState = idealState;
+      return this;
+    }
+
+    /**
      * Set the user configuration
      * @param userConfig user-specified properties
      * @return Builder
@@ -401,7 +404,7 @@ public class ResourceConfig {
      * @return instantiated Resource
      */
     public ResourceConfig build() {
-      return new ResourceConfig(_id, _type, _schedulerTaskConfig, _rebalancerConfig,
+      return new ResourceConfig(_id, _type, _idealState, _schedulerTaskConfig, _rebalancerConfig,
           _provisionerConfig, _userConfig, _bucketSize, _batchMessageMode);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 4d5c373..e884ff8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -9,8 +9,6 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.controller.context.ControllerContextProvider;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
@@ -48,27 +46,26 @@ public class CustomRebalancer implements HelixRebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
-    CustomRebalancerConfig config =
-        BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
-    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+  public ResourceAssignment computeResourceMapping(IdealState idealState,
+      RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster,
+      ResourceCurrentState currentState) {
     boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
+        cluster.getStateModelMap().get(idealState.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
+      LOG.debug("Processing resource:" + idealState.getResourceId());
     }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
+    ResourceAssignment partitionMapping = new ResourceAssignment(idealState.getResourceId());
+    for (PartitionId partition : idealState.getPartitionIdSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
+          currentState.getCurrentStateMap(idealState.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
-              .getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
-              currentStateMap, disabledInstancesForPartition, isEnabled);
+              .getLiveParticipantMap().keySet(), stateModelDef, idealState
+              .getParticipantStateMap(partition), currentStateMap, disabledInstancesForPartition,
+              isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index e00e57c..6f5575a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -24,14 +24,12 @@ import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.context.ControllerContextProvider;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -60,36 +58,29 @@ public class FallbackRebalancer implements HelixRebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+  public ResourceAssignment computeResourceMapping(IdealState idealState,
+      RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster,
+      ResourceCurrentState currentState) {
     // make sure the manager is not null
     if (_helixManager == null) {
       LOG.info("HelixManager is null!");
       return null;
     }
 
-    // get the config
-    PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
-    if (config == null) {
-      LOG.info("Resource is not partitioned");
+    // Make sure we have an ideal state
+    if (idealState == null) {
+      LOG.info("No IdealState available");
       return null;
     }
 
-    // get the ideal state and rebalancer class
-    ResourceId resourceId = config.getResourceId();
+    // get the rebalancer class
+    ResourceId resourceId = idealState.getResourceId();
     StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
+        cluster.getStateModelMap().get(idealState.getStateModelDefId());
     if (stateModelDef == null) {
       LOG.info("StateModelDefinition unavailable for " + resourceId);
       return null;
     }
-    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceId.stringify()));
-    if (idealState == null) {
-      LOG.info("No IdealState available for " + resourceId);
-      return null;
-    }
     String rebalancerClassName = idealState.getRebalancerClassName();
     if (rebalancerClassName == null) {
       LOG.info("No Rebalancer class available for " + resourceId);
@@ -111,6 +102,7 @@ public class FallbackRebalancer implements HelixRebalancer {
     }
 
     // get the cluster data cache (unfortunately involves a second read of the cluster)
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
     ClusterDataCache cache = new ClusterDataCache();
     cache.refresh(accessor);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 4bf030b..c1d32da 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -9,6 +9,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
@@ -17,8 +18,6 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.controller.context.ControllerContextProvider;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
@@ -64,28 +63,27 @@ public class FullAutoRebalancer implements HelixRebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
-    FullAutoRebalancerConfig config =
-        BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
-    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+  public ResourceAssignment computeResourceMapping(IdealState idealState,
+      RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster,
+      ResourceCurrentState currentState) {
     boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
+        cluster.getStateModelMap().get(idealState.getStateModelDefId());
     // Compute a preference list based on the current ideal state
-    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+    List<PartitionId> partitions = new ArrayList<PartitionId>(idealState.getPartitionIdSet());
     Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
     int replicas = -1;
-    if (config.anyLiveParticipant()) {
+    String replicaStr = idealState.getReplicas();
+    if (replicaStr.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
       replicas = liveParticipants.size();
     } else {
-      replicas = config.getReplicaCount();
+      replicas = Integer.valueOf(replicaStr);
     }
 
     // count how many replicas should be in each state
     Map<State, String> upperBounds =
-        ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+        ConstraintBasedAssignment.stateConstraints(stateModelDef, idealState.getResourceId(),
             cluster.getConfig());
     LinkedHashMap<State, Integer> stateCountMap =
         ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, liveParticipants.size(),
@@ -99,15 +97,15 @@ public class FullAutoRebalancer implements HelixRebalancer {
 
     // compute the current mapping from the current state
     Map<PartitionId, Map<ParticipantId, State>> currentMapping =
-        currentMapping(config, currentState, stateCountMap);
+        currentMapping(idealState, currentState, stateCountMap);
 
     // If there are nodes tagged with resource, use only those nodes
     // If there are nodes tagged with resource name, use only those nodes
     Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
     Set<ParticipantId> taggedLiveNodes = new HashSet<ParticipantId>();
-    if (config.getParticipantGroupTag() != null) {
+    if (idealState.getInstanceGroupTag() != null) {
       for (ParticipantId participantId : allParticipantList) {
-        if (cluster.getParticipantMap().get(participantId).hasTag(config.getParticipantGroupTag())) {
+        if (cluster.getParticipantMap().get(participantId).hasTag(idealState.getInstanceGroupTag())) {
           taggedNodes.add(participantId);
           if (liveParticipants.containsKey(participantId)) {
             taggedLiveNodes.add(participantId);
@@ -117,24 +115,24 @@ public class FullAutoRebalancer implements HelixRebalancer {
       if (!taggedLiveNodes.isEmpty()) {
         // live nodes exist that have this tag
         if (LOG.isDebugEnabled()) {
-          LOG.debug("found the following participants with tag " + config.getParticipantGroupTag()
-              + " for " + config.getResourceId() + ": " + taggedLiveNodes);
+          LOG.debug("found the following participants with tag " + idealState.getInstanceGroupTag()
+              + " for " + idealState.getResourceId() + ": " + taggedLiveNodes);
         }
       } else if (taggedNodes.isEmpty()) {
         // no live nodes and no configured nodes have this tag
-        LOG.warn("Resource " + config.getResourceId() + " has tag "
-            + config.getParticipantGroupTag() + " but no configured participants have this tag");
+        LOG.warn("Resource " + idealState.getResourceId() + " has tag "
+            + idealState.getInstanceGroupTag() + " but no configured participants have this tag");
       } else {
         // configured nodes have this tag, but no live nodes have this tag
-        LOG.warn("Resource " + config.getResourceId() + " has tag "
-            + config.getParticipantGroupTag() + " but no live participants have this tag");
+        LOG.warn("Resource " + idealState.getResourceId() + " has tag "
+            + idealState.getInstanceGroupTag() + " but no live participants have this tag");
       }
       allParticipantList = new ArrayList<ParticipantId>(taggedNodes);
       liveParticipantList = new ArrayList<ParticipantId>(taggedLiveNodes);
     }
 
     // determine which nodes the replicas should live on
-    int maxPartition = config.getMaxPartitionsPerParticipant();
+    int maxPartition = idealState.getMaxPartitionsPerInstance();
     if (LOG.isDebugEnabled()) {
       LOG.debug("currentMapping: " + currentMapping);
       LOG.debug("stateCountMap: " + stateCountMap);
@@ -144,8 +142,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
     }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     _algorithm =
-        new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
-            placementScheme);
+        new AutoRebalanceStrategy(idealState.getResourceId(), partitions, stateCountMap,
+            maxPartition, placementScheme);
     ZNRecord newMapping =
         _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
             allParticipantList);
@@ -156,9 +154,9 @@ public class FullAutoRebalancer implements HelixRebalancer {
 
     // compute a full partition mapping for the resource
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
+      LOG.debug("Processing resource:" + idealState.getResourceId());
     }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    ResourceAssignment partitionMapping = new ResourceAssignment(idealState.getResourceId());
     for (PartitionId partition : partitions) {
       Set<ParticipantId> disabledParticipantsForPartition =
           ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
@@ -178,22 +176,21 @@ public class FullAutoRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
               liveParticipants.keySet(), stateModelDef, preferenceList,
-              currentState.getCurrentStateMap(config.getResourceId(), partition),
+              currentState.getCurrentStateMap(idealState.getResourceId(), partition),
               disabledParticipantsForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
 
-  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
-      FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
-      Map<State, Integer> stateCountMap) {
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(IdealState idealState,
+      ResourceCurrentState currentStateOutput, Map<State, Integer> stateCountMap) {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 
-    for (PartitionId partition : config.getPartitionSet()) {
+    for (PartitionId partition : idealState.getPartitionIdSet()) {
       Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
+          currentStateOutput.getCurrentStateMap(idealState.getResourceId(), partition);
       map.put(partition, new HashMap<ParticipantId, State>());
       for (ParticipantId node : curStateMap.keySet()) {
         State state = curStateMap.get(node);
@@ -203,7 +200,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
       }
 
       Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
+          currentStateOutput.getPendingStateMap(idealState.getResourceId(), partition);
       for (ParticipantId node : pendingStateMap.keySet()) {
         State state = pendingStateMap.get(node);
         if (stateCountMap.containsKey(state)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
index 1fbb02f..9de571d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
@@ -5,6 +5,7 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 
 /*
@@ -58,11 +59,13 @@ public interface HelixRebalancer {
    * MyRebalancerConfig config = BasicRebalancerConfig.convert(rebalancerConfig,
    *     MyRebalancerConfig.class);
    * </pre>
+   * @param idealState the ideal state that defines how a resource should be rebalanced
    * @param rebalancerConfig the properties of the resource for which a mapping will be computed
    * @param prevAssignment the previous ResourceAssignment of this cluster, or null if none
    * @param cluster complete snapshot of the cluster
    * @param currentState the current states of all partitions
    */
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState);
+  public ResourceAssignment computeResourceMapping(IdealState idealState,
+      RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster,
+      ResourceCurrentState currentState);
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 07f6337..fbf06d2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -10,9 +10,7 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.controller.context.ControllerContextProvider;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.IdealState;
@@ -52,29 +50,27 @@ public class SemiAutoRebalancer implements HelixRebalancer {
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
-    SemiAutoRebalancerConfig config =
-        BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
-    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+  public ResourceAssignment computeResourceMapping(IdealState idealState,
+      RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster,
+      ResourceCurrentState currentState) {
     boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
+        cluster.getStateModelMap().get(idealState.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
+      LOG.debug("Processing resource:" + idealState.getResourceId());
     }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    ResourceAssignment partitionMapping = new ResourceAssignment(idealState.getResourceId());
 
-    for (PartitionId partition : config.getPartitionSet()) {
+    for (PartitionId partition : idealState.getPartitionIdSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
+          currentState.getCurrentStateMap(idealState.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
       List<ParticipantId> preferenceList =
           ConstraintBasedAssignment.getPreferenceList(cluster, partition,
-              config.getPreferenceList(partition));
+              idealState.getPreferenceList(partition));
       Map<State, String> upperBounds =
-          ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+          ConstraintBasedAssignment.stateConstraints(stateModelDef, idealState.getResourceId(),
               cluster.getConfig());
 
       Map<ParticipantId, State> bestStateForPartition =


Mime
View raw message