helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [4/6] [HELIX-238] Refactor, add update to accessors, test update logic
Date Thu, 26 Sep 2013 23:05:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
new file mode 100644
index 0000000..8e07d97
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -0,0 +1,553 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Controller;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.model.ClusterConfiguration;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class ClusterAccessor {
+  private static Logger LOG = Logger.getLogger(ClusterAccessor.class);
+
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+  private final ClusterId _clusterId;
+
+  public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+    _clusterId = clusterId;
+  }
+
+  /**
+   * create a new cluster, fail if it already exists
+   * @return true if created, false if creation failed
+   */
+  public boolean createCluster(ClusterConfig cluster) {
+    boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
+    if (!created) {
+      LOG.error("Cluster already created. Aborting.");
+      return false;
+    }
+    initClusterStructure();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
+    for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
+      addStateModelDefinitionToCluster(stateModelDef);
+    }
+    Map<ResourceId, ResourceConfig> resources = cluster.getResourceMap();
+    for (ResourceConfig resource : resources.values()) {
+      addResourceToCluster(resource);
+    }
+    Map<ParticipantId, ParticipantConfig> participants = cluster.getParticipantMap();
+    for (ParticipantConfig participant : participants.values()) {
+      addParticipantToCluster(participant);
+    }
+    _accessor.createProperty(_keyBuilder.constraints(), null);
+    for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
+      _accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
+          constraints);
+    }
+    _accessor.createProperty(_keyBuilder.clusterConfig(),
+        ClusterConfiguration.from(cluster.getUserConfig()));
+    if (cluster.isPaused()) {
+      pauseCluster();
+    }
+
+    return true;
+  }
+
+  /**
+   * Update the cluster configuration
+   * @param clusterDelta change to the cluster configuration
+   * @return updated ClusterConfig, or null if there was an error
+   */
+  public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+    Cluster cluster = readCluster();
+    if (cluster == null) {
+      LOG.error("Cluster does not exist, cannot be updated");
+      return null;
+    }
+    ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
+    boolean status = setBasicClusterConfig(config);
+    return status ? config : null;
+  }
+
+  /**
+   * Set a cluster config minus state model, participants, and resources
+   * @param config ClusterConfig
+   * @return true if correctly set, false otherwise
+   */
+  private boolean setBasicClusterConfig(ClusterConfig config) {
+    if (config == null) {
+      return false;
+    }
+    ClusterConfiguration configuration = ClusterConfiguration.from(config.getUserConfig());
+    _accessor.setProperty(_keyBuilder.clusterConfig(), configuration);
+    Map<ConstraintType, ClusterConstraints> constraints = config.getConstraintMap();
+    for (ConstraintType type : constraints.keySet()) {
+      ClusterConstraints constraint = constraints.get(type);
+      _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
+    }
+    return true;
+  }
+
+  /**
+   * drop a cluster
+   * @return true if the cluster was dropped, false if there was an error
+   */
+  public boolean dropCluster() {
+    LOG.info("Dropping cluster: " + _clusterId);
+    List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
+    if (liveInstanceNames.size() > 0) {
+      LOG.error("Can't drop cluster: " + _clusterId + " because there are running participant: "
+          + liveInstanceNames + ", shutdown participants first.");
+      return false;
+    }
+
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+    if (leader != null) {
+      LOG.error("Can't drop cluster: " + _clusterId + ", because leader: " + leader.getId()
+          + " are running, shutdown leader first.");
+      return false;
+    }
+
+    return _accessor.removeProperty(_keyBuilder.cluster());
+  }
+
+  /**
+   * read entire cluster data
+   * @return cluster snapshot
+   */
+  public Cluster readCluster() {
+    /**
+     * map of instance-id to instance-config
+     */
+    Map<String, InstanceConfig> instanceConfigMap =
+        _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+
+    /**
+     * map of resource-id to ideal-state
+     */
+    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+
+    /**
+     * map of instance-id to live-instance
+     */
+    Map<String, LiveInstance> liveInstanceMap =
+        _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+
+    /**
+     * map of participant-id to map of message-id to message
+     */
+    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
+    for (String instanceName : liveInstanceMap.keySet()) {
+      Map<String, Message> instanceMsgMap =
+          _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
+      messageMap.put(instanceName, instanceMsgMap);
+    }
+
+    /**
+     * map of participant-id to map of resource-id to current-state
+     */
+    Map<String, Map<String, CurrentState>> currentStateMap =
+        new HashMap<String, Map<String, CurrentState>>();
+    for (String participantName : liveInstanceMap.keySet()) {
+      LiveInstance liveInstance = liveInstanceMap.get(participantName);
+      SessionId sessionId = liveInstance.getSessionId();
+      Map<String, CurrentState> instanceCurStateMap =
+          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+              sessionId.stringify()));
+
+      currentStateMap.put(participantName, instanceCurStateMap);
+    }
+
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+    /**
+     * map of constraint-type to constraints
+     */
+    Map<String, ClusterConstraints> constraintMap =
+        _accessor.getChildValuesMap(_keyBuilder.constraints());
+
+    /**
+     * Map of resource id to external view
+     */
+    Map<String, ExternalView> externalViewMap =
+        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
+    /**
+     * Map of resource id to user configuration
+     */
+    Map<String, ResourceConfiguration> resourceConfigMap =
+        _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+
+    /**
+     * Map of resource id to resource assignment
+     */
+    Map<String, ResourceAssignment> resourceAssignmentMap =
+        _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+
+    // read all the resources
+    Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
+    for (String resourceName : idealStateMap.keySet()) {
+      ResourceId resourceId = ResourceId.from(resourceName);
+      resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
+          resourceConfigMap.get(resourceName), idealStateMap.get(resourceName),
+          externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
+    }
+
+    // read all the participants
+    Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
+    for (String participantName : instanceConfigMap.keySet()) {
+      InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
+      UserConfig userConfig = UserConfig.from(instanceConfig);
+      LiveInstance liveInstance = liveInstanceMap.get(participantName);
+      Map<String, Message> instanceMsgMap = messageMap.get(participantName);
+
+      ParticipantId participantId = ParticipantId.from(participantName);
+
+      participantMap.put(participantId, ParticipantAccessor.createParticipant(participantId,
+          instanceConfig, userConfig, liveInstance, instanceMsgMap,
+          currentStateMap.get(participantName)));
+    }
+
+    // read the controllers
+    Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
+    ControllerId leaderId = null;
+    if (leader != null) {
+      leaderId = ControllerId.from(leader.getId());
+      controllerMap.put(leaderId, new Controller(leaderId, leader, true));
+    }
+
+    // read the constraints
+    Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
+        new HashMap<ConstraintType, ClusterConstraints>();
+    for (String constraintType : constraintMap.keySet()) {
+      clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
+          constraintMap.get(constraintType));
+    }
+
+    // read the pause status
+    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+    boolean isPaused = pauseSignal != null;
+
+    ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+    UserConfig userConfig;
+    if (clusterUserConfig != null) {
+      userConfig = UserConfig.from(clusterUserConfig);
+    } else {
+      userConfig = new UserConfig(Scope.cluster(_clusterId));
+    }
+
+    // read the state model definitions
+    StateModelDefinitionAccessor stateModelDefAccessor =
+        new StateModelDefinitionAccessor(_accessor);
+    Map<StateModelDefId, StateModelDefinition> stateModelMap =
+        stateModelDefAccessor.readStateModelDefinitions();
+
+    // create the cluster snapshot object
+    return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
+        clusterConstraintMap, stateModelMap, userConfig, isPaused);
+  }
+
+  /**
+   * pause controller of cluster
+   */
+  public void pauseCluster() {
+    _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
+  }
+
+  /**
+   * resume controller of cluster
+   */
+  public void resumeCluster() {
+    _accessor.removeProperty(_keyBuilder.pause());
+  }
+
+  /**
+   * add a resource to cluster
+   * @param resource
+   * @return true if resource added, false if there was an error
+   */
+  public boolean addResourceToCluster(ResourceConfig resource) {
+    if (resource == null || resource.getRebalancerConfig() == null) {
+      LOG.error("Resource not fully defined with a rebalancer context");
+      return false;
+    }
+
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster: " + _clusterId + " structure is not valid");
+      return false;
+    }
+    RebalancerContext context =
+        resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
+      LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
+      return false;
+    }
+
+    ResourceId resourceId = resource.getId();
+    if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) != null) {
+      LOG.error("Skip adding resource: " + resourceId
+          + ", because resource ideal state already exists in cluster: " + _clusterId);
+      return false;
+    }
+    if (_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())) != null) {
+      LOG.error("Skip adding resource: " + resourceId
+          + ", because resource config already exists in cluster: " + _clusterId);
+      return false;
+    }
+
+    // Add resource user config
+    if (resource.getUserConfig() != null) {
+      ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+      configuration.setType(resource.getType());
+      configuration.addNamespacedConfig(resource.getUserConfig());
+      configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
+      configuration.setBucketSize(resource.getBucketSize());
+      configuration.setBatchMessageMode(resource.getBatchMessageMode());
+      _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    }
+
+    // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
+    RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+    IdealState idealState =
+        ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
+            resource.getBatchMessageMode());
+    if (idealState != null) {
+      _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+    }
+    return true;
+  }
+
+  /**
+   * drop a resource from cluster
+   * @param resourceId
+   * @return true if removal succeeded, false otherwise
+   */
+  public boolean dropResourceFromCluster(ResourceId resourceId) {
+    if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) == null) {
+      LOG.error("Skip removing resource: " + resourceId
+          + ", because resource ideal state already removed from cluster: " + _clusterId);
+      return false;
+    }
+    _accessor.removeProperty(_keyBuilder.idealState(resourceId.stringify()));
+    _accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+    return true;
+  }
+
+  /**
+   * check if cluster structure is valid
+   * @return true if valid or false otherwise
+   */
+  public boolean isClusterStructureValid() {
+    List<String> paths = getRequiredPaths();
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    if (baseAccessor != null) {
+      boolean[] existsResults = baseAccessor.exists(paths, 0);
+      for (boolean exists : existsResults) {
+        if (!exists) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Create empty persistent properties to ensure that there is a valid cluster structure
+   */
+  private void initClusterStructure() {
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    List<String> paths = getRequiredPaths();
+    for (String path : paths) {
+      boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+      if (!status && LOG.isDebugEnabled()) {
+        LOG.debug(path + " already exists");
+      }
+    }
+  }
+
+  /**
+   * Get all property paths that must be set for a cluster structure to be valid
+   * @return list of paths as strings
+   */
+  private List<String> getRequiredPaths() {
+    List<String> paths = new ArrayList<String>();
+    paths.add(_keyBuilder.cluster().getPath());
+    paths.add(_keyBuilder.idealStates().getPath());
+    paths.add(_keyBuilder.clusterConfigs().getPath());
+    paths.add(_keyBuilder.instanceConfigs().getPath());
+    paths.add(_keyBuilder.resourceConfigs().getPath());
+    paths.add(_keyBuilder.propertyStore().getPath());
+    paths.add(_keyBuilder.liveInstances().getPath());
+    paths.add(_keyBuilder.instances().getPath());
+    paths.add(_keyBuilder.externalViews().getPath());
+    paths.add(_keyBuilder.controller().getPath());
+    paths.add(_keyBuilder.stateModelDefs().getPath());
+    paths.add(_keyBuilder.controllerMessages().getPath());
+    paths.add(_keyBuilder.controllerTaskErrors().getPath());
+    paths.add(_keyBuilder.controllerTaskStatuses().getPath());
+    paths.add(_keyBuilder.controllerLeaderHistory().getPath());
+    return paths;
+  }
+
+  /**
+   * add a participant to cluster
+   * @param participant
+   * @return true if participant added, false otherwise
+   */
+  public boolean addParticipantToCluster(ParticipantConfig participant) {
+    if (participant == null) {
+      LOG.error("Participant not initialized");
+      return false;
+    }
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster: " + _clusterId + " structure is not valid");
+      return false;
+    }
+
+    ParticipantId participantId = participant.getId();
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
+      LOG.error("Config for participant: " + participantId + " already exists in cluster: "
+          + _clusterId);
+      return false;
+    }
+
+    // add empty root ZNodes
+    List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
+    createKeys.add(_keyBuilder.messages(participantId.stringify()));
+    createKeys.add(_keyBuilder.currentStates(participantId.stringify()));
+    createKeys.add(_keyBuilder.participantErrors(participantId.stringify()));
+    createKeys.add(_keyBuilder.statusUpdates(participantId.stringify()));
+    for (PropertyKey key : createKeys) {
+      _accessor.createProperty(key, null);
+    }
+
+    // add the config
+    InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
+    instanceConfig.setHostName(participant.getHostName());
+    instanceConfig.setPort(Integer.toString(participant.getPort()));
+    instanceConfig.setInstanceEnabled(participant.isEnabled());
+    UserConfig userConfig = participant.getUserConfig();
+    instanceConfig.addNamespacedConfig(userConfig);
+    Set<String> tags = participant.getTags();
+    for (String tag : tags) {
+      instanceConfig.addTag(tag);
+    }
+    Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
+    for (PartitionId partitionId : disabledPartitions) {
+      instanceConfig.setInstanceEnabledForPartition(partitionId, false);
+    }
+    _accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
+    _accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
+    return true;
+  }
+
+  /**
+   * drop a participant from cluster
+   * @param participantId
+   * @return true if participant dropped, false if there was an error
+   */
+  public boolean dropParticipantFromCluster(ParticipantId participantId) {
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+          + _clusterId);
+      return false;
+    }
+
+    if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+      LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
+          + _clusterId);
+      return false;
+    }
+
+    // delete participant config path
+    _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+    // delete participant path
+    _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+    return true;
+  }
+
+  /**
+   * Add a state model definition. Updates the existing state model definition if it already exists.
+   * @param stateModelDef fully initialized state model definition
+   * @return true if the model is persisted, false otherwise
+   */
+  public boolean addStateModelDefinitionToCluster(StateModelDefinition stateModelDef) {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster: " + _clusterId + " structure is not valid");
+      return false;
+    }
+
+    StateModelDefinitionAccessor smdAccessor = new StateModelDefinitionAccessor(_accessor);
+    return smdAccessor.setStateModelDefinition(stateModelDef);
+  }
+
+  /**
+   * Remove a state model definition if it exists
+   * @param stateModelDefId state model definition id
+   * @return true if removed, false if it did not exist
+   */
+  public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
+    return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
new file mode 100644
index 0000000..609e458
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
@@ -0,0 +1,49 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Controller;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.model.LiveInstance;
+
+public class ControllerAccessor {
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+
+  public ControllerAccessor(HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+  }
+
+  /**
+   * Read the leader controller if it is live
+   * @return Controller snapshot, or null
+   */
+  public Controller readLeader() {
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+    if (leader != null) {
+      ControllerId leaderId = ControllerId.from(leader.getId());
+      return new Controller(leaderId, leader, true);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
new file mode 100644
index 0000000..c1a9250
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -0,0 +1,435 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+public class ParticipantAccessor {
+  private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
+
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+  private final ClusterId _clusterId;
+
+  public ParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    _clusterId = clusterId;
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+  }
+
+  /**
+   * enable/disable a participant
+   * @param participantId
+   * @param isEnabled
+   */
+  void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+    String participantName = participantId.stringify();
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+          + _clusterId);
+      return;
+    }
+
+    InstanceConfig config = new InstanceConfig(participantName);
+    config.setInstanceEnabled(isEnabled);
+    _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
+
+  }
+
+  /**
+   * disable participant
+   * @param participantId
+   */
+  public void disableParticipant(ParticipantId participantId) {
+    enableParticipant(participantId, false);
+  }
+
+  /**
+   * enable participant
+   * @param participantId
+   */
+  public void enableParticipant(ParticipantId participantId) {
+    enableParticipant(participantId, true);
+  }
+
+  /**
+   * create messages for participant
+   * @param participantId
+   * @param msgMap map of message-id to message
+   */
+  public void insertMessagesToParticipant(ParticipantId participantId,
+      Map<MessageId, Message> msgMap) {
+    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+    List<Message> msgs = new ArrayList<Message>();
+    for (MessageId msgId : msgMap.keySet()) {
+      msgKeys.add(_keyBuilder.message(participantId.stringify(), msgId.stringify()));
+      msgs.add(msgMap.get(msgId));
+    }
+
+    _accessor.createChildren(msgKeys, msgs);
+  }
+
+  /**
+   * set messages of participant
+   * @param participantId
+   * @param msgMap map of message-id to message
+   */
+  public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+    String participantName = participantId.stringify();
+    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+    List<Message> msgs = new ArrayList<Message>();
+    for (MessageId msgId : msgMap.keySet()) {
+      msgKeys.add(_keyBuilder.message(participantName, msgId.stringify()));
+      msgs.add(msgMap.get(msgId));
+    }
+    _accessor.setChildren(msgKeys, msgs);
+  }
+
+  /**
+   * delete messages from participant
+   * @param participantId
+   * @param msgIdSet
+   */
+  public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+    String participantName = participantId.stringify();
+    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+    for (MessageId msgId : msgIdSet) {
+      msgKeys.add(_keyBuilder.message(participantName, msgId.stringify()));
+    }
+
+    // TODO impl batch remove
+    for (PropertyKey msgKey : msgKeys) {
+      _accessor.removeProperty(msgKey);
+    }
+  }
+
+  /**
+   * enable/disable partitions on a participant
+   * @param enabled
+   * @param participantId
+   * @param resourceId
+   * @param partitionIdSet
+   */
+  void enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+      final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
+    String participantName = participantId.stringify();
+    String resourceName = resourceId.stringify();
+
+    // check instanceConfig exists
+    PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
+    if (_accessor.getProperty(instanceConfigKey) == null) {
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+          + _clusterId);
+      return;
+    }
+
+    // check resource exist. warn if not
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
+    if (idealState == null) {
+      LOG.warn("Disable partitions: " + partitionIdSet + " but Cluster: " + _clusterId
+          + ", resource: " + resourceId
+          + " does NOT exists. probably disable it during ERROR->DROPPED transtition");
+
+    } else {
+      // check partitions exist. warn if not
+      for (PartitionId partitionId : partitionIdSet) {
+        if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
+            .getPreferenceList(partitionId) == null)
+            || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
+                .getParticipantStateMap(partitionId) == null)) {
+          LOG.warn("Cluster: " + _clusterId + ", resource: " + resourceId + ", partition: "
+              + partitionId + ", partition does NOT exist in ideal state");
+        }
+      }
+    }
+
+    // TODO merge list logic should go to znrecord updater
+    // update participantConfig
+    // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    final List<String> partitionNames = new ArrayList<String>();
+    for (PartitionId partitionId : partitionIdSet) {
+      partitionNames.add(partitionId.stringify());
+    }
+
+    baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + _clusterId + ", instance: " + participantId
+              + ", participant config is null");
+        }
+
+        // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
+        List<String> list =
+            currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+        Set<String> disabledPartitions = new HashSet<String>();
+        if (list != null) {
+          disabledPartitions.addAll(list);
+        }
+
+        if (enabled) {
+          disabledPartitions.removeAll(partitionNames);
+        } else {
+          disabledPartitions.addAll(partitionNames);
+        }
+
+        list = new ArrayList<String>(disabledPartitions);
+        Collections.sort(list);
+        currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+        return currentData;
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * disable partitions on a participant
+   * @param participantId
+   * @param resourceId
+   * @param disablePartitionIdSet
+   */
+  public void disablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+      Set<PartitionId> disablePartitionIdSet) {
+    enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
+  }
+
+  /**
+   * enable partitions on a participant
+   * @param participantId
+   * @param resourceId
+   * @param enablePartitionIdSet
+   */
+  public void enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+      Set<PartitionId> enablePartitionIdSet) {
+    enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
+  }
+
+  /**
+   * reset partitions on a participant
+   * @param participantId
+   * @param resourceId
+   * @param resetPartitionIdSet
+   */
+  public void resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+      Set<PartitionId> resetPartitionIdSet) {
+    // TODO impl this
+  }
+
+  /**
+   * Update a participant configuration
+   * @param participantId the participant to update
+   * @param participantDelta changes to the participant
+   * @return ParticipantConfig, or null if participant is not persisted
+   */
+  public ParticipantConfig updateParticipant(ParticipantId participantId,
+      ParticipantConfig.Delta participantDelta) {
+    Participant participant = readParticipant(participantId);
+    if (participant == null) {
+      LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+      return null;
+    }
+    ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+    setParticipant(config);
+    return config;
+  }
+
+  /**
+   * Set the configuration of an existing participant
+   * @param participantConfig participant configuration
+   * @return true if config was set, false if there was an error
+   */
+  public boolean setParticipant(ParticipantConfig participantConfig) {
+    if (participantConfig == null) {
+      LOG.error("Participant config not initialized");
+      return false;
+    }
+    InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId());
+    instanceConfig.setHostName(participantConfig.getHostName());
+    instanceConfig.setPort(Integer.toString(participantConfig.getPort()));
+    for (String tag : participantConfig.getTags()) {
+      instanceConfig.addTag(tag);
+    }
+    for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
+      instanceConfig.setInstanceEnabledForPartition(partitionId, false);
+    }
+    instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
+    instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
+    _accessor.setProperty(_keyBuilder.instanceConfig(participantConfig.getId().stringify()),
+        instanceConfig);
+    return true;
+  }
+
+  /**
+   * create a participant based on physical model
+   * @param participantId
+   * @param instanceConfig
+   * @param userConfig
+   * @param liveInstance
+   * @param instanceMsgMap map of message-id to message
+   * @param instanceCurStateMap map of resource-id to current-state
+   * @return participant
+   */
+  static Participant createParticipant(ParticipantId participantId, InstanceConfig instanceConfig,
+      UserConfig userConfig, LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
+      Map<String, CurrentState> instanceCurStateMap) {
+
+    String hostName = instanceConfig.getHostName();
+
+    int port = -1;
+    try {
+      port = Integer.parseInt(instanceConfig.getPort());
+    } catch (IllegalArgumentException e) {
+      // keep as -1
+    }
+    if (port < 0 || port > 65535) {
+      port = -1;
+    }
+    boolean isEnabled = instanceConfig.getInstanceEnabled();
+
+    List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
+    Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
+    if (disabledPartitions != null) {
+      disabledPartitionIdSet = new HashSet<PartitionId>();
+      for (String partitionId : disabledPartitions) {
+        disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
+            PartitionId.stripResourceId(partitionId)));
+      }
+    }
+
+    Set<String> tags = new HashSet<String>(instanceConfig.getTags());
+
+    RunningInstance runningInstance = null;
+    if (liveInstance != null) {
+      runningInstance =
+          new RunningInstance(liveInstance.getSessionId(), liveInstance.getHelixVersion(),
+              liveInstance.getProcessId());
+    }
+
+    Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+    if (instanceMsgMap != null) {
+      for (String msgId : instanceMsgMap.keySet()) {
+        Message message = instanceMsgMap.get(msgId);
+        msgMap.put(MessageId.from(msgId), message);
+      }
+    }
+
+    Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
+    if (instanceCurStateMap != null) {
+
+      for (String resourceName : instanceCurStateMap.keySet()) {
+        curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName));
+      }
+    }
+
+    return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+        runningInstance, curStateMap, msgMap, userConfig);
+  }
+
+  /**
+   * read participant related data
+   * @param participantId
+   * @return participant, or null if participant not available
+   */
+  public Participant readParticipant(ParticipantId participantId) {
+    // read physical model
+    String participantName = participantId.stringify();
+    InstanceConfig instanceConfig = _accessor.getProperty(_keyBuilder.instance(participantName));
+
+    if (instanceConfig == null) {
+      LOG.error("Participant " + participantId + " is not present on the cluster");
+      return null;
+    }
+
+    UserConfig userConfig = UserConfig.from(instanceConfig);
+    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName));
+
+    Map<String, Message> instanceMsgMap = Collections.emptyMap();
+    Map<String, CurrentState> instanceCurStateMap = Collections.emptyMap();
+    if (liveInstance != null) {
+      SessionId sessionId = liveInstance.getSessionId();
+
+      instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+      instanceCurStateMap =
+          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+              sessionId.stringify()));
+    }
+
+    return createParticipant(participantId, instanceConfig, userConfig, liveInstance,
+        instanceMsgMap, instanceCurStateMap);
+  }
+
+  /**
+   * update resource current state of a participant
+   * @param resourceId resource id
+   * @param participantId participant id
+   * @param sessionId session id
+   * @param curStateUpdate current state change delta
+   */
+  public void updateCurrentState(ResourceId resourceId, ParticipantId participantId,
+      SessionId sessionId, CurrentState curStateUpdate) {
+    _accessor.updateProperty(
+        _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
+            resourceId.stringify()), curStateUpdate);
+  }
+
+  /**
+   * drop resource current state of a participant
+   * @param resourceId resource id
+   * @param participantId participant id
+   * @param sessionId session id
+   */
+  public void dropCurrentState(ResourceId resourceId, ParticipantId participantId,
+      SessionId sessionId) {
+    _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
+        sessionId.stringify(), resourceId.stringify()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
new file mode 100644
index 0000000..cd55684
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -0,0 +1,265 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.ResourceConfig.ResourceType;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.log4j.Logger;
+
+public class ResourceAccessor {
+  private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+
+  public ResourceAccessor(HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+  }
+
+  /**
+   * Read a single snapshot of a resource
+   * @param resourceId the resource id to read
+   * @return Resource or null if not present
+   */
+  public Resource readResource(ResourceId resourceId) {
+    ResourceConfiguration config =
+        _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
+
+    if (config == null && idealState == null) {
+      LOG.error("Resource " + resourceId + " not present on the cluster");
+      return null;
+    }
+
+    ExternalView externalView =
+        _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+    ResourceAssignment resourceAssignment =
+        _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+    return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+  }
+
+  /**
+   * Update a resource configuration
+   * @param resourceId the resource id to update
+   * @param resourceDelta changes to the resource
+   * @return ResourceConfig, or null if the resource is not persisted
+   */
+  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+    Resource resource = readResource(resourceId);
+    if (resource == null) {
+      LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+      return null;
+    }
+    ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+    setResource(config);
+    return config;
+  }
+
+  /**
+   * save resource assignment
+   * @param resourceId
+   * @param resourceAssignment
+   */
+  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+    _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+        resourceAssignment);
+  }
+
+  /**
+   * get resource assignment
+   * @param resourceId
+   * @return resource assignment or null
+   */
+  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+    return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+  }
+
+  /**
+   * Set a physical resource configuration, which may include user-defined configuration, as well as
+   * rebalancer configuration
+   * @param resourceId
+   * @param configuration
+   */
+  void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+    _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    // also set an ideal state if the resource supports it
+    RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+    IdealState idealState =
+        rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
+            configuration.getBatchMessageMode());
+    if (idealState != null) {
+      _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+    }
+  }
+
+  /**
+   * Persist an existing resource's logical configuration
+   * @param resourceConfig logical resource configuration
+   * @return true if resource is set, false otherwise
+   */
+  public boolean setResource(ResourceConfig resourceConfig) {
+    if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) {
+      LOG.error("Resource not fully defined with a rebalancer context");
+      return false;
+    }
+    ResourceId resourceId = resourceConfig.getId();
+    ResourceConfiguration config = new ResourceConfiguration(resourceId);
+    config.addNamespacedConfig(resourceConfig.getUserConfig());
+    config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+    config.setBucketSize(resourceConfig.getBucketSize());
+    config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+    setConfiguration(resourceId, config);
+    return true;
+  }
+
+  /**
+   * Get a resource configuration, which may include user-defined configuration, as well as
+   * rebalancer configuration
+   * @param resourceId
+   * @return configuration
+   */
+  public void getConfiguration(ResourceId resourceId) {
+    _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+  }
+
+  /**
+   * set external view of a resource
+   * @param resourceId
+   * @param extView
+   */
+  public void setExternalView(ResourceId resourceId, ExternalView extView) {
+    _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
+  }
+
+  /**
+   * drop external view of a resource
+   * @param resourceId
+   */
+  public void dropExternalView(ResourceId resourceId) {
+    _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
+  }
+
+  /**
+   * Get an ideal state from a rebalancer config if the resource is partitioned
+   * @param config RebalancerConfig instance
+   * @param bucketSize bucket size to use
+   * @param batchMessageMode true if batch messaging allowed, false otherwise
+   * @return IdealState, or null
+   */
+  static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+      boolean batchMessageMode) {
+    PartitionedRebalancerContext partitionedContext =
+        config.getRebalancerContext(PartitionedRebalancerContext.class);
+    if (partitionedContext != null) {
+      IdealState idealState = new IdealState(partitionedContext.getResourceId());
+      idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
+      idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+      String replicas = null;
+      if (partitionedContext.anyLiveParticipant()) {
+        replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
+      } else {
+        replicas = Integer.toString(partitionedContext.getReplicaCount());
+      }
+      idealState.setReplicas(replicas);
+      idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
+      idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
+      idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
+      idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
+      idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+      idealState.setBucketSize(bucketSize);
+      idealState.setBatchMessageMode(batchMessageMode);
+      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerContext semiAutoContext =
+            config.getRebalancerContext(SemiAutoRebalancerContext.class);
+        for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
+          idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+        }
+      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerContext customContext =
+            config.getRebalancerContext(CustomRebalancerContext.class);
+        for (PartitionId partitionId : customContext.getPartitionSet()) {
+          idealState.setParticipantStateMap(partitionId,
+              customContext.getPreferenceMap(partitionId));
+        }
+      }
+      return idealState;
+    }
+    return null;
+  }
+
+  /**
+   * Create a resource snapshot instance from the physical model
+   * @param resourceId the resource id
+   * @param resourceConfiguration physical resource configuration
+   * @param idealState ideal state of the resource
+   * @param externalView external view of the resource
+   * @param resourceAssignment current resource assignment
+   * @return Resource
+   */
+  static Resource createResource(ResourceId resourceId,
+      ResourceConfiguration resourceConfiguration, IdealState idealState,
+      ExternalView externalView, ResourceAssignment resourceAssignment) {
+    UserConfig userConfig;
+    ResourceType type = ResourceType.DATA;
+    if (resourceConfiguration != null) {
+      userConfig = UserConfig.from(resourceConfiguration);
+      type = resourceConfiguration.getType();
+    } else {
+      userConfig = new UserConfig(Scope.resource(resourceId));
+    }
+    int bucketSize = 0;
+    boolean batchMessageMode = false;
+    RebalancerContext rebalancerContext;
+    if (idealState != null) {
+      rebalancerContext = PartitionedRebalancerContext.from(idealState);
+      bucketSize = idealState.getBucketSize();
+      batchMessageMode = idealState.getBatchMessageMode();
+    } else {
+      if (resourceConfiguration != null) {
+        bucketSize = resourceConfiguration.getBucketSize();
+        batchMessageMode = resourceConfiguration.getBatchMessageMode();
+        RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
+        rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+      } else {
+        rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
+      }
+    }
+    return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
+        rebalancerContext, userConfig, bucketSize, batchMessageMode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
new file mode 100644
index 0000000..3816507
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
@@ -0,0 +1,70 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.collect.ImmutableMap;
+
+public class StateModelDefinitionAccessor {
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+
+  /**
+   * @param accessor
+   */
+  public StateModelDefinitionAccessor(HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+  }
+
+  /**
+   * Get all of the state model definitions available to the cluster
+   * @return map of state model ids to state model definition objects
+   */
+  public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+    Map<String, StateModelDefinition> stateModelDefs =
+        _accessor.getChildValuesMap(_keyBuilder.stateModelDefs());
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        new HashMap<StateModelDefId, StateModelDefinition>();
+
+    for (String stateModelDefName : stateModelDefs.keySet()) {
+      stateModelDefMap.put(StateModelDefId.from(stateModelDefName),
+          stateModelDefs.get(stateModelDefName));
+    }
+
+    return ImmutableMap.copyOf(stateModelDefMap);
+  }
+
+  /**
+   * Set a state model definition. Adds the state model definition if it does not exist
+   * @param stateModelDef fully initialized state model definition
+   * @return true if the model is persisted, false otherwise
+   */
+  public boolean setStateModelDefinition(StateModelDefinition stateModelDef) {
+    return _accessor.setProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
new file mode 100644
index 0000000..79b4f61
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -0,0 +1,696 @@
+package org.apache.helix.api.config;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
+import org.apache.helix.model.builder.ConstraintItemBuilder;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties of a cluster
+ */
+public class ClusterConfig {
+  private static final Logger LOG = Logger.getLogger(ClusterConfig.class);
+
+  private final ClusterId _id;
+  private final Map<ResourceId, ResourceConfig> _resourceMap;
+  private final Map<ParticipantId, ParticipantConfig> _participantMap;
+  private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+  private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
+  private final UserConfig _userConfig;
+  private final boolean _isPaused;
+
+  /**
+   * Initialize a cluster configuration. Also see ClusterConfig.Builder
+   * @param id cluster id
+   * @param resourceMap map of resource id to resource config
+   * @param participantMap map of participant id to participant config
+   * @param constraintMap map of constraint type to all constraints of that type
+   * @param stateModelMap map of state model id to state model definition
+   * @param userConfig user-defined cluster properties
+   * @param isPaused true if paused, false if active
+   */
+  private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, ParticipantConfig> participantMap,
+      Map<ConstraintType, ClusterConstraints> constraintMap,
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused) {
+    _id = id;
+    _resourceMap = ImmutableMap.copyOf(resourceMap);
+    _participantMap = ImmutableMap.copyOf(participantMap);
+    _constraintMap = ImmutableMap.copyOf(constraintMap);
+    _stateModelMap = ImmutableMap.copyOf(stateModelMap);
+    _userConfig = userConfig;
+    _isPaused = isPaused;
+  }
+
+  /**
+   * Get cluster id
+   * @return cluster id
+   */
+  public ClusterId getId() {
+    return _id;
+  }
+
+  /**
+   * Get resources in the cluster
+   * @return a map of resource id to resource, or empty map if none
+   */
+  public Map<ResourceId, ResourceConfig> getResourceMap() {
+    return _resourceMap;
+  }
+
+  /**
+   * Get all the constraints on the cluster
+   * @return map of constraint type to constraints
+   */
+  public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+    return _constraintMap;
+  }
+
+  /**
+   * Get the maximum number of participants that can be in a state
+   * @param scope the scope for the bound
+   * @param stateModelDefId the state model of the state
+   * @param state the constrained state
+   * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
+   *         number of replicas, or "N" for number of participants
+   */
+  public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+      State state) {
+    // set up attributes to match based on the scope
+    ClusterConstraints stateConstraints = getConstraintMap().get(ConstraintType.STATE_CONSTRAINT);
+    Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+    matchAttributes.put(ConstraintAttribute.STATE, state.toString());
+    matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+    switch (scope.getType()) {
+    case CLUSTER:
+      // cluster is implicit
+      break;
+    case RESOURCE:
+      matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+      break;
+    default:
+      LOG.error("Unsupported scope for state constraint: " + scope);
+      return "-1";
+    }
+    Set<ConstraintItem> matches = stateConstraints.match(matchAttributes);
+    int value = -1;
+    for (ConstraintItem item : matches) {
+      // match: if an R or N is found, always choose that one
+      // otherwise, take the minimum of the counts specified in the constraints
+      String constraintValue = item.getConstraintValue();
+      if (constraintValue != null) {
+        if (constraintValue.equals(ConstraintValue.N.toString())
+            || constraintValue.equals(ConstraintValue.R.toString())) {
+          return constraintValue;
+        } else {
+          try {
+            int current = Integer.parseInt(constraintValue);
+            if (value == -1 || current < value) {
+              value = current;
+            }
+          } catch (NumberFormatException e) {
+            LOG.error("Invalid state upper bound: " + constraintValue);
+          }
+        }
+      }
+    }
+    return Integer.toString(value);
+  }
+
+  /**
+   * Get the limit of simultaneous execution of a transition
+   * @param scope the scope under which the transition is constrained
+   * @param stateModelDefId the state model of which the transition is a part
+   * @param transition the constrained transition
+   * @return the limit, or Integer.MAX_VALUE if there is no limit
+   */
+  public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+      Transition transition) {
+    // set up attributes to match based on the scope
+    ClusterConstraints transitionConstraints =
+        getConstraintMap().get(ConstraintType.MESSAGE_CONSTRAINT);
+    Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+    matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+    matchAttributes.put(ConstraintAttribute.MESSAGE_TYPE, MessageType.STATE_TRANSITION.toString());
+    matchAttributes.put(ConstraintAttribute.TRANSITION, transition.toString());
+    switch (scope.getType()) {
+    case CLUSTER:
+      // cluster is implicit
+      break;
+    case RESOURCE:
+      matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+      break;
+    case PARTICIPANT:
+      matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
+      break;
+    default:
+      LOG.error("Unsupported scope for transition constraints: " + scope);
+      return Integer.MAX_VALUE;
+    }
+    Set<ConstraintItem> matches = transitionConstraints.match(matchAttributes);
+    int value = Integer.MAX_VALUE;
+    for (ConstraintItem item : matches) {
+      String constraintValue = item.getConstraintValue();
+      if (constraintValue != null) {
+        try {
+          int current = Integer.parseInt(constraintValue);
+          if (current < value) {
+            value = current;
+          }
+        } catch (NumberFormatException e) {
+          LOG.error("Invalid in-flight transition cap: " + constraintValue);
+        }
+      }
+    }
+    return value;
+  }
+
+  /**
+   * Get participants of the cluster
+   * @return a map of participant id to participant, or empty map if none
+   */
+  public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
+    return _participantMap;
+  }
+
+  /**
+   * Get all the state model definitions on the cluster
+   * @return map of state model definition id to state model definition
+   */
+  public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
+    return _stateModelMap;
+  }
+
+  /**
+   * Get user-specified configuration properties of this cluster
+   * @return UserConfig properties
+   */
+  public UserConfig getUserConfig() {
+    return _userConfig;
+  }
+
+  /**
+   * Check the paused status of the cluster
+   * @return true if paused, false otherwise
+   */
+  public boolean isPaused() {
+    return _isPaused;
+  }
+
+  /**
+   * Update context for a ClusterConfig
+   */
+  public static class Delta {
+    private enum Fields {
+      USER_CONFIG
+    }
+
+    private Set<Fields> _updateFields;
+    private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
+    private Builder _builder;
+
+    /**
+     * Instantiate the delta for a cluster config
+     * @param clusterId the cluster to update
+     */
+    public Delta(ClusterId clusterId) {
+      _updateFields = Sets.newHashSet();
+      _removedConstraints = Maps.newHashMap();
+      for (ConstraintType type : ConstraintType.values()) {
+        Set<ConstraintId> constraints = Sets.newHashSet();
+        _removedConstraints.put(type, constraints);
+      }
+      _builder = new Builder(clusterId);
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param upperBound maximum number of replicas per partition in the state
+     * @return Delta
+     */
+    public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, int upperBound) {
+      return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+          Integer.toString(upperBound));
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+     *          number, or the currently supported special bound values:<br />
+     *          "R" - Refers to the number of replicas specified during resource
+     *          creation. This allows having different replication factor for each
+     *          resource without having to create a different state machine. <br />
+     *          "N" - Refers to all nodes in the cluster. Useful for resources that need
+     *          to exist on all nodes. This way one can add/remove nodes without having
+     *          the change the bounds.
+     * @return Delta
+     */
+    public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, String dynamicUpperBound) {
+      _builder.addStateUpperBoundConstraint(scope, stateModelDefId, state, dynamicUpperBound);
+      return this;
+    }
+
+    /**
+     * Remove state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @return Delta
+     */
+    public Delta removeStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state) {
+      _removedConstraints.get(ConstraintType.STATE_CONSTRAINT).add(
+          ConstraintId.from(scope, stateModelDefId, state));
+      return this;
+    }
+
+    /**
+     * Add a constraint on the maximum number of in-flight transitions of a certain type
+     * @param scope scope of the constraint
+     * @param stateModelDefId identifies the state model containing the transition
+     * @param transition the transition to constrain
+     * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+     * @return Delta
+     */
+    public Delta addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        Transition transition, int maxInFlightTransitions) {
+      _builder.addTransitionConstraint(scope, stateModelDefId, transition, maxInFlightTransitions);
+      return this;
+    }
+
+    /**
+     * Remove a constraint on the maximum number of in-flight transitions of a certain type
+     * @param scope scope of the constraint
+     * @param stateModelDefId identifies the state model containing the transition
+     * @param transition the transition to constrain
+     * @return Delta
+     */
+    public Delta removeTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        Transition transition) {
+      _removedConstraints.get(ConstraintType.MESSAGE_CONSTRAINT).add(
+          ConstraintId.from(scope, stateModelDefId, transition));
+      return this;
+    }
+
+    /**
+     * Add a single constraint item
+     * @param type type of the constraint item
+     * @param constraintId unique constraint id
+     * @param item instantiated ConstraintItem
+     * @return Delta
+     */
+    public Delta addConstraintItem(ConstraintType type, ConstraintId constraintId,
+        ConstraintItem item) {
+      _builder.addConstraint(type, constraintId, item);
+      return this;
+    }
+
+    /**
+     * Remove a single constraint item
+     * @param type type of the constraint item
+     * @param constraintId unique constraint id
+     * @return Delta
+     */
+    public Delta removeConstraintItem(ConstraintType type, ConstraintId constraintId) {
+      _removedConstraints.get(type).add(constraintId);
+      return this;
+    }
+
+    /*
+     * Set the user configuration
+     * @param userConfig user-specified properties
+     * @return Builder
+     */
+    public Delta setUserConfig(UserConfig userConfig) {
+      _builder.userConfig(userConfig);
+      _updateFields.add(Fields.USER_CONFIG);
+      return this;
+    }
+
+    /**
+     * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
+     * @param orig the original ClusterConfig
+     * @return updated ClusterConfig
+     */
+    public ClusterConfig mergeInto(ClusterConfig orig) {
+      // copy in original and updated fields
+      ClusterConfig deltaConfig = _builder.build();
+      Builder builder =
+          new Builder(orig.getId()).addResources(orig.getResourceMap().values())
+              .addParticipants(orig.getParticipantMap().values())
+              .addStateModelDefinitions(orig.getStateModelMap().values())
+              .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused());
+      for (Fields field : _updateFields) {
+        switch (field) {
+        case USER_CONFIG:
+          builder.userConfig(deltaConfig.getUserConfig());
+          break;
+        }
+      }
+      // add constraint deltas
+      for (ConstraintType type : ConstraintType.values()) {
+        ClusterConstraints constraints;
+        if (orig.getConstraintMap().containsKey(type)) {
+          constraints = orig.getConstraintMap().get(type);
+        } else {
+          constraints = new ClusterConstraints(type);
+        }
+        // add new constraints
+        if (deltaConfig.getConstraintMap().containsKey(type)) {
+          ClusterConstraints deltaConstraints = deltaConfig.getConstraintMap().get(type);
+          for (ConstraintId constraintId : deltaConstraints.getConstraintItems().keySet()) {
+            ConstraintItem constraintItem = deltaConstraints.getConstraintItem(constraintId);
+            constraints.addConstraintItem(constraintId, constraintItem);
+          }
+        }
+        // remove constraints
+        for (ConstraintId constraintId : _removedConstraints.get(type)) {
+          constraints.removeConstraintItem(constraintId);
+        }
+        builder.addConstraint(constraints);
+      }
+      return builder.build();
+    }
+  }
+
+  /**
+   * Assembles a cluster configuration
+   */
+  public static class Builder {
+    private final ClusterId _id;
+    private final Map<ResourceId, ResourceConfig> _resourceMap;
+    private final Map<ParticipantId, ParticipantConfig> _participantMap;
+    private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+    private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
+    private UserConfig _userConfig;
+    private boolean _isPaused;
+
+    /**
+     * Initialize builder for a cluster
+     * @param id cluster id
+     */
+    public Builder(ClusterId id) {
+      _id = id;
+      _resourceMap = new HashMap<ResourceId, ResourceConfig>();
+      _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
+      _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+      _stateModelMap = new HashMap<StateModelDefId, StateModelDefinition>();
+      _isPaused = false;
+      _userConfig = new UserConfig(Scope.cluster(id));
+    }
+
+    /**
+     * Add a resource to the cluster
+     * @param resource resource configuration
+     * @return Builder
+     */
+    public Builder addResource(ResourceConfig resource) {
+      _resourceMap.put(resource.getId(), resource);
+      return this;
+    }
+
+    /**
+     * Add multiple resources to the cluster
+     * @param resources resource configurations
+     * @return Builder
+     */
+    public Builder addResources(Collection<ResourceConfig> resources) {
+      for (ResourceConfig resource : resources) {
+        addResource(resource);
+      }
+      return this;
+    }
+
+    /**
+     * Add a participant to the cluster
+     * @param participant participant configuration
+     * @return Builder
+     */
+    public Builder addParticipant(ParticipantConfig participant) {
+      _participantMap.put(participant.getId(), participant);
+      return this;
+    }
+
+    /**
+     * Add multiple participants to the cluster
+     * @param participants participant configurations
+     * @return Builder
+     */
+    public Builder addParticipants(Collection<ParticipantConfig> participants) {
+      for (ParticipantConfig participant : participants) {
+        addParticipant(participant);
+      }
+      return this;
+    }
+
+    /**
+     * Add a constraint to the cluster
+     * @param constraint cluster constraint of a specific type
+     * @return Builder
+     */
+    public Builder addConstraint(ClusterConstraints constraint) {
+      ClusterConstraints existConstraints = getConstraintsInstance(constraint.getType());
+      for (ConstraintId constraintId : constraint.getConstraintItems().keySet()) {
+        existConstraints
+            .addConstraintItem(constraintId, constraint.getConstraintItem(constraintId));
+      }
+      return this;
+    }
+
+    /**
+     * Add a single constraint item
+     * @param type type of the constraint
+     * @param constraintId unique constraint identifier
+     * @param item instantiated ConstraintItem
+     * @return Builder
+     */
+    public Builder addConstraint(ConstraintType type, ConstraintId constraintId, ConstraintItem item) {
+      ClusterConstraints existConstraints = getConstraintsInstance(type);
+      existConstraints.addConstraintItem(constraintId, item);
+      return this;
+    }
+
+    /**
+     * Add multiple constraints to the cluster
+     * @param constraints cluster constraints of multiple distinct types
+     * @return Builder
+     */
+    public Builder addConstraints(Collection<ClusterConstraints> constraints) {
+      for (ClusterConstraints constraint : constraints) {
+        addConstraint(constraint);
+      }
+      return this;
+    }
+
+    /**
+     * Add a constraint on the maximum number of in-flight transitions of a certain type
+     * @param scope scope of the constraint
+     * @param stateModelDefId identifies the state model containing the transition
+     * @param transition the transition to constrain
+     * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+     * @return Builder
+     */
+    public Builder addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        Transition transition, int maxInFlightTransitions) {
+      Map<String, String> attributes = Maps.newHashMap();
+      attributes.put(ConstraintAttribute.MESSAGE_TYPE.toString(),
+          MessageType.STATE_TRANSITION.toString());
+      attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(),
+          Integer.toString(maxInFlightTransitions));
+      attributes.put(ConstraintAttribute.TRANSITION.toString(), transition.toString());
+      attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+      switch (scope.getType()) {
+      case CLUSTER:
+        // cluster is implicit
+        break;
+      case RESOURCE:
+        attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+        break;
+      case PARTICIPANT:
+        attributes.put(ConstraintAttribute.INSTANCE.toString(), scope.getScopedId().stringify());
+        break;
+      default:
+        LOG.error("Unsupported scope for adding a transition constraint: " + scope);
+        return this;
+      }
+      ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+      ClusterConstraints constraints = getConstraintsInstance(ConstraintType.MESSAGE_CONSTRAINT);
+      constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, transition), item);
+      return this;
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param upperBound maximum number of replicas per partition in the state
+     * @return Builder
+     */
+    public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, int upperBound) {
+      return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+          Integer.toString(upperBound));
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+     *          number, or the currently supported special bound values:<br />
+     *          "R" - Refers to the number of replicas specified during resource
+     *          creation. This allows having different replication factor for each
+     *          resource without having to create a different state machine. <br />
+     *          "N" - Refers to all nodes in the cluster. Useful for resources that need
+     *          to exist on all nodes. This way one can add/remove nodes without having
+     *          the change the bounds.
+     * @return Builder
+     */
+    public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, String dynamicUpperBound) {
+      Map<String, String> attributes = Maps.newHashMap();
+      attributes.put(ConstraintAttribute.STATE.toString(), state.toString());
+      attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+      attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), dynamicUpperBound);
+      switch (scope.getType()) {
+      case CLUSTER:
+        // cluster is implicit
+        break;
+      case RESOURCE:
+        attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+        break;
+      default:
+        LOG.error("Unsupported scope for adding a state constraint: " + scope);
+        return this;
+      }
+      ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+      ClusterConstraints constraints = getConstraintsInstance(ConstraintType.STATE_CONSTRAINT);
+      constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, state), item);
+      return this;
+    }
+
+    /**
+     * Add a state model definition to the cluster
+     * @param stateModelDef state model definition of the cluster
+     * @return Builder
+     */
+    public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
+      _stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
+      // add state constraints from the state model definition
+      for (State state : stateModelDef.getStatesPriorityList()) {
+        if (!stateModelDef.getNumParticipantsPerState(state).equals("-1")) {
+          addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
+              state, stateModelDef.getNumParticipantsPerState(state));
+        }
+      }
+      return this;
+    }
+
+    /**
+     * Add multiple state model definitions
+     * @param stateModelDefs collection of state model definitions for the cluster
+     * @return Builder
+     */
+    public Builder addStateModelDefinitions(Collection<StateModelDefinition> stateModelDefs) {
+      for (StateModelDefinition stateModelDef : stateModelDefs) {
+        addStateModelDefinition(stateModelDef);
+      }
+      return this;
+    }
+
+    /**
+     * Set the paused status of the cluster
+     * @param isPaused true if paused, false otherwise
+     * @return Builder
+     */
+    public Builder pausedStatus(boolean isPaused) {
+      _isPaused = isPaused;
+      return this;
+    }
+
+    /**
+     * Set the user configuration
+     * @param userConfig user-specified properties
+     * @return Builder
+     */
+    public Builder userConfig(UserConfig userConfig) {
+      _userConfig = userConfig;
+      return this;
+    }
+
+    /**
+     * Create the cluster configuration
+     * @return ClusterConfig
+     */
+    public ClusterConfig build() {
+      return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
+          _userConfig, _isPaused);
+    }
+
+    /**
+     * Get a valid instance of ClusterConstraints for a type
+     * @param type the type
+     * @return ClusterConstraints
+     */
+    private ClusterConstraints getConstraintsInstance(ConstraintType type) {
+      ClusterConstraints constraints = _constraintMap.get(type);
+      if (constraints == null) {
+        constraints = new ClusterConstraints(type);
+        _constraintMap.put(type, constraints);
+      }
+      return constraints;
+    }
+  }
+}


Mime
View raw message