helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [4/4] git commit: [HELIX-109] Review Helix model package, merge model changes with accessor changes, incomplete, rb=13878
Date Thu, 29 Aug 2013 06:17:48 GMT
[HELIX-109] Review Helix model package, merge model changes with accessor changes, incomplete, rb=13878


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/433b0011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/433b0011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/433b0011

Branch: refs/heads/helix-logical-model
Commit: 433b0011655c0c42228416488bb6b16f4b2f2700
Parents: 7dbb58c
Author: zzhang <zzhang5@uci.edu>
Authored: Wed Aug 28 23:17:26 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Wed Aug 28 23:17:26 2013 -0700

----------------------------------------------------------------------
 .../resources/SchedulerTasksResource.java       |  12 +-
 .../helix/tools/TestResetPartitionState.java    |   6 +-
 .../org/apache/helix/agent/AgentStateModel.java |   8 +-
 .../main/java/org/apache/helix/PropertyKey.java |  18 ++
 .../main/java/org/apache/helix/api/Cluster.java |  50 +---
 .../org/apache/helix/api/ClusterAccessor.java   | 202 ++++++++++++--
 .../org/apache/helix/api/ClusterReader.java     | 154 -----------
 .../java/org/apache/helix/api/Controller.java   |   4 +-
 .../apache/helix/api/ControllerAccessor.java    |   3 +-
 .../java/org/apache/helix/api/CurState.java     |   2 +-
 .../org/apache/helix/api/CurStateAccessor.java  |  45 ----
 .../org/apache/helix/api/ExtViewAccessor.java   |  45 ----
 .../java/org/apache/helix/api/HelixVersion.java |  14 +
 .../src/main/java/org/apache/helix/api/Id.java  |  84 ++++++
 .../java/org/apache/helix/api/MessageId.java    |  34 +++
 .../src/main/java/org/apache/helix/api/Msg.java |  10 +-
 .../main/java/org/apache/helix/api/MsgId.java   |  34 ---
 .../java/org/apache/helix/api/Participant.java  | 206 ++++++++++-----
 .../apache/helix/api/ParticipantAccessor.java   | 263 +++++++++++++++++--
 .../java/org/apache/helix/api/PartitionId.java  |   2 +-
 .../org/apache/helix/api/RebalancerConfig.java  |  77 +++++-
 .../org/apache/helix/api/RebalancerRef.java     |  21 ++
 .../java/org/apache/helix/api/Resource.java     |  87 +++++-
 .../org/apache/helix/api/ResourceAccessor.java  |  31 ++-
 .../org/apache/helix/api/RscAssignment.java     |  98 ++++++-
 .../org/apache/helix/api/RunningInstance.java   |   1 -
 .../main/java/org/apache/helix/api/State.java   |  20 ++
 .../controller/GenericHelixController.java      |   2 +-
 .../controller/rebalancer/AutoRebalancer.java   |   2 +-
 .../controller/rebalancer/CustomRebalancer.java |   2 +-
 .../util/ConstraintBasedAssignment.java         |   8 +-
 .../controller/stages/ClusterDataCache.java     |   4 +-
 .../stages/CompatibilityCheckStage.java         |   2 +-
 .../stages/CurrentStateComputationStage.java    |  12 +-
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/MessageGenerationPhase.java          |   6 +-
 .../stages/MessageSelectionStage.java           |  14 +-
 .../stages/RebalanceIdealStateStage.java        |   2 +-
 .../stages/ResourceComputationStage.java        |  10 +-
 .../controller/stages/TaskAssignmentStage.java  |  10 +-
 .../helix/manager/zk/ControllerManager.java     |   2 +-
 .../manager/zk/CurStateCarryOverUpdater.java    |   2 +-
 .../DefaultControllerMessageHandlerFactory.java |   8 +-
 ...ltParticipantErrorMessageHandlerFactory.java |   4 +-
 .../DefaultSchedulerMessageHandlerFactory.java  |  38 +--
 .../zk/DistributedControllerManager.java        |   2 +-
 .../manager/zk/DistributedLeaderElection.java   |   2 +-
 .../manager/zk/ParticipantManagerHelper.java    |   6 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  10 +-
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   2 +-
 .../apache/helix/manager/zk/ZKHelixManager.java |   2 +-
 .../apache/helix/messaging/AsyncCallback.java   |   2 +-
 .../messaging/DefaultMessagingService.java      |   2 +-
 .../handling/AsyncCallbackService.java          |  12 +-
 .../handling/HelixStateTransitionHandler.java   |  24 +-
 .../helix/messaging/handling/HelixTask.java     |  26 +-
 .../messaging/handling/HelixTaskExecutor.java   |  12 +-
 .../messaging/handling/MessageTimeoutTask.java  |   2 +-
 .../apache/helix/model/ClusterConstraints.java  |   4 +-
 .../org/apache/helix/model/CurrentState.java    |  61 ++++-
 .../org/apache/helix/model/ExternalView.java    |  45 +++-
 .../java/org/apache/helix/model/IdealState.java |  96 ++++++-
 .../org/apache/helix/model/InstanceConfig.java  |  10 +
 .../org/apache/helix/model/LiveInstance.java    |  45 +++-
 .../java/org/apache/helix/model/Message.java    | 117 ++++++++-
 .../apache/helix/model/ResourceAssignment.java  |   8 +
 .../helix/model/StateModelDefinition.java       |  59 ++++-
 .../java/org/apache/helix/model/Transition.java |  10 +
 .../model/builder/CurrentStateBuilder.java      | 123 +++++++++
 .../helix/model/builder/IdealStateBuilder.java  |  19 ++
 .../monitoring/mbeans/ResourceMonitor.java      |   2 +-
 .../DistClusterControllerElection.java          |   2 +-
 .../participant/HelixStateMachineEngine.java    |   6 +-
 .../helix/spectator/RoutingTableProvider.java   |   2 +-
 .../org/apache/helix/tools/ZkLogAnalyzer.java   |   6 +-
 .../org/apache/helix/util/RebalanceUtil.java    |   8 +-
 .../org/apache/helix/util/StatusUpdateUtil.java |  26 +-
 .../java/org/apache/helix/ZkUnitTestBase.java   |   4 +-
 .../stages/TestMsgSelectionStage.java           |   2 +-
 .../stages/TestRebalancePipeline.java           |  20 +-
 .../stages/TestResourceComputationStage.java    |   2 +-
 .../helix/healthcheck/TestAddDropAlert.java     |   4 +-
 .../helix/healthcheck/TestExpandAlert.java      |   4 +-
 .../helix/healthcheck/TestSimpleAlert.java      |   4 +-
 .../healthcheck/TestSimpleWildcardAlert.java    |   4 +-
 .../helix/healthcheck/TestStalenessAlert.java   |   4 +-
 .../helix/healthcheck/TestWildcardAlert.java    |   4 +-
 .../helix/integration/TestAutoRebalance.java    |   2 +-
 .../TestAutoRebalancePartitionLimit.java        |  12 +-
 .../integration/TestCleanupExternalView.java    |   4 +-
 .../TestCustomizedIdealStateRebalancer.java     |  10 +-
 .../org/apache/helix/integration/TestDrop.java  |   2 +-
 .../TestEnablePartitionDuringDisable.java       |   4 +-
 .../helix/integration/TestHelixInstanceTag.java |   4 +-
 .../TestMessagePartitionStateMismatch.java      |   6 +-
 .../helix/integration/TestMessagingService.java |   2 +-
 .../integration/TestResetPartitionState.java    |   6 +-
 .../helix/integration/TestSchedulerMessage.java |  18 +-
 .../integration/TestStateTransitionTimeout.java |   8 +-
 .../helix/integration/TestStatusUpdate.java     |   2 +-
 .../helix/manager/zk/TestZkClusterManager.java  |  10 +-
 .../helix/messaging/TestAsyncCallbackSvc.java   |  10 +-
 .../handling/TestHelixTaskExecutor.java         |   8 +-
 .../helix/mock/participant/ErrTransition.java   |   4 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |   6 +-
 helix-examples/src/main/config/log4j.properties |  31 +++
 .../examples/MasterSlaveStateModelFactory.java  |  16 +-
 .../org/apache/helix/examples/Quickstart.java   |   2 +-
 .../helix/lockmanager/LockManagerDemo.java      |   2 +-
 .../helix/filestore/FileStoreStateModel.java    |  24 +-
 .../org/apache/helix/taskexecution/Task.java    |   2 +-
 111 files changed, 1921 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 59d9174..2e48de4 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -117,7 +117,7 @@ public class SchedulerTasksResource extends Resource {
         ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
     LiveInstance liveInstance =
         accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
-    String sessionId = liveInstance.getSessionId();
+    String sessionId = liveInstance.getSessionIdString();
 
     StringRepresentation representation = new StringRepresentation("");// (ClusterRepresentationUtil.ObjectToJson(instanceConfigs),
                                                                        // MediaType.APPLICATION_JSON);
@@ -158,7 +158,7 @@ public class SchedulerTasksResource extends Resource {
 
       schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
 
-      schedulerMessage.setTgtSessionId(leader.getSessionId());
+      schedulerMessage.setTgtSessionId(leader.getSessionIdString());
       schedulerMessage.setTgtName("CONTROLLER");
       schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
       String taskQueueName =
@@ -167,22 +167,22 @@ public class SchedulerTasksResource extends Resource {
         schedulerMessage.getRecord().setSimpleField(
             DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
       }
-      accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()),
+      accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgIdString()),
           schedulerMessage);
 
       Map<String, String> resultMap = new HashMap<String, String>();
       resultMap.put("StatusUpdatePath", PropertyPathConfig.getPath(
           PropertyType.STATUSUPDATES_CONTROLLER, clusterName, MessageType.SCHEDULER_MSG.toString(),
-          schedulerMessage.getMsgId()));
+          schedulerMessage.getMsgIdString()));
       resultMap.put("MessageType", Message.MessageType.SCHEDULER_MSG.toString());
-      resultMap.put("MsgId", schedulerMessage.getMsgId());
+      resultMap.put("MsgId", schedulerMessage.getMsgIdString());
 
       // Assemble the rest URL for task status update
       String ipAddress = InetAddress.getLocalHost().getCanonicalHostName();
       String url =
           "http://" + ipAddress + ":" + getContext().getAttributes().get(RestAdminApplication.PORT)
               + "/clusters/" + clusterName + "/Controller/statusUpdates/SCHEDULER_MSG/"
-              + schedulerMessage.getMsgId();
+              + schedulerMessage.getMsgIdString();
       resultMap.put("statusUpdateUrl", url);
 
       getResponse().setEntity(ClusterRepresentationUtil.ObjectToJson(resultMap),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
index c8099a4..c3980bf 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
@@ -67,8 +67,8 @@ public class TestResetPartitionState extends AdminTestBase {
     @Override
     public void doTransition(Message message, NotificationContext context) {
       super.doTransition(message, context);
-      String fromState = message.getFromState();
-      String toState = message.getToState();
+      String fromState = message.getFromStateString();
+      String toState = message.getToStateString();
       if (fromState.equals("ERROR") && toState.equals("OFFLINE")) {
         // System.err.println("doReset() invoked");
         _errToOfflineInvoked.incrementAndGet();
@@ -191,7 +191,7 @@ public class TestResetPartitionState extends AdminTestBase {
     Builder keyBuilder = accessor.keyBuilder();
 
     LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instance));
-    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getSessionId(),
+    accessor.removeProperty(keyBuilder.stateTransitionStatus(instance, liveInstance.getSessionIdString(),
         resource, partition));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
index 313f430..393da23 100644
--- a/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
+++ b/helix-agent/src/main/java/org/apache/helix/agent/AgentStateModel.java
@@ -71,8 +71,8 @@ public class AgentStateModel extends StateModel {
 
     HelixManager manager = context.getManager();
     String clusterName = manager.getClusterName();
-    String fromState = message.getFromState();
-    String toState = message.getToState();
+    String fromState = message.getFromStateString();
+    String toState = message.getToStateString();
 
     // construct keys for command-config
     String cmdKey = buildKey(fromState, toState, CommandAttribute.COMMAND);
@@ -112,8 +112,8 @@ public class AgentStateModel extends StateModel {
     }
 
     if (cmd == null) {
-      throw new Exception("Unable to find command for transition from:" + message.getFromState()
-          + " to:" + message.getToState());
+      throw new Exception("Unable to find command for transition from:" + message.getFromStateString()
+          + " to:" + message.getToStateString());
     }
     _logger.info("Executing command: " + cmd + ", using workingDir: " + workingDir + ", timeout: "
         + timeout + ", on " + manager.getInstanceName());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 0874958..2f20319 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -305,6 +305,14 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with all instances
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey instance(String instanceName) {
+      return new PropertyKey(PropertyType.INSTANCES, null, _clusterName, instanceName);
+    }
+
+    /**
      * Get a property key associated with {@link Message} for an instance
      * @param instanceName
      * @return {@link PropertyKey}
@@ -338,6 +346,16 @@ public class PropertyKey {
      * @param sessionId
      * @return {@link PropertyKey}
      */
+    public PropertyKey currentStates(String instanceName) {
+      return new PropertyKey(CURRENTSTATES, CurrentState.class, _clusterName, instanceName);
+    }
+
+    /**
+     * Get a property key associated with {@link CurrentState} of an instance and session
+     * @param instanceName
+     * @param sessionId
+     * @return {@link PropertyKey}
+     */
     public PropertyKey currentStates(String instanceName, String sessionId) {
       return new PropertyKey(CURRENTSTATES, CurrentState.class, _clusterName, instanceName,
           sessionId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 5b149d9..07deca6 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -61,53 +61,25 @@ public class Cluster {
 
   private final ClusterConfig _config = null;
 
-  // TODO move construct logic to ClusterAccessor
   /**
-   * Construct a cluster
-   * @param id a unique id for the cluster
-   * @param idealStateMap map of resource-id to ideal-state
-   * @param currentStateMap map of resource-id to map of participant-id to current-state
-   * @param instanceConfigMap map of participant-id to instance-config
-   * @param liveInstanceMap map of participant-id to live-instance
-   * @param msgMap map of participant-id to map of message-id to message
-   * @param leader
+   * construct a cluster
+   * @param id
+   * @param resourceMap
+   * @param participantMap
+   * @param controllerMap
+   * @param leaderId
    */
-  public Cluster(ClusterId id, Map<String, IdealState> idealStateMap,
-      Map<String, Map<String, CurrentState>> currentStateMap,
-      Map<String, InstanceConfig> instanceConfigMap, Map<String, LiveInstance> liveInstanceMap,
-      Map<String, Map<String, Message>> msgMap, LiveInstance leader) {
-    _id = id;
+  public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
+      Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
+      ControllerId leaderId) {
 
-    Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
-    for (String resourceId : idealStateMap.keySet()) {
-      IdealState idealState = idealStateMap.get(resourceId);
-      Map<String, CurrentState> curStateMap = currentStateMap.get(resourceId);
+    _id = id;
 
-      // TODO pass resource assignment
-      resourceMap.put(new ResourceId(resourceId), new Resource(new ResourceId(resourceId),
-          idealState, null));
-    }
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
-    Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
-    for (String participantId : instanceConfigMap.keySet()) {
-      InstanceConfig instanceConfig = instanceConfigMap.get(participantId);
-      LiveInstance liveInstance = liveInstanceMap.get(participantId);
-      Map<String, Message> instanceMsgMap = msgMap.get(participantId);
-
-      // TODO pass current-state map
-      participantMap.put(new ParticipantId(participantId), new Participant(new ParticipantId(
-          participantId), instanceConfig, liveInstance, null, instanceMsgMap));
-    }
     _participantMap = ImmutableMap.copyOf(participantMap);
 
-    Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
-    if (leader != null) {
-      _leaderId = new ControllerId(leader.getId());
-      controllerMap.put(_leaderId, new Controller(_leaderId, leader, true));
-    } else {
-      _leaderId = null;
-    }
+    _leaderId = leaderId;
 
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 1b43e6b..ac3b715 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -19,68 +19,234 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
 
 public class ClusterAccessor {
   private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
   private final ClusterId _clusterId;
 
-  public ClusterAccessor(HelixDataAccessor accessor) {
+  public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
     _accessor = accessor;
-    _clusterId = null;
+    _keyBuilder = accessor.keyBuilder();
+    _clusterId = clusterId;
   }
 
   /**
-   * 
+   * create a new cluster
    */
-  public void create() {
-
+  public void createCluster() {
+    List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
+
+    createKeys.add(_keyBuilder.idealStates());
+    createKeys.add(_keyBuilder.clusterConfigs());
+    createKeys.add(_keyBuilder.instanceConfigs());
+    createKeys.add(_keyBuilder.resourceConfigs());
+    createKeys.add(_keyBuilder.instances());
+    createKeys.add(_keyBuilder.liveInstances());
+    createKeys.add(_keyBuilder.externalViews());
+    createKeys.add(_keyBuilder.controller());
+
+    // TODO add controller sub-dir's and state model definitions
+    for (PropertyKey key : createKeys) {
+      _accessor.createProperty(key, null);
+    }
   }
 
   /**
-   * @return
+   * read entire cluster data
+   * @return cluster
    */
-  public Cluster read() {
-    return null;
+  public Cluster readCluster() {
+    /**
+     * map of instance-id to instance-config
+     */
+    Map<String, InstanceConfig> instanceConfigMap =
+        _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+
+    /**
+     * map of resource-id to ideal-state
+     */
+    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+
+    /**
+     * map of instance-id to live-instance
+     */
+    Map<String, LiveInstance> liveInstanceMap =
+        _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+
+    /**
+     * map of participant-id to map of message-id to message
+     */
+    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
+    for (String instanceName : liveInstanceMap.keySet()) {
+      Map<String, Message> instanceMsgMap =
+          _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
+      messageMap.put(instanceName, instanceMsgMap);
+    }
+
+    /**
+     * map of participant-id to map of resource-id to current-state
+     */
+    Map<String, Map<String, CurrentState>> currentStateMap =
+        new HashMap<String, Map<String, CurrentState>>();
+    for (String participantId : liveInstanceMap.keySet()) {
+      LiveInstance liveInstance = liveInstanceMap.get(participantId);
+      SessionId sessionId = liveInstance.getSessionId();
+      Map<String, CurrentState> instanceCurStateMap =
+          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantId,
+              sessionId.stringify()));
+
+      currentStateMap.put(participantId, instanceCurStateMap);
+    }
+
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+    Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
+    for (String resourceName : idealStateMap.keySet()) {
+      IdealState idealState = idealStateMap.get(resourceName);
+
+      // TODO pass resource assignment
+      ResourceId resourceId = new ResourceId(resourceName);
+      resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
+    }
+
+    Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
+    for (String participantName : instanceConfigMap.keySet()) {
+      InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
+      LiveInstance liveInstance = liveInstanceMap.get(participantName);
+      Map<String, Message> instanceMsgMap = messageMap.get(participantName);
+
+      // TODO pass current-state map
+      ParticipantId participantId = new ParticipantId(participantName);
+
+      // TODO construct participant
+      participantMap.put(participantId, new Participant(participantId, null, -1, false, null, null,
+          null, null, null));
+    }
+
+    Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
+    ControllerId leaderId = null;
+    if (leader != null) {
+      leaderId = new ControllerId(leader.getId());
+      controllerMap.put(leaderId, new Controller(leaderId, leader, true));
+    }
+
+    return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId);
   }
 
   /**
+   * pause controller of cluster
    */
-  public void pause() {
-
+  public void pauseCluster() {
+    _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
   }
 
   /**
+   * resume controller of cluster
    */
   public void resume() {
-
+    _accessor.removeProperty(_keyBuilder.pause());
   }
 
   /**
+   * add a resource to cluster
    * @param resource
    */
-  public void addResource(Resource resource) {
-
+  public void addResourceToCluster(Resource resource) {
+    StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
+    if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
+      throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
+          + _clusterId);
+    }
+
+    ResourceId resourceId = resource.getId();
+    if (_accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify())) != null) {
+      throw new HelixException("Skip adding resource: " + resourceId
+          + " . Resource ideal state already exists in cluster: " + _clusterId);
+    }
+
+    // TODO convert rebalancerConfig to idealState
+    _accessor.createProperty(_keyBuilder.idealStates(resourceId.stringify()), null);
   }
 
   /**
+   * drop a resource from cluster
    * @param resourceId
    */
-  public void dropResource(ResourceId resourceId) {
+  public void dropResourceFromCluster(ResourceId resourceId) {
+    // TODO check existence
+    _accessor.removeProperty(_keyBuilder.idealStates(resourceId.stringify()));
+    _accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+  }
 
+  /**
+   * check if cluster structure is valid
+   * @return true if valid or false otherwise
+   */
+  public boolean isClusterStructureValid() {
+    // TODO impl this
+    return false;
   }
 
   /**
+   * add a participant to cluster
    * @param participant
    */
-  public void addParticipant(Participant participant) {
-
+  public void addParticipantToCluster(Participant participant) {
+    if (!isClusterStructureValid()) {
+      throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
+    }
+
+    ParticipantId participantId = participant.getId();
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
+      throw new HelixException("Config for participant: " + participantId
+          + " already exists in cluster: " + _clusterId);
+    }
+
+    List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
+    createKeys.add(_keyBuilder.instanceConfig(participantId.stringify()));
+    createKeys.add(_keyBuilder.messages(participantId.stringify()));
+    createKeys.add(_keyBuilder.currentStates(participantId.stringify()));
+    // TODO add participant error and status-update paths
+
+    for (PropertyKey key : createKeys) {
+      _accessor.createProperty(key, null);
+    }
   }
 
   /**
+   * drop a participant from cluster
    * @param participantId
    */
-  public void dropParticipant(ParticipantId participantId) {
-
+  public void dropParticipantFromCluster(ParticipantId participantId) {
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+      throw new HelixException("Config for participant: " + participantId
+          + " does NOT exist in cluster: " + _clusterId);
+    }
+
+    if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+      throw new HelixException("Participant: " + participantId
+          + " structure does NOT exist in cluster: " + _clusterId);
+    }
+
+    // delete participant config path
+    _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+    // delete participant path
+    _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/ClusterReader.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterReader.java b/helix-core/src/main/java/org/apache/helix/api/ClusterReader.java
deleted file mode 100644
index 12a41ac..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterReader.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-
-/**
- * Read a cluster from zookeeper
- */
-public class ClusterReader {
-  final ZkClient _client;
-
-  public ClusterReader(ZkClient client) {
-    _client = client;
-  }
-
-  // TODO move to ClusterAccessor
-  /**
-   * Read the following znodes from zookeeper and construct a cluster instance
-   * - all instance-configs
-   * - all ideal-states
-   * - all live-instances
-   * - all messages
-   * - all current-states
-   * @param clusterId
-   * @return cluster or null if not exist
-   */
-  public Cluster readCluster(String clusterId) {
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterId, new ZkBaseDataAccessor<ZNRecord>(_client));
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
-    /**
-     * map of instance-id to instance-config
-     */
-    Map<String, InstanceConfig> instanceConfigMap =
-        accessor.getChildValuesMap(keyBuilder.instanceConfigs());
-
-    /**
-     * map of resource-id to ideal-state
-     */
-    Map<String, IdealState> idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
-
-    /**
-     * map of instance-id to live-instance
-     */
-    Map<String, LiveInstance> liveInstanceMap =
-        accessor.getChildValuesMap(keyBuilder.liveInstances());
-
-    /**
-     * map of participant-id to map of message-id to message
-     */
-    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
-    for (String instanceName : liveInstanceMap.keySet()) {
-      Map<String, Message> instanceMsgMap =
-          accessor.getChildValuesMap(keyBuilder.messages(instanceName));
-      messageMap.put(instanceName, instanceMsgMap);
-    }
-
-    /**
-     * map of resource-id to map of participant-id to current-state
-     */
-    Map<String, Map<String, CurrentState>> currentStateMap =
-        new HashMap<String, Map<String, CurrentState>>();
-    for (String participantId : liveInstanceMap.keySet()) {
-      LiveInstance liveInstance = liveInstanceMap.get(participantId);
-      String sessionId = liveInstance.getSessionId();
-      Map<String, CurrentState> instanceCurStateMap =
-          accessor.getChildValuesMap(keyBuilder.currentStates(participantId, sessionId));
-
-      for (String resourceId : instanceCurStateMap.keySet()) {
-        if (!currentStateMap.containsKey(resourceId)) {
-          currentStateMap.put(resourceId, new HashMap<String, CurrentState>());
-        }
-
-        currentStateMap.get(resourceId).put(participantId, instanceCurStateMap.get(resourceId));
-      }
-    }
-
-    return new Cluster(new ClusterId(clusterId), idealStateMap, currentStateMap, instanceConfigMap,
-        liveInstanceMap, messageMap, null);
-  }
-
-  /**
-   * simple test
-   * @param args
-   */
-  public static void main(String[] args) {
-    ZkClient client =
-        new ZkClient("zzhang-ld", ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
-
-    ClusterReader reader = new ClusterReader(client);
-    Cluster cluster = reader.readCluster("ESPRESSO_STORAGE");
-
-    Map<ParticipantId, Participant> participantMap = cluster.getParticipantMap();
-    for (ParticipantId participantId : participantMap.keySet()) {
-      Participant participant = participantMap.get(participantId);
-      System.out.println(participantId + " - " + participant.isEnabled());
-      if (participant.isAlive()) {
-        System.out.println("\t" + participant.getRunningInstance().getSessionId());
-      }
-    }
-
-    Map<ResourceId, Resource> resourceMap = cluster.getResourceMap();
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
-      // System.out.println(resourceId + " - " + resource.getStateModelDefId());
-
-      // TODO fix it
-      //
-      // Map<ParticipantId, CurState> curStateMap = resource.getCurrentStateMap();
-      // for (ParticipantId participantId : curStateMap.keySet()) {
-      // System.out.println("\t" + participantId);
-      // CurState curState = curStateMap.get(participantId);
-      // for (PartitionId partitionId : curState.getPartitionIdSet()) {
-      // State state = curState.getState(partitionId);
-      // System.out.println("\t\t" + partitionId + " - " + state);
-      // }
-      // }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/Controller.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Controller.java b/helix-core/src/main/java/org/apache/helix/api/Controller.java
index df28571..d056e13 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Controller.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Controller.java
@@ -38,8 +38,8 @@ public class Controller {
 
     if (liveInstance != null) {
       _runningInstance =
-          new RunningInstance(new SessionId(liveInstance.getSessionId()), new HelixVersion(
-              liveInstance.getHelixVersion()), new ProcId(liveInstance.getLiveInstance()));
+          new RunningInstance(new SessionId(liveInstance.getSessionIdString()), new HelixVersion(
+              liveInstance.getHelixVersionString()), new ProcId(liveInstance.getLiveInstance()));
     } else {
       _runningInstance = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
index fb3f844..3a7b3b6 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
@@ -29,9 +29,10 @@ public class ControllerAccessor {
   }
 
   /**
+   * create leader
    * @param controllerId
    */
   public void start(ControllerId controllerId) {
-
+    // TODO impl this
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/CurState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/CurState.java b/helix-core/src/main/java/org/apache/helix/api/CurState.java
index e66fb7a..b6f64a9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/CurState.java
+++ b/helix-core/src/main/java/org/apache/helix/api/CurState.java
@@ -50,7 +50,7 @@ public class CurState {
     _participantId = participantId;
 
     Map<PartitionId, State> stateMap = new HashMap<PartitionId, State>();
-    Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+    Map<String, String> currentStateMap = currentState.getPartitionStateStringMap();
     for (String partitionId : currentStateMap.keySet()) {
       String state = currentStateMap.get(partitionId);
       stateMap.put(new PartitionId(resourceId, PartitionId.stripResourceId(partitionId)),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/CurStateAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/CurStateAccessor.java b/helix-core/src/main/java/org/apache/helix/api/CurStateAccessor.java
deleted file mode 100644
index 1b8e2ce..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/CurStateAccessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-
-public class CurStateAccessor {
-  private final HelixDataAccessor _accessor;
-
-  public CurStateAccessor(HelixDataAccessor accessor) {
-    _accessor = accessor;
-  }
-
-  /**
-   * @param curStateUpdate current state change delta
-   */
-  public void updateCurState(ParticipantId participantId, ResourceId resourceId,
-      CurState curStateUpdate) {
-    // accessor.updateProperty()
-  }
-
-  /**
-   * 
-   */
-  public void drop(ParticipantId participantId, ResourceId resourceId) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/ExtViewAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ExtViewAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ExtViewAccessor.java
deleted file mode 100644
index 41406ff..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/ExtViewAccessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-
-public class ExtViewAccessor {
-
-  private final HelixDataAccessor _accessor;
-
-  public ExtViewAccessor(HelixDataAccessor accessor) {
-    _accessor = accessor;
-  }
-
-  /**
-   * @param extView
-   */
-  public void setExternalView(ExtView extView) {
-
-  }
-
-  /**
-   * 
-   */
-  public void drop(ResourceId resourceId) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/HelixVersion.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/HelixVersion.java b/helix-core/src/main/java/org/apache/helix/api/HelixVersion.java
index 84d0a8f..a32c9c7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/HelixVersion.java
+++ b/helix-core/src/main/java/org/apache/helix/api/HelixVersion.java
@@ -48,4 +48,18 @@ public class HelixVersion {
   public String getMinor() {
     return null;
   }
+
+  @Override
+  public String toString() {
+    return _version;
+  }
+
+  /**
+   * Create a version from a version string
+   * @param version string in the form of a.b.c.d
+   * @return HelixVersion
+   */
+  public static HelixVersion from(String version) {
+    return new HelixVersion(version);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/Id.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Id.java b/helix-core/src/main/java/org/apache/helix/api/Id.java
index 96ce15d..d31084a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Id.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Id.java
@@ -51,4 +51,88 @@ public abstract class Id implements Comparable<Id> {
     return -1;
   }
 
+  /**
+   * Get a concrete resource id for a string name
+   * @param resourceId string resource identifier
+   * @return ResourceId
+   */
+  public static ResourceId resource(String resourceId) {
+    if (resourceId == null) {
+      return null;
+    }
+    return new ResourceId(resourceId);
+  }
+
+  /**
+   * Get a concrete partition id
+   * @param partitionId string partition identifier
+   * @return PartitionId
+   */
+  public static PartitionId partition(String partitionId) {
+    if (partitionId == null) {
+      return null;
+    }
+    return new PartitionId(PartitionId.extractResourceId(partitionId),
+        PartitionId.stripResourceId(partitionId));
+  }
+
+  /**
+   * Get a concrete participant id
+   * @param participantId string participant identifier
+   * @return ParticipantId
+   */
+  public static ParticipantId participant(String participantId) {
+    if (participantId == null) {
+      return null;
+    }
+    return new ParticipantId(participantId);
+  }
+
+  /**
+   * Get a concrete session id
+   * @param sessionId string session identifier
+   * @return SessionId
+   */
+  public static SessionId session(String sessionId) {
+    if (sessionId == null) {
+      return null;
+    }
+    return new SessionId(sessionId);
+  }
+
+  /**
+   * Get a concrete process id
+   * @param procId string process identifier (e.g. pid@host)
+   * @return ProcId
+   */
+  public static ProcId process(String processId) {
+    if (processId == null) {
+      return null;
+    }
+    return new ProcId(processId);
+  }
+
+  /**
+   * Get a concrete state model definition id
+   * @param stateModelDefId string state model identifier
+   * @return StateModelDefId
+   */
+  public static StateModelDefId stateModelDef(String stateModelDefId) {
+    if (stateModelDefId == null) {
+      return null;
+    }
+    return new StateModelDefId(stateModelDefId);
+  }
+
+  /**
+   * Get a concrete message id
+   * @param messageId string message identifier
+   * @return MsgId
+   */
+  public static MessageId message(String messageId) {
+    if (messageId == null) {
+      return null;
+    }
+    return new MessageId(messageId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/MessageId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/MessageId.java b/helix-core/src/main/java/org/apache/helix/api/MessageId.java
new file mode 100644
index 0000000..5d271c8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/MessageId.java
@@ -0,0 +1,34 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class MessageId extends Id {
+
+  private final String _id;
+
+  public MessageId(String id) {
+    _id = id;
+  }
+
+  @Override
+  public String stringify() {
+    return _id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/Msg.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Msg.java b/helix-core/src/main/java/org/apache/helix/api/Msg.java
index ac7638d..229d742 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Msg.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Msg.java
@@ -25,7 +25,7 @@ import org.apache.helix.model.Message;
  * Helix message
  */
 public class Msg {
-  private final MsgId _id;
+  private final MessageId _id;
   private final SessionId _srcSessionId;
   private final SessionId _tgtSessionid;
 
@@ -36,16 +36,16 @@ public class Msg {
    * @param message
    */
   public Msg(Message message) {
-    _id = new MsgId(message.getId());
-    _srcSessionId = new SessionId(message.getSrcSessionId());
-    _tgtSessionid = new SessionId(message.getTgtSessionId());
+    _id = new MessageId(message.getId());
+    _srcSessionId = new SessionId(message.getSrcSessionIdString());
+    _tgtSessionid = new SessionId(message.getTgtSessionIdString());
   }
 
   /**
    * Get message id
    * @return message id
    */
-  public MsgId getId() {
+  public MessageId getId() {
     return _id;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/MsgId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/MsgId.java b/helix-core/src/main/java/org/apache/helix/api/MsgId.java
deleted file mode 100644
index 88eb448..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/MsgId.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.helix.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class MsgId extends Id {
-
-  private final String _id;
-
-  public MsgId(String id) {
-    _id = id;
-  }
-
-  @Override
-  public String stringify() {
-    return _id;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index e3eb68e..f7a9ed0 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -46,7 +46,7 @@ public class Participant {
   /**
    * set of disabled partition id's
    */
-  private final Set<PartitionId> _disabledPartitionIds;
+  private final Set<PartitionId> _disabledPartitionIdSet;
   private final Set<String> _tags;
 
   private final RunningInstance _runningInstance;
@@ -54,81 +54,29 @@ public class Participant {
   /**
    * map of resource-id to current-state
    */
-  private final Map<ResourceId, CurState> _currentStateMap;
+  private final Map<ResourceId, CurrentState> _currentStateMap;
 
   /**
    * map of message-id to message
    */
-  private final Map<MsgId, Msg> _messageMap;
+  private final Map<MessageId, Message> _messageMap;
 
-  // TODO move this to ParticipantAccessor
   /**
    * Construct a participant
    * @param config
    */
-  public Participant(ParticipantId id, InstanceConfig config, LiveInstance liveInstance,
-      Map<String, CurrentState> currentStateMap, Map<String, Message> instanceMsgMap) {
+  public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
+      Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
+      Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap) {
     _id = id;
-    _hostName = config.getHostName();
-
-    int port = -1;
-    try {
-      port = Integer.parseInt(config.getPort());
-    } catch (IllegalArgumentException e) {
-      // keep as -1
-    }
-    if (port < 0 || port > 65535) {
-      port = -1;
-    }
+    _hostName = hostName;
     _port = port;
-    _isEnabled = config.getInstanceEnabled();
-
-    List<String> disabledPartitions = config.getDisabledPartitions();
-    if (disabledPartitions == null) {
-      _disabledPartitionIds = Collections.emptySet();
-    } else {
-      Set<PartitionId> disabledPartitionSet = new HashSet<PartitionId>();
-      for (String partitionId : disabledPartitions) {
-        disabledPartitionSet.add(new PartitionId(PartitionId.extracResourceId(partitionId),
-            PartitionId.stripResourceId(partitionId)));
-      }
-      _disabledPartitionIds = ImmutableSet.copyOf(disabledPartitionSet);
-    }
-
-    List<String> tags = config.getTags();
-    if (tags == null) {
-      _tags = Collections.emptySet();
-    } else {
-      _tags = ImmutableSet.copyOf(config.getTags());
-    }
-
-    if (liveInstance != null) {
-      _runningInstance =
-          new RunningInstance(new SessionId(liveInstance.getSessionId()), new HelixVersion(
-              liveInstance.getHelixVersion()), new ProcId(liveInstance.getLiveInstance()));
-    } else {
-      _runningInstance = null;
-    }
-
-    // TODO set curstate
-    // Map<ParticipantId, CurState> curStateMap = new HashMap<ParticipantId, CurState>();
-    // if (currentStateMap != null) {
-    // for (String participantId : currentStateMap.keySet()) {
-    // CurState curState =
-    // new CurState(_id, new ParticipantId(participantId), currentStateMap.get(participantId));
-    // curStateMap.put(new ParticipantId(participantId), curState);
-    // }
-    // }
-    // _currentStateMap = ImmutableMap.copyOf(curStateMap);
-    _currentStateMap = null;
-
-    Map<MsgId, Msg> msgMap = new HashMap<MsgId, Msg>();
-    for (String msgId : instanceMsgMap.keySet()) {
-      Message message = instanceMsgMap.get(msgId);
-      msgMap.put(new MsgId(msgId), new Msg(message));
-    }
-    _messageMap = ImmutableMap.copyOf(msgMap);
-
+    _isEnabled = isEnabled;
+    _disabledPartitionIdSet = ImmutableSet.copyOf(disabledPartitionIdSet);
+    _tags = ImmutableSet.copyOf(tags);
+    _runningInstance = runningInstance;
+    _currentStateMap = ImmutableMap.copyOf(currentStateMap);
+    _messageMap = ImmutableMap.copyOf(messageMap);
   }
 
   /**
@@ -176,7 +124,7 @@ public class Participant {
    * @return set of disabled partition id's, or empty set if none
    */
   public Set<PartitionId> getDisablePartitionIds() {
-    return _disabledPartitionIds;
+    return _disabledPartitionIdSet;
   }
 
   /**
@@ -191,7 +139,7 @@ public class Participant {
    * Get message map
    * @return message map
    */
-  public Map<MsgId, Msg> getMessageMap() {
+  public Map<MessageId, Message> getMessageMap() {
     return _messageMap;
   }
 
@@ -199,7 +147,129 @@ public class Participant {
    * Get the current states of the resource
    * @return map of resource-id to current state, or empty map if none
    */
-  public Map<ResourceId, CurState> getCurrentStateMap() {
+  public Map<ResourceId, CurrentState> getCurrentStateMap() {
     return _currentStateMap;
   }
+
+  public ParticipantId getId() {
+    return _id;
+  }
+
+  /**
+   * Assemble a participant
+   */
+  public static class Builder {
+    private final ParticipantId _id;
+    private final Set<PartitionId> _disabledPartitions;
+    private final Set<String> _tags;
+    private final Map<ResourceId, CurrentState> _currentStateMap;
+    private final Map<MessageId, Message> _messageMap;
+    private String _hostName;
+    private int _port;
+    private boolean _isEnabled;
+    private RunningInstance _runningInstance;
+
+    /**
+     * Build a participant with a given id
+     * @param id participant id
+     */
+    public Builder(ParticipantId id) {
+      _id = id;
+      _disabledPartitions = new HashSet<PartitionId>();
+      _tags = new HashSet<String>();
+      _currentStateMap = new HashMap<ResourceId, CurrentState>();
+      _messageMap = new HashMap<MessageId, Message>();
+      _isEnabled = true;
+    }
+
+    /**
+     * Set the participant host name
+     * @param hostName reachable host when live
+     * @return Builder
+     */
+    public Builder hostName(String hostName) {
+      _hostName = hostName;
+      return this;
+    }
+
+    /**
+     * Set the participant port
+     * @param port port number
+     * @return Builder
+     */
+    public Builder port(int port) {
+      _port = port;
+      return this;
+    }
+
+    /**
+     * Set whether or not the participant is enabled
+     * @param isEnabled true if enabled, false otherwise
+     * @return Builder
+     */
+    public Builder enabled(boolean isEnabled) {
+      _isEnabled = isEnabled;
+      return this;
+    }
+
+    /**
+     * Add a partition to disable for this participant
+     * @param partitionId the partition to disable
+     * @return Builder
+     */
+    public Builder addDisabledPartition(PartitionId partitionId) {
+      _disabledPartitions.add(partitionId);
+      return this;
+    }
+
+    /**
+     * Add an arbitrary tag for this participant
+     * @param tag the tag to add
+     * @return Builder
+     */
+    public Builder addTag(String tag) {
+      _tags.add(tag);
+      return this;
+    }
+
+    /**
+     * Add live properties to participants that are running
+     * @param runningInstance live participant properties
+     * @return Builder
+     */
+    public Builder runningInstance(RunningInstance runningInstance) {
+      _runningInstance = runningInstance;
+      return this;
+    }
+
+    /**
+     * Add a resource current state for this participant
+     * @param resourceId the resource the current state corresponds to
+     * @param currentState the current state
+     * @return Builder
+     */
+    public Builder addCurrentState(ResourceId resourceId, CurrentState currentState) {
+      _currentStateMap.put(resourceId, currentState);
+      return this;
+    }
+
+    /**
+     * Add a message for the participant
+     * @param message message to add
+     * @return Builder
+     */
+    public Builder addMessage(Message message) {
+      _messageMap.put(new MessageId(message.getId()), message);
+      return this;
+    }
+
+    /**
+     * Assemble the participant
+     * @return instantiated Participant
+     */
+    public Participant build() {
+      return new Participant(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
+          _runningInstance, _currentStateMap, _messageMap);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 03e0992..9de2c5a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -19,80 +19,309 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
 
 public class ParticipantAccessor {
+  private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
+
   private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+  private final ClusterId _clusterId;
 
-  public ParticipantAccessor(HelixDataAccessor accessor) {
+  public ParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    _clusterId = clusterId;
     _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
   }
 
   /**
-   * 
+   * @param participantId
+   * @param isEnabled
    */
-  public void disable(ParticipantId participantId) {
+  void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+      throw new HelixException("Config for participant: " + participantId
+          + " does NOT exist in cluster: " + _clusterId);
+    }
+
+    InstanceConfig config = new InstanceConfig(participantId.stringify());
+    config.setInstanceEnabled(isEnabled);
+    _accessor.updateProperty(_keyBuilder.instanceConfig(participantId.stringify()), config);
 
   }
 
   /**
-   * 
+   * disable participant
    */
-  public void enable(ParticipantId participantId) {
+  public void disableParticipant(ParticipantId participantId) {
+    enableParticipant(participantId, false);
+  }
 
+  /**
+   * enable participant
+   */
+  public void enableParticipant(ParticipantId participantId) {
+    enableParticipant(participantId, false);
   }
 
   /**
+   * create messages for participant
    * @param msgs
    */
-  public void insertMsgs(ParticipantId participantId, SessionId sessionId, Map<MsgId, Msg> msgs) {
+  public void insertMessagesToParticipant(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+    List<Message> msgs = new ArrayList<Message>();
+    for (MessageId msgId : msgMap.keySet()) {
+      msgKeys.add(_keyBuilder.message(participantId.stringify(), msgId.stringify()));
+      msgs.add(msgMap.get(msgId));
+    }
 
+    _accessor.createChildren(msgKeys, msgs);
   }
 
   /**
+   * set messages of participant
    * @param msgs
    */
-  public void updateMsgs(ParticipantId participantId, SessionId sessionId, Map<MsgId, Msg> msgs) {
-
+  public void setMessagesOfParticipant(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+    List<Message> msgs = new ArrayList<Message>();
+    for (MessageId msgId : msgMap.keySet()) {
+      msgKeys.add(_keyBuilder.message(participantId.stringify(), msgId.stringify()));
+      msgs.add(msgMap.get(msgId));
+    }
+    _accessor.setChildren(msgKeys, msgs);
   }
 
   /**
+   * delete messages from participant
    * @param msgIdSet
    */
-  public void deleteMsgs(ParticipantId participantId, SessionId sessionId, Set<MsgId> msgIdSet) {
+  public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+    List<PropertyKey> msgKeys = new ArrayList<PropertyKey>();
+    for (MessageId msgId : msgIdSet) {
+      msgKeys.add(_keyBuilder.message(participantId.stringify(), msgId.stringify()));
+    }
 
+    // TODO impl batch remove
+    for (PropertyKey msgKey : msgKeys) {
+      _accessor.removeProperty(msgKey);
+    }
   }
 
   /**
-   * @param disablePartitionSet
+   * @param enabled
+   * @param participantId
+   * @param resourceId
+   * @param partitionIdSet
    */
-  public void disablePartitions(ParticipantId participantId, Set<PartitionId> disablePartitionSet) {
+  void enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+      final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
+    // check instanceConfig exists
+    PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantId.stringify());
+    if (_accessor.getProperty(instanceConfigKey) == null) {
+      throw new HelixException("Config for participant: " + participantId
+          + " does NOT exist in cluster: " + _clusterId);
+    }
+
+    // check resource exist. warn if not
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+    if (idealState == null) {
+      LOG.warn("Disable partitions: " + partitionIdSet + " but Cluster: " + _clusterId
+          + ", resource: " + resourceId
+          + " does NOT exists. probably disable it during ERROR->DROPPED transtition");
 
+    } else {
+      // check partitions exist. warn if not
+      for (PartitionId partitionId : partitionIdSet) {
+        if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
+            .getPreferenceList(partitionId.stringify()) == null)
+            || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
+                .getInstanceStateMap(partitionId.stringify()) == null)) {
+          LOG.warn("Cluster: " + _clusterId + ", resource: " + resourceId + ", partition: "
+              + partitionId + ", partition does not exist in ideal state");
+        }
+      }
+    }
+
+    // TODO merge list logic should go to znrecord updater
+    // update participantConfig
+    // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    final List<String> partitionNames = new ArrayList<String>();
+    for (PartitionId partitionId : partitionIdSet) {
+      partitionNames.add(partitionId.stringify());
+    }
+    baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + _clusterId + ", instance: " + participantId
+              + ", participant config is null");
+        }
+
+        // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
+        List<String> list =
+            currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+        Set<String> disabledPartitions = new HashSet<String>();
+        if (list != null) {
+          disabledPartitions.addAll(list);
+        }
+
+        if (enabled) {
+          disabledPartitions.removeAll(partitionNames);
+        } else {
+          disabledPartitions.addAll(partitionNames);
+        }
+
+        list = new ArrayList<String>(disabledPartitions);
+        Collections.sort(list);
+        currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+        return currentData;
+      }
+    }, AccessOption.PERSISTENT);
   }
 
   /**
-   * @param enablePartitionSet
+   * @param disablePartitionSet
    */
-  public void enablePartitions(ParticipantId participantId, Set<PartitionId> enablePartitionSet) {
+  public void disablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+      Set<PartitionId> disablePartitionIdSet) {
+    enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
+  }
 
+  /**
+   * @param enablePartitionSet
+   */
+  public void enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+      Set<PartitionId> enablePartitionIdSet) {
+    enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
   }
 
   /**
    * create live instance for the participant
    * @param participantId
    */
-  public void start(ParticipantId participantId) {
-
+  public void startParticipant(ParticipantId participantId) {
+    // TODO impl this
   }
 
   /**
+   * read participant related data
    * @param participantId
    * @return
    */
-  public Participant read(ParticipantId participantId) {
-    return null;
+  public Participant readParticipant(ParticipantId participantId) {
+    // read physical model
+    String participantName = participantId.stringify();
+    InstanceConfig instanceConfig = _accessor.getProperty(_keyBuilder.instance(participantName));
+    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName));
+
+    Map<String, Message> instanceMsgMap = Collections.emptyMap();
+    Map<String, CurrentState> instanceCurStateMap = Collections.emptyMap();
+    if (liveInstance != null) {
+      SessionId sessionId = liveInstance.getSessionId();
+
+      instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+      instanceCurStateMap =
+          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+              sessionId.stringify()));
+    }
+
+    // convert to logical model
+    String hostName = instanceConfig.getHostName();
+
+    int port = -1;
+    try {
+      port = Integer.parseInt(instanceConfig.getPort());
+    } catch (IllegalArgumentException e) {
+      // keep as -1
+    }
+    if (port < 0 || port > 65535) {
+      port = -1;
+    }
+    boolean isEnabled = instanceConfig.getInstanceEnabled();
+
+    List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
+    Set<PartitionId> disabledPartitionIdSet;
+    if (disabledPartitions == null) {
+      disabledPartitionIdSet = Collections.emptySet();
+    } else {
+      disabledPartitionIdSet = new HashSet<PartitionId>();
+      for (String partitionId : disabledPartitions) {
+        disabledPartitionIdSet.add(new PartitionId(PartitionId.extractResourceId(partitionId),
+            PartitionId.stripResourceId(partitionId)));
+      }
+    }
+
+    Set<String> tags = new HashSet<String>(instanceConfig.getTags());
+
+    RunningInstance runningInstance = null;
+    if (liveInstance != null) {
+      runningInstance =
+          new RunningInstance(new SessionId(liveInstance.getSessionIdString()), new HelixVersion(
+              liveInstance.getHelixVersionString()), new ProcId(liveInstance.getLiveInstance()));
+    }
+
+    Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+    for (String msgId : instanceMsgMap.keySet()) {
+      Message message = instanceMsgMap.get(msgId);
+      msgMap.put(new MessageId(msgId), message);
+    }
+
+    // TODO convert current state
+    // Map<ParticipantId, CurState> curStateMap = new HashMap<ParticipantId, CurState>();
+    // if (currentStateMap != null) {
+    // for (String participantId : currentStateMap.keySet()) {
+    // CurState curState =
+    // new CurState(_id, new ParticipantId(participantId), currentStateMap.get(participantId));
+    // curStateMap.put(new ParticipantId(participantId), curState);
+    // }
+    // }
+
+    return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+        runningInstance, null, msgMap);
+  }
+
+  /**
+   * update resource current state of a participant
+   * @param curStateUpdate current state change delta
+   */
+  public void updateParticipantCurrentState(ParticipantId participantId, SessionId sessionId,
+      ResourceId resourceId, CurrentState curStateUpdate) {
+    _accessor.updateProperty(
+        _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
+            resourceId.stringify()), curStateUpdate);
+  }
+
+  /**
+   * drop resource current state of a participant
+   */
+  public void dropParticipantCurrentState(ParticipantId participantId, SessionId sessionId,
+      ResourceId resourceId) {
+    _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
+        sessionId.stringify(), resourceId.stringify()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/PartitionId.java b/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
index 3bec1ad..04dbab1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
@@ -45,7 +45,7 @@ public class PartitionId extends Id {
    * @param partitionName
    * @return
    */
-  public static ResourceId extracResourceId(String partitionName) {
+  public static ResourceId extractResourceId(String partitionName) {
     return new ResourceId(partitionName.substring(0, partitionName.lastIndexOf("_")));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 219e867..cf4fbbc 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -28,12 +28,12 @@ public class RebalancerConfig {
 
   private final RscAssignment _resourceAssignment;
 
-  public RebalancerConfig() {
-    _rebalancerMode = RebalanceMode.NONE;
-    _rebalancerRef = null;
-    _stateModelDefId = null;
-
-    _resourceAssignment = null;
+  public RebalancerConfig(RebalanceMode mode, RebalancerRef rebalancerRef,
+      StateModelDefId stateModelDefId, RscAssignment resourceAssignment) {
+    _rebalancerMode = mode;
+    _rebalancerRef = rebalancerRef;
+    _stateModelDefId = stateModelDefId;
+    _resourceAssignment = resourceAssignment;
   }
 
   /**
@@ -59,4 +59,69 @@ public class RebalancerConfig {
   public StateModelDefId getStateModelDefId() {
     return _stateModelDefId;
   }
+
+  /**
+   * Get the ideal node and state assignment of the resource
+   * @return resource assignment
+   */
+  public RscAssignment getResourceAssignment() {
+    return _resourceAssignment;
+  }
+
+  /**
+   * Assembles a RebalancerConfig
+   */
+  public static class Builder {
+    private RebalanceMode _mode = RebalanceMode.NONE;
+    private RebalancerRef _rebalancerRef;
+    private StateModelDefId _stateModelDefId;
+    private RscAssignment _resourceAssignment;
+
+    /**
+     * Set the rebalancer mode
+     * @param mode {@link RebalanceMode}
+     */
+    public Builder rebalancerMode(RebalanceMode mode) {
+      _mode = mode;
+      return this;
+    }
+
+    /**
+     * Set a user-defined rebalancer
+     * @param rebalancerRef a reference to the rebalancer
+     * @return Builder
+     */
+    public Builder rebalancer(RebalancerRef rebalancerRef) {
+      _rebalancerRef = rebalancerRef;
+      return this;
+    }
+
+    /**
+     * Set the state model definition
+     * @param stateModelDefId state model identifier
+     * @return Builder
+     */
+    public Builder stateModelDef(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return this;
+    }
+
+    /**
+     * Set the full assignment of partitions to nodes and corresponding states
+     * @param resourceAssignment resource assignment
+     * @return Builder
+     */
+    public Builder resourceAssignment(RscAssignment resourceAssignment) {
+      _resourceAssignment = resourceAssignment;
+      return this;
+    }
+
+    /**
+     * Assemble a RebalancerConfig
+     * @return a fully defined rebalancer configuration
+     */
+    public RebalancerConfig build() {
+      return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 9011da9..033e1e7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -48,4 +48,25 @@ public class RebalancerRef {
     return null;
   }
 
+  @Override
+  public String toString() {
+    return _rebalancerClassName;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof RebalancerRef) {
+      return this.toString().equals(((RebalancerRef) that).toString());
+    }
+    return false;
+  }
+
+  /**
+   * Get a rebalancer class reference
+   * @param rebalancerClassName name of the class
+   * @return RebalancerRef
+   */
+  public static RebalancerRef from(String rebalancerClassName) {
+    return new RebalancerRef(rebalancerClassName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index b76a0f8..354d0a1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -19,18 +19,12 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 /**
@@ -58,7 +52,7 @@ public class Resource {
     _rebalancerConfig = null;
 
     Set<Partition> partitionSet = new HashSet<Partition>();
-    for (String partitionId : idealState.getPartitionSet()) {
+    for (String partitionId : idealState.getPartitionStringSet()) {
       partitionSet
           .add(new Partition(new PartitionId(id, PartitionId.stripResourceId(partitionId))));
     }
@@ -71,6 +65,21 @@ public class Resource {
   }
 
   /**
+   * Construct a Resource
+   * @param id resource identifier
+   * @param partitionSet disjoint partitions of the resource
+   * @param externalView external view of the resource
+   * @param rebalancerConfig configuration properties for rebalancing this resource
+   */
+  public Resource(ResourceId id, Set<Partition> partitionSet, ExtView externalView,
+      RebalancerConfig rebalancerConfig) {
+    _id = id;
+    _partitionSet = ImmutableSet.copyOf(partitionSet);
+    _externalView = externalView;
+    _rebalancerConfig = rebalancerConfig;
+  }
+
+  /**
    * Get the set of partitions of the resource
    * @return set of partitions or empty set if none
    */
@@ -86,4 +95,68 @@ public class Resource {
     return _externalView;
   }
 
+  public RebalancerConfig getRebalancerConfig() {
+    return _rebalancerConfig;
+  }
+
+  public ResourceId getId() {
+    return _id;
+  }
+
+  /**
+   * Assembles a Resource
+   */
+  public static class Builder {
+    private final ResourceId _id;
+    private final Set<Partition> _partitionSet;
+    private ExtView _externalView;
+    private RebalancerConfig _rebalancerConfig;
+
+    /**
+     * Build a Resource with an id
+     * @param id resource id
+     */
+    public Builder(ResourceId id) {
+      _id = id;
+      _partitionSet = new HashSet<Partition>();
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public Builder addPartition(Partition partition) {
+      _partitionSet.add(partition);
+      return this;
+    }
+
+    /**
+     * Set the external view of this resource
+     * @param extView currently served replica placement and state
+     * @return Builder
+     */
+    public Builder externalView(ExtView extView) {
+      _externalView = extView;
+      return this;
+    }
+
+    /**
+     * Set the rebalancer configuration
+     * @param rebalancerConfig properties of interest for rebalancing
+     * @return Builder
+     */
+    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+      _rebalancerConfig = rebalancerConfig;
+      return this;
+    }
+
+    /**
+     * Create a Resource object
+     * @return instantiated Resource
+     */
+    public Resource build() {
+      return new Resource(_id, _partitionSet, _externalView, _rebalancerConfig);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index b5a6516..c0757b4 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -20,26 +20,47 @@ package org.apache.helix.api;
  */
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 
 public class ResourceAccessor {
-
+  private final ClusterId _clusterId;
   private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
 
-  public ResourceAccessor(HelixDataAccessor accessor) {
+  public ResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    _clusterId = clusterId;
     _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
   }
 
   /**
-   * 
+   * save resource assignment
    */
   public void setRresourceAssignment(ResourceId resourceId, RscAssignment resourceAssignment) {
+    // TODO impl this
+  }
 
+  /**
+   * set ideal-state
+   */
+  public void setResourceIdealState(ResourceId resourceId, IdealState idealState) {
+    _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
   }
 
   /**
-   * 
+   * set external view of a resource
+   * @param extView
    */
-  public void setRebalancerConfig(RebalancerConfig config) {
+  public void setResourceExternalView(ResourceId resourceId, ExternalView extView) {
+    _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), extView);
+  }
 
+  /**
+   * drop external view of a resource
+   */
+  public void dropResourceExternalView(ResourceId resourceId) {
+    _accessor.removeProperty(_keyBuilder.idealStates(resourceId.stringify()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/RscAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RscAssignment.java b/helix-core/src/main/java/org/apache/helix/api/RscAssignment.java
index 88e4ff6..90b77e3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RscAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RscAssignment.java
@@ -21,6 +21,7 @@ package org.apache.helix.api;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.model.ResourceAssignment;
 
@@ -29,16 +30,111 @@ import com.google.common.collect.ImmutableMap;
 public class RscAssignment {
   private final Map<PartitionId, Map<ParticipantId, State>> _resourceAssignment;
 
+  /**
+   * Construct an assignment from a physically-stored assignment
+   * @param rscAssignment the assignment
+   */
   public RscAssignment(ResourceAssignment rscAssignment) {
     Map<PartitionId, Map<ParticipantId, State>> resourceAssignment =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 
-    // TODO fill the map
+    for (org.apache.helix.model.Partition partition : rscAssignment.getMappedPartitions()) {
+      Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
+      Map<String, String> rawReplicaMap = rscAssignment.getReplicaMap(partition);
+      for (String participantId : rawReplicaMap.keySet()) {
+        replicaMap.put(new ParticipantId(participantId),
+            new State(rawReplicaMap.get(participantId)));
+      }
+      resourceAssignment.put(new PartitionId(new ResourceId(rscAssignment.getResourceName()),
+          partition.getPartitionName()), replicaMap);
+    }
 
     _resourceAssignment = ImmutableMap.copyOf(resourceAssignment);
   }
 
+  /**
+   * Build an assignment from a map of assigned replicas
+   * @param resourceAssignment map of (partition, participant, state)
+   */
+  public RscAssignment(Map<PartitionId, Map<ParticipantId, State>> resourceAssignment) {
+    ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> mapBuilder =
+        new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
+    for (PartitionId partitionId : resourceAssignment.keySet()) {
+      mapBuilder.put(partitionId, ImmutableMap.copyOf(resourceAssignment.get(partitionId)));
+    }
+    _resourceAssignment = mapBuilder.build();
+  }
+
+  /**
+   * Get the partitions currently with assignments
+   * @return set of partition ids
+   */
+  public Set<PartitionId> getAssignedPartitions() {
+    return _resourceAssignment.keySet();
+  }
+
+  /**
+   * Get the replica assignment map for a partition
+   * @param partitionId the partition to look up
+   * @return map of (participant id, state)
+   */
   public Map<ParticipantId, State> getParticipantStateMap(PartitionId partitionId) {
     return _resourceAssignment.get(partitionId);
   }
+
+  /**
+   * Assemble a full assignment
+   */
+  public static class Builder {
+    private final Map<PartitionId, Map<ParticipantId, State>> _resourceAssignment;
+
+    /**
+     * Instantiate the builder
+     */
+    public Builder() {
+      _resourceAssignment = new HashMap<PartitionId, Map<ParticipantId, State>>();
+    }
+
+    /**
+     * Add assignments for a partition
+     * @param partitionId partition to assign
+     * @param replicaMap map of participant and state for each replica
+     * @return Builder
+     */
+    public Builder addAssignments(PartitionId partitionId, Map<ParticipantId, State> replicaMap) {
+      if (!_resourceAssignment.containsKey(partitionId)) {
+        _resourceAssignment.put(partitionId, replicaMap);
+      } else {
+        _resourceAssignment.get(partitionId).putAll(replicaMap);
+      }
+      return this;
+    }
+
+    /**
+     * Assign a single replica
+     * @param partitionId partition to assign
+     * @param participantId participant to host the replica
+     * @param state replica state
+     * @return Builder
+     */
+    public Builder addAssignment(PartitionId partitionId, ParticipantId participantId, State state) {
+      Map<ParticipantId, State> replicaMap;
+      if (!_resourceAssignment.containsKey(partitionId)) {
+        replicaMap = new HashMap<ParticipantId, State>();
+        _resourceAssignment.put(partitionId, replicaMap);
+      } else {
+        replicaMap = _resourceAssignment.get(partitionId);
+      }
+      replicaMap.put(participantId, state);
+      return this;
+    }
+
+    /**
+     * Build the resource assignment
+     * @return instantiated RscAssignment
+     */
+    public RscAssignment build() {
+      return new RscAssignment(_resourceAssignment);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java b/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
index 49c5ccf..4effd24 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
@@ -63,5 +63,4 @@ public class RunningInstance {
   public ProcId getPid() {
     return _pid;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index b2000f2..0fc8bc7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -33,4 +33,24 @@ public class State {
   public String toString() {
     return _state;
   }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof State) {
+      return this.toString().equals(((State) that).toString());
+    }
+    return false;
+  }
+
+  /**
+   * Get a State from a state name
+   * @param state state name
+   * @return State
+   */
+  public static State from(String state) {
+    if (state == null) {
+      return null;
+    }
+    return new State(state);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/433b0011/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8e4e1ea..5ba4362 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -480,7 +480,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
     for (LiveInstance liveInstance : liveInstances) {
       curInstances.put(liveInstance.getInstanceName(), liveInstance);
-      curSessions.put(liveInstance.getSessionId(), liveInstance);
+      curSessions.put(liveInstance.getSessionIdString(), liveInstance);
     }
 
     Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();


Mime
View raw message