helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [21/53] [abbrv] git commit: [HELIX-238] Added basic updater logic
Date Thu, 07 Nov 2013 01:19:29 GMT
[HELIX-238] Added basic updater logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/89b3bc59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/89b3bc59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/89b3bc59

Branch: refs/heads/master
Commit: 89b3bc59e367a8d0dbe268d755d59b6030d3fdf7
Parents: 0b67257
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Sep 25 18:08:20 2013 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/PropertyKey.java |  16 ++
 .../main/java/org/apache/helix/api/Cluster.java |   2 +-
 .../org/apache/helix/api/ClusterAccessor.java   | 171 +++++++++++---
 .../org/apache/helix/api/ClusterConfig.java     | 220 ++++++++++++++++++-
 .../java/org/apache/helix/api/Participant.java  |   2 +-
 .../apache/helix/api/ParticipantAccessor.java   |  28 ++-
 .../org/apache/helix/api/ParticipantConfig.java | 156 ++++++++++++-
 .../java/org/apache/helix/api/Resource.java     |  12 +-
 .../org/apache/helix/api/ResourceAccessor.java  |  29 ++-
 .../org/apache/helix/api/ResourceConfig.java    | 121 +++++++++-
 .../helix/api/StateModelDefinitionAccessor.java |   4 +-
 .../context/PartitionedRebalancerContext.java   |   7 +-
 .../rebalancer/context/RebalancerConfig.java    |   4 +-
 .../helix/model/ResourceConfiguration.java      |  37 +---
 .../helix/controller/stages/BaseStageTest.java  |   5 +-
 .../apache/helix/examples/NewModelExample.java  |   7 +-
 16 files changed, 727 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 1a6d11d..16c6c1c 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.MESSAGES;
 import static org.apache.helix.PropertyType.MESSAGES_CONTROLLER;
 import static org.apache.helix.PropertyType.PAUSE;
 import static org.apache.helix.PropertyType.PERSISTENTSTATS;
+import static org.apache.helix.PropertyType.PROPERTYSTORE;
 import static org.apache.helix.PropertyType.RESOURCEASSIGNMENTS;
 import static org.apache.helix.PropertyType.STATEMODELDEFS;
 import static org.apache.helix.PropertyType.STATUSUPDATES;
@@ -594,6 +595,14 @@ public class PropertyKey {
     }
 
     /**
+     * Get the root of all controller status updates
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerTaskStatuses() {
+      return new PropertyKey(STATUSUPDATES_CONTROLLER, StatusUpdate.class, _clusterName);
+    }
+
+    /**
      * Get a property key associated with {@link StatusUpdate} of controller status updates
      * @param subPath
      * @return {@link PropertyKey}
@@ -705,6 +714,13 @@ public class PropertyKey {
       return new PropertyKey(HEALTHREPORT, HealthStat.class, _clusterName, instanceName);
     }
 
+    /**
+     * Get a propertykey associated with the root of the Helix property store
+     * @return {@link PropertyStore}
+     */
+    public PropertyKey propertyStore() {
+      return new PropertyKey(PROPERTYSTORE, null, _clusterName);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 56a84e9..9e71904 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -102,7 +102,7 @@ public class Cluster {
     _config =
         new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
             .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
-            .addStateModelDefinitions(stateModelMap.values()).setPausedStatus(isPaused)
+            .addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
             .userConfig(userConfig).build();
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index a87ce74..6302e33 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -25,8 +25,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.RebalancerContext;
@@ -65,15 +66,13 @@ public class ClusterAccessor {
   public boolean createCluster(ClusterConfig cluster) {
     boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
     if (!created) {
-      // LOG.warn("Cluster already created. Aborting.");
-      // return false;
+      LOG.error("Cluster already created. Aborting.");
+      return false;
     }
-
-    StateModelDefinitionAccessor stateModelDefAccessor =
-        new StateModelDefinitionAccessor(_accessor);
+    initClusterStructure();
     Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
     for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
-      stateModelDefAccessor.addStateModelDefinition(stateModelDef);
+      addStateModelDefinitionToCluster(stateModelDef);
     }
     Map<ResourceId, ResourceConfig> resources = cluster.getResourceMap();
     for (ResourceConfig resource : resources.values()) {
@@ -97,25 +96,38 @@ public class ClusterAccessor {
     return true;
   }
 
+  public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+    Cluster cluster = readCluster();
+    if (cluster == null) {
+      LOG.error("Cluster does not exist, cannot be updated");
+      return null;
+    }
+    ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
+    // TODO: persist this
+    return config;
+  }
+
   /**
    * drop a cluster
+   * @return true if the cluster was dropped, false if there was an error
    */
-  public void dropCluster() {
+  public boolean dropCluster() {
     LOG.info("Dropping cluster: " + _clusterId);
     List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
     if (liveInstanceNames.size() > 0) {
-      throw new HelixException("Can't drop cluster: " + _clusterId
-          + " because there are running participant: " + liveInstanceNames
-          + ", shutdown participants first.");
+      LOG.error("Can't drop cluster: " + _clusterId + " because there are running participant: "
+          + liveInstanceNames + ", shutdown participants first.");
+      return false;
     }
 
     LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
     if (leader != null) {
-      throw new HelixException("Can't drop cluster: " + _clusterId + ", because leader: "
-          + leader.getId() + " are running, shutdown leader first.");
+      LOG.error("Can't drop cluster: " + _clusterId + ", because leader: " + leader.getId()
+          + " are running, shutdown leader first.");
+      return false;
     }
 
-    _accessor.removeProperty(_keyBuilder.cluster());
+    return _accessor.removeProperty(_keyBuilder.cluster());
   }
 
   /**
@@ -271,30 +283,42 @@ public class ClusterAccessor {
   /**
    * add a resource to cluster
    * @param resource
+   * @return true if resource added, false if there was an error
    */
-  public void addResourceToCluster(ResourceConfig resource) {
+  public boolean addResourceToCluster(ResourceConfig resource) {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster: " + _clusterId + " structure is not valid");
+      return false;
+    }
     RebalancerContext context =
         resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
     StateModelDefId stateModelDefId = context.getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
-      throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
-          + _clusterId);
+      LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
+      return false;
     }
 
     ResourceId resourceId = resource.getId();
     if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) != null) {
-      throw new HelixException("Skip adding resource: " + resourceId
+      LOG.error("Skip adding resource: " + resourceId
           + ", because resource ideal state already exists in cluster: " + _clusterId);
+      return false;
+    }
+    if (_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())) != null) {
+      LOG.error("Skip adding resource: " + resourceId
+          + ", because resource config already exists in cluster: " + _clusterId);
+      return false;
     }
 
     // Add resource user config
     if (resource.getUserConfig() != null) {
       ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+      configuration.setType(resource.getType());
       configuration.addNamespacedConfig(resource.getUserConfig());
       configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
       configuration.setBucketSize(resource.getBucketSize());
       configuration.setBatchMessageMode(resource.getBatchMessageMode());
-      _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+      _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
@@ -305,15 +329,23 @@ public class ClusterAccessor {
     if (idealState != null) {
       _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
     }
+    return true;
   }
 
   /**
    * drop a resource from cluster
    * @param resourceId
+   * @return true if removal succeeded, false otherwise
    */
-  public void dropResourceFromCluster(ResourceId resourceId) {
+  public boolean dropResourceFromCluster(ResourceId resourceId) {
+    if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) == null) {
+      LOG.error("Skip removing resource: " + resourceId
+          + ", because resource ideal state already removed from cluster: " + _clusterId);
+      return false;
+    }
     _accessor.removeProperty(_keyBuilder.idealState(resourceId.stringify()));
     _accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+    return true;
   }
 
   /**
@@ -321,23 +353,71 @@ public class ClusterAccessor {
    * @return true if valid or false otherwise
    */
   public boolean isClusterStructureValid() {
-    // TODO impl this
+    List<String> paths = getRequiredPaths();
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    boolean[] existsResults = baseAccessor.exists(paths, 0);
+    for (boolean exists : existsResults) {
+      if (!exists) {
+        return false;
+      }
+    }
     return true;
   }
 
   /**
+   * Create empty persistent properties to ensure that there is a valid cluster structure
+   */
+  private void initClusterStructure() {
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    List<String> paths = getRequiredPaths();
+    for (String path : paths) {
+      boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+      if (!status && LOG.isDebugEnabled()) {
+        LOG.debug(path + " already exists");
+      }
+    }
+  }
+
+  /**
+   * Get all property paths that must be set for a cluster structure to be valid
+   * @return list of paths as strings
+   */
+  private List<String> getRequiredPaths() {
+    List<String> paths = new ArrayList<String>();
+    paths.add(_keyBuilder.cluster().getPath());
+    paths.add(_keyBuilder.idealStates().getPath());
+    paths.add(_keyBuilder.clusterConfigs().getPath());
+    paths.add(_keyBuilder.instanceConfigs().getPath());
+    paths.add(_keyBuilder.resourceConfigs().getPath());
+    paths.add(_keyBuilder.propertyStore().getPath());
+    paths.add(_keyBuilder.liveInstances().getPath());
+    paths.add(_keyBuilder.instances().getPath());
+    paths.add(_keyBuilder.externalViews().getPath());
+    paths.add(_keyBuilder.controller().getPath());
+    paths.add(_keyBuilder.stateModelDefs().getPath());
+    paths.add(_keyBuilder.controllerMessages().getPath());
+    paths.add(_keyBuilder.controllerTaskErrors().getPath());
+    paths.add(_keyBuilder.controllerTaskStatuses().getPath());
+    paths.add(_keyBuilder.controllerLeaderHistory().getPath());
+    return paths;
+  }
+
+  /**
    * add a participant to cluster
    * @param participant
+   * @return true if participant added, false otherwise
    */
-  public void addParticipantToCluster(ParticipantConfig participant) {
+  public boolean addParticipantToCluster(ParticipantConfig participant) {
     if (!isClusterStructureValid()) {
-      throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
+      LOG.error("Cluster: " + _clusterId + " structure is not valid");
+      return false;
     }
 
     ParticipantId participantId = participant.getId();
     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
-      throw new HelixException("Config for participant: " + participantId
-          + " already exists in cluster: " + _clusterId);
+      LOG.error("Config for participant: " + participantId + " already exists in cluster: "
+          + _clusterId);
+      return false;
     }
 
     // add empty root ZNodes
@@ -361,27 +441,31 @@ public class ClusterAccessor {
     for (String tag : tags) {
       instanceConfig.addTag(tag);
     }
-    Set<PartitionId> disabledPartitions = participant.getDisablePartitionIds();
+    Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
     for (PartitionId partitionId : disabledPartitions) {
       instanceConfig.setInstanceEnabledForPartition(partitionId, false);
     }
     _accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
     _accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
+    return true;
   }
 
   /**
    * drop a participant from cluster
    * @param participantId
+   * @return true if participant dropped, false if there was an error
    */
-  public void dropParticipantFromCluster(ParticipantId participantId) {
+  public boolean dropParticipantFromCluster(ParticipantId participantId) {
     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
-      throw new HelixException("Config for participant: " + participantId
-          + " does NOT exist in cluster: " + _clusterId);
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+          + _clusterId);
+      return false;
     }
 
     if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
-      throw new HelixException("Participant: " + participantId
-          + " structure does NOT exist in cluster: " + _clusterId);
+      LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
+          + _clusterId);
+      return false;
     }
 
     // delete participant config path
@@ -389,5 +473,30 @@ public class ClusterAccessor {
 
     // delete participant path
     _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+    return true;
+  }
+
+  /**
+   * Add a state model definition. Updates the existing state model definition if it already exists.
+   * @param stateModelDef fully initialized state model definition
+   * @return true if the model is persisted, false otherwise
+   */
+  public boolean addStateModelDefinitionToCluster(StateModelDefinition stateModelDef) {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster: " + _clusterId + " structure is not valid");
+      return false;
+    }
+
+    StateModelDefinitionAccessor smdAccessor = new StateModelDefinitionAccessor(_accessor);
+    return smdAccessor.setStateModelDefinition(stateModelDef);
+  }
+
+  /**
+   * Remove a state model definition if it exists
+   * @param stateModelDefId state model definition id
+   * @return true if removed, false if it did not exist
+   */
+  public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
+    return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index ba1e78e..590fb01 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -18,6 +18,7 @@ import org.apache.log4j.Logger;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -224,7 +225,7 @@ public class ClusterConfig {
   }
 
   /**
-   * Check the pasued status of the cluster
+   * Check the paused status of the cluster
    * @return true if paused, false otherwise
    */
   public boolean isPaused() {
@@ -232,6 +233,205 @@ public class ClusterConfig {
   }
 
   /**
+   * Update context for a ClusterConfig
+   */
+  public static class Delta {
+    private enum Fields {
+      PAUSE_STATUS,
+      USER_CONFIG
+    }
+
+    private Set<Fields> _updateFields;
+    private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
+    private Builder _builder;
+
+    /**
+     * Instantiate the delta for a cluster config
+     * @param clusterId the cluster to update
+     */
+    public Delta(ClusterId clusterId) {
+      _updateFields = Sets.newHashSet();
+      _removedConstraints = Maps.newHashMap();
+      for (ConstraintType type : ConstraintType.values()) {
+        Set<ConstraintId> constraints = Sets.newHashSet();
+        _removedConstraints.put(type, constraints);
+      }
+      _builder = new Builder(clusterId);
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param upperBound maximum number of replicas per partition in the state
+     * @return Delta
+     */
+    public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, int upperBound) {
+      return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+          Integer.toString(upperBound));
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+     *          number, or the currently supported special bound values:<br />
+     *          "R" - Refers to the number of replicas specified during resource
+     *          creation. This allows having different replication factor for each
+     *          resource without having to create a different state machine. <br />
+     *          "N" - Refers to all nodes in the cluster. Useful for resources that need
+     *          to exist on all nodes. This way one can add/remove nodes without having
+     *          the change the bounds.
+     * @return Delta
+     */
+    public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, String dynamicUpperBound) {
+      _builder.addStateUpperBoundConstraint(scope, stateModelDefId, state, dynamicUpperBound);
+      return this;
+    }
+
+    /**
+     * Remove state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @return Delta
+     */
+    public Delta removeStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state) {
+      _removedConstraints.get(ConstraintType.STATE_CONSTRAINT).add(
+          ConstraintId.from(scope, stateModelDefId, state));
+      return this;
+    }
+
+    /**
+     * Add a constraint on the maximum number of in-flight transitions of a certain type
+     * @param scope scope of the constraint
+     * @param stateModelDefId identifies the state model containing the transition
+     * @param transition the transition to constrain
+     * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+     * @return Delta
+     */
+    public Delta addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        Transition transition, int maxInFlightTransitions) {
+      _builder.addTransitionConstraint(scope, stateModelDefId, transition, maxInFlightTransitions);
+      return this;
+    }
+
+    /**
+     * Remove a constraint on the maximum number of in-flight transitions of a certain type
+     * @param scope scope of the constraint
+     * @param stateModelDefId identifies the state model containing the transition
+     * @param transition the transition to constrain
+     * @return Delta
+     */
+    public Delta removeTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        Transition transition) {
+      _removedConstraints.get(ConstraintType.MESSAGE_CONSTRAINT).add(
+          ConstraintId.from(scope, stateModelDefId, transition));
+      return this;
+    }
+
+    /**
+     * Add a single constraint item
+     * @param type type of the constraint item
+     * @param constraintId unique constraint id
+     * @param item instantiated ConstraintItem
+     * @return Delta
+     */
+    public Delta addConstraintItem(ConstraintType type, ConstraintId constraintId,
+        ConstraintItem item) {
+      _builder.addConstraint(type, constraintId, item);
+      return this;
+    }
+
+    /**
+     * Remove a single constraint item
+     * @param type type of the constraint item
+     * @param constraintId unique constraint id
+     * @return Delta
+     */
+    public Delta removeConstraintItem(ConstraintType type, ConstraintId constraintId) {
+      _removedConstraints.get(type).add(constraintId);
+      return this;
+    }
+
+    /**
+     * Set the paused status of the cluster
+     * @param isPaused true if paused, false otherwise
+     * @return Delta
+     */
+    public Delta setPausedStatus(boolean isPaused) {
+      _builder.pausedStatus(isPaused);
+      _updateFields.add(Fields.PAUSE_STATUS);
+      return this;
+    }
+
+    /**
+     * Set the user configuration
+     * @param userConfig user-specified properties
+     * @return Builder
+     */
+    public Delta setUserConfig(UserConfig userConfig) {
+      _builder.userConfig(userConfig);
+      _updateFields.add(Fields.USER_CONFIG);
+      return this;
+    }
+
+    /**
+     * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
+     * @param orig the original ClusterConfig
+     * @return updated ClusterConfig
+     */
+    public ClusterConfig mergeInto(ClusterConfig orig) {
+      // copy in original and updated fields
+      ClusterConfig deltaConfig = _builder.build();
+      Builder builder =
+          new Builder(orig.getId()).addResources(orig.getResourceMap().values())
+              .addParticipants(orig.getParticipantMap().values())
+              .addStateModelDefinitions(orig.getStateModelMap().values())
+              .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused());
+      for (Fields field : _updateFields) {
+        switch (field) {
+        case PAUSE_STATUS:
+          _builder.pausedStatus(deltaConfig.isPaused());
+          break;
+        case USER_CONFIG:
+          _builder.userConfig(deltaConfig.getUserConfig());
+          break;
+        }
+      }
+      // add constraint deltas
+      for (ConstraintType type : ConstraintType.values()) {
+        ClusterConstraints constraints;
+        if (orig.getConstraintMap().containsKey(type)) {
+          constraints = orig.getConstraintMap().get(type);
+        } else {
+          constraints = new ClusterConstraints(type);
+        }
+        // add new constraints
+        if (deltaConfig.getConstraintMap().containsKey(type)) {
+          ClusterConstraints deltaConstraints = deltaConfig.getConstraintMap().get(type);
+          for (ConstraintId constraintId : deltaConstraints.getConstraintItems().keySet()) {
+            ConstraintItem constraintItem = deltaConstraints.getConstraintItem(constraintId);
+            constraints.addConstraintItem(constraintId, constraintItem);
+          }
+        }
+        // remove constraints
+        for (ConstraintId constraintId : _removedConstraints.get(type)) {
+          constraints.removeConstraintItem(constraintId);
+        }
+        builder.addConstraint(constraints);
+      }
+      return builder.build();
+    }
+  }
+
+  /**
    * Assembles a cluster configuration
    */
   public static class Builder {
@@ -316,6 +516,19 @@ public class ClusterConfig {
     }
 
     /**
+     * Add a single constraint item
+     * @param type type of the constraint
+     * @param constraintId unique constraint identifier
+     * @param item instantiated ConstraintItem
+     * @return Builder
+     */
+    public Builder addConstraint(ConstraintType type, ConstraintId constraintId, ConstraintItem item) {
+      ClusterConstraints existConstraints = getConstraintsInstance(type);
+      existConstraints.addConstraintItem(constraintId, item);
+      return this;
+    }
+
+    /**
      * Add multiple constraints to the cluster
      * @param constraints cluster constraints of multiple distinct types
      * @return Builder
@@ -330,7 +543,6 @@ public class ClusterConfig {
     /**
      * Add a constraint on the maximum number of in-flight transitions of a certain type
      * @param scope scope of the constraint
-     * @param constraintId unique constraint identifier
      * @param stateModelDefId identifies the state model containing the transition
      * @param transition the transition to constrain
      * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
@@ -371,7 +583,7 @@ public class ClusterConfig {
      * @param stateModelDefId identifier of the state model that owns the state
      * @param state the state to constrain
      * @param upperBound maximum number of replicas per partition in the state
-     * @return
+     * @return Builder
      */
     public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
         State state, int upperBound) {
@@ -451,7 +663,7 @@ public class ClusterConfig {
      * @param isPaused true if paused, false otherwise
      * @return Builder
      */
-    public Builder setPausedStatus(boolean isPaused) {
+    public Builder pausedStatus(boolean isPaused) {
       _isPaused = isPaused;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 45d0315..8b02f0e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -106,7 +106,7 @@ public class Participant {
    * @return set of disabled partition id's, or empty set if none
    */
   public Set<PartitionId> getDisablePartitionIds() {
-    return _config.getDisablePartitionIds();
+    return _config.getDisabledPartitions();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 46895d3..11e3608 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -64,8 +64,9 @@ public class ParticipantAccessor {
   void enableParticipant(ParticipantId participantId, boolean isEnabled) {
     String participantName = participantId.stringify();
     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
-      throw new HelixException("Config for participant: " + participantId
-          + " does NOT exist in cluster: " + _clusterId);
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+          + _clusterId);
+      return;
     }
 
     InstanceConfig config = new InstanceConfig(participantName);
@@ -156,8 +157,9 @@ public class ParticipantAccessor {
     // check instanceConfig exists
     PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
     if (_accessor.getProperty(instanceConfigKey) == null) {
-      throw new HelixException("Config for participant: " + participantId
-          + " does NOT exist in cluster: " + _clusterId);
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+          + _clusterId);
+      return;
     }
 
     // check resource exist. warn if not
@@ -253,11 +255,21 @@ public class ParticipantAccessor {
   }
 
   /**
-   * create live instance for the participant
-   * @param participantId
+   * Update a participant configuration
+   * @param participantId the participant to update
+   * @param participantDelta changes to the participant
+   * @return ParticipantConfig, or null if participant is not persisted
    */
-  public void connectParticipant(ParticipantId participantId) {
-    // TODO impl this
+  public ParticipantConfig updateParticipant(ParticipantId participantId,
+      ParticipantConfig.Delta participantDelta) {
+    Participant participant = readParticipant(participantId);
+    if (participant == null) {
+      LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+      return null;
+    }
+    ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+    // TODO: persist this
+    return config;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
index f41056c..5498ca3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -4,6 +4,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -84,7 +85,7 @@ public class ParticipantConfig {
    * Get disabled partition id's
    * @return set of disabled partition id's, or empty set if none
    */
-  public Set<PartitionId> getDisablePartitionIds() {
+  public Set<PartitionId> getDisabledPartitions() {
     return _disabledPartitions;
   }
 
@@ -122,6 +123,159 @@ public class ParticipantConfig {
   }
 
   /**
+   * Update context for a ParticipantConfig
+   */
+  public static class Delta {
+    private enum Fields {
+      HOST_NAME,
+      PORT,
+      ENABLED,
+      USER_CONFIG
+    }
+
+    private Set<Fields> _updateFields;
+    private Set<String> _removedTags;
+    private Set<PartitionId> _removedDisabledPartitions;
+    private Builder _builder;
+
+    /**
+     * Instantiate the delta for a participant config
+     * @param participantId the participant to update
+     */
+    public Delta(ParticipantId participantId) {
+      _updateFields = Sets.newHashSet();
+      _removedTags = Sets.newHashSet();
+      _removedDisabledPartitions = Sets.newHashSet();
+      _builder = new Builder(participantId);
+    }
+
+    /**
+     * Set the participant host name
+     * @param hostName reachable host when live
+     * @return Delta
+     */
+    public Delta setHostName(String hostName) {
+      _builder.hostName(hostName);
+      _updateFields.add(Fields.HOST_NAME);
+      return this;
+    }
+
+    /**
+     * Set the participant port
+     * @param port port number
+     * @return Delta
+     */
+    public Delta setPort(int port) {
+      _builder.port(port);
+      _updateFields.add(Fields.PORT);
+      return this;
+    }
+
+    /**
+     * Set whether or not the participant is enabled
+     * @param isEnabled true if enabled, false otherwise
+     * @return Delta
+     */
+    public Delta setEnabled(boolean isEnabled) {
+      _builder.enabled(isEnabled);
+      _updateFields.add(Fields.ENABLED);
+      return this;
+    }
+
+    /**
+     * Set the user configuration
+     * @param userConfig user-specified properties
+     * @return Delta
+     */
+    public Delta setUserConfig(UserConfig userConfig) {
+      _builder.userConfig(userConfig);
+      _updateFields.add(Fields.USER_CONFIG);
+      return this;
+    }
+
+    /**
+     * Add an new tag for this participant
+     * @param tag the tag to add
+     * @return Delta
+     */
+    public Delta addTag(String tag) {
+      _builder.addTag(tag);
+      return this;
+    }
+
+    /**
+     * Remove a tag for this participant
+     * @param tag the tag to remove
+     * @return Delta
+     */
+    public Delta removeTag(String tag) {
+      _removedTags.add(tag);
+      return this;
+    }
+
+    /**
+     * Add a partition to disable for this participant
+     * @param partitionId the partition to disable
+     * @return Delta
+     */
+    public Delta addDisabledPartition(PartitionId partitionId) {
+      _builder.addDisabledPartition(partitionId);
+      return this;
+    }
+
+    /**
+     * Remove a partition from the disabled set for this participant
+     * @param partitionId the partition to enable
+     * @return Delta
+     */
+    public Delta removeDisabledPartition(PartitionId partitionId) {
+      _removedDisabledPartitions.add(partitionId);
+      return this;
+    }
+
+    /**
+     * Create a ParticipantConfig that is the combination of an existing ParticipantConfig and this
+     * delta
+     * @param orig the original ParticipantConfig
+     * @return updated ParticipantConfig
+     */
+    public ParticipantConfig mergeInto(ParticipantConfig orig) {
+      ParticipantConfig deltaConfig = _builder.build();
+      Builder builder =
+          new Builder(orig.getId()).hostName(orig.getHostName()).port(orig.getPort())
+              .userConfig(orig.getUserConfig());
+      for (Fields field : _updateFields) {
+        switch (field) {
+        case HOST_NAME:
+          builder.hostName(deltaConfig.getHostName());
+          break;
+        case PORT:
+          builder.port(deltaConfig.getPort());
+          break;
+        case ENABLED:
+          builder.enabled(deltaConfig.isEnabled());
+          break;
+        case USER_CONFIG:
+          builder.userConfig(deltaConfig.getUserConfig());
+          break;
+        }
+      }
+      Set<String> tags = Sets.newHashSet(orig.getTags());
+      tags.addAll(deltaConfig.getTags());
+      tags.removeAll(_removedTags);
+      for (String tag : tags) {
+        builder.addTag(tag);
+      }
+      Set<PartitionId> disabledPartitions = Sets.newHashSet(orig.getDisabledPartitions());
+      disabledPartitions.addAll(deltaConfig.getDisabledPartitions());
+      for (PartitionId partitionId : disabledPartitions) {
+        builder.addDisabledPartition(partitionId);
+      }
+      return builder.build();
+    }
+  }
+
+  /**
    * Assemble a participant
    */
   public static class Builder {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 7816888..2c3b7ca 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -43,6 +43,7 @@ public class Resource {
   /**
    * Construct a resource
    * @param id resource id
+   * @param type ResourceType type
    * @param idealState ideal state of the resource
    * @param externalView external view of the resource
    * @param resourceAssignment current resource assignment of the cluster
@@ -51,9 +52,10 @@ public class Resource {
    * @param bucketSize the bucket size to use for physically saved state
    * @param batchMessageMode true if batch messaging allowed, false otherwise
    */
-  public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
-      ExternalView externalView, RebalancerContext rebalancerContext, UserConfig userConfig,
-      int bucketSize, boolean batchMessageMode) {
+  public Resource(ResourceId id, ResourceType type, IdealState idealState,
+      ResourceAssignment resourceAssignment, ExternalView externalView,
+      RebalancerContext rebalancerContext, UserConfig userConfig, int bucketSize,
+      boolean batchMessageMode) {
     Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
     new HashMap<PartitionId, Map<String, String>>();
     Set<PartitionId> partitionSet = idealState.getPartitionSet();
@@ -71,8 +73,8 @@ public class Resource {
     RebalancerConfig rebalancerConfig = new RebalancerConfig(rebalancerContext);
 
     _config =
-        new ResourceConfig(id, ResourceType.DATA, schedulerTaskConfig, rebalancerConfig,
-            userConfig, bucketSize, batchMessageMode);
+        new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
+            batchMessageMode);
     _externalView = externalView;
     _resourceAssignment = resourceAssignment;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index bdc44c7..5adec4e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -22,6 +22,7 @@ package org.apache.helix.api;
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.api.ResourceConfig.ResourceType;
 import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
 import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
@@ -32,8 +33,10 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
+import org.apache.log4j.Logger;
 
 public class ResourceAccessor {
+  private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
   private final HelixDataAccessor _accessor;
   private final PropertyKey.Builder _keyBuilder;
 
@@ -59,6 +62,23 @@ public class ResourceAccessor {
   }
 
   /**
+   * Update a resource configuration
+   * @param resourceId the resource id to update
+   * @param resourceDelta changes to the resource
+   * @return ResourceConfig, or null if the resource is not persisted
+   */
+  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+    Resource resource = readResource(resourceId);
+    if (resource == null) {
+      LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+      return null;
+    }
+    ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+    // TODO: persist this
+    return config;
+  }
+
+  /**
    * save resource assignment
    * @param resourceId
    * @param resourceAssignment
@@ -73,8 +93,8 @@ public class ResourceAccessor {
    * @param resourceId
    * @return resource assignment or null
    */
-  public void getResourceAssignment(ResourceId resourceId) {
-    _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+    return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
   }
 
   /**
@@ -182,10 +202,11 @@ public class ResourceAccessor {
   static Resource createResource(ResourceId resourceId,
       ResourceConfiguration resourceConfiguration, IdealState idealState,
       ExternalView externalView, ResourceAssignment resourceAssignment) {
-    // TODO pass resource assignment
     UserConfig userConfig;
+    ResourceType type = ResourceType.DATA;
     if (resourceConfiguration != null) {
       userConfig = UserConfig.from(resourceConfiguration);
+      type = resourceConfiguration.getType();
     } else {
       userConfig = new UserConfig(Scope.resource(resourceId));
     }
@@ -206,7 +227,7 @@ public class ResourceAccessor {
         rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
       }
     }
-    return new Resource(resourceId, idealState, resourceAssignment, externalView,
+    return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
         rebalancerContext, userConfig, bucketSize, batchMessageMode);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
index a21301b..dc3dc1d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -6,6 +6,8 @@ import java.util.Set;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 
+import com.google.common.collect.Sets;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -156,6 +158,123 @@ public class ResourceConfig {
   }
 
   /**
+   * Update context for a ResourceConfig
+   */
+  public static class Delta {
+    private enum Fields {
+      TYPE,
+      REBALANCER_CONTEXT,
+      USER_CONFIG,
+      BUCKET_SIZE,
+      BATCH_MESSAGE_MODE
+    }
+
+    private Set<Fields> _updateFields;
+    private Builder _builder;
+
+    /**
+     * Instantiate the delta for a resource config
+     * @param resourceId the resource to update
+     */
+    public Delta(ResourceId resourceId) {
+      _builder = new Builder(resourceId);
+      _updateFields = Sets.newHashSet();
+    }
+
+    /**
+     * Set the type of this resource
+     * @param type ResourceType
+     * @return Delta
+     */
+    public Delta setType(ResourceType type) {
+      _builder.type(type);
+      _updateFields.add(Fields.TYPE);
+      return this;
+    }
+
+    /**
+     * Set the rebalancer configuration
+     * @param context properties of interest for rebalancing
+     * @return Delta
+     */
+    public Delta setRebalancerContext(RebalancerContext context) {
+      _builder.rebalancerContext(context);
+      _updateFields.add(Fields.REBALANCER_CONTEXT);
+      return this;
+    }
+
+    /**
+     * Set the user configuration
+     * @param userConfig user-specified properties
+     * @return Delta
+     */
+    public Delta setUserConfig(UserConfig userConfig) {
+      _builder.userConfig(userConfig);
+      _updateFields.add(Fields.USER_CONFIG);
+      return this;
+    }
+
+    /**
+     * Set the bucket size
+     * @param bucketSize the size to use
+     * @return Delta
+     */
+    public Delta setBucketSize(int bucketSize) {
+      _builder.bucketSize(bucketSize);
+      _updateFields.add(Fields.BUCKET_SIZE);
+      return this;
+    }
+
+    /**
+     * Set the batch message mode
+     * @param batchMessageMode true to enable, false to disable
+     * @return Delta
+     */
+    public Delta setBatchMessageMode(boolean batchMessageMode) {
+      _builder.batchMessageMode(batchMessageMode);
+      _updateFields.add(Fields.BATCH_MESSAGE_MODE);
+      return this;
+    }
+
+    /**
+     * Create a ResourceConfig that is the combination of an existing ResourceConfig and this delta
+     * @param orig the original ResourceConfig
+     * @return updated ResourceConfig
+     */
+    public ResourceConfig mergeInto(ResourceConfig orig) {
+      ResourceConfig deltaConfig = _builder.build();
+      Builder builder =
+          new Builder(orig.getId())
+              .type(orig.getType())
+              .rebalancerContext(
+                  orig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class))
+              .schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig())
+              .bucketSize(orig.getBucketSize()).batchMessageMode(orig.getBatchMessageMode());
+      for (Fields field : _updateFields) {
+        switch (field) {
+        case TYPE:
+          builder.type(deltaConfig.getType());
+          break;
+        case REBALANCER_CONTEXT:
+          builder.rebalancerContext(deltaConfig.getRebalancerConfig().getRebalancerContext(
+              RebalancerContext.class));
+          break;
+        case USER_CONFIG:
+          builder.userConfig(deltaConfig.getUserConfig());
+          break;
+        case BUCKET_SIZE:
+          builder.bucketSize(deltaConfig.getBucketSize());
+          break;
+        case BATCH_MESSAGE_MODE:
+          builder.batchMessageMode(deltaConfig.getBatchMessageMode());
+          break;
+        }
+      }
+      return builder.build();
+    }
+  }
+
+  /**
    * Assembles a ResourceConfig
    */
   public static class Builder {
@@ -191,7 +310,7 @@ public class ResourceConfig {
 
     /**
      * Set the rebalancer configuration
-     * @param rebalancerConfig properties of interest for rebalancing
+     * @param rebalancerContext properties of interest for rebalancing
      * @return Builder
      */
     public Builder rebalancerContext(RebalancerContext rebalancerContext) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
index 988484b..60b6210 100644
--- a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
@@ -59,11 +59,11 @@ public class StateModelDefinitionAccessor {
   }
 
   /**
-   * Add a state model definition. Updates the existing state model definition if it already exists.
+   * Set a state model definition. Adds the state model definition if it does not exist
    * @param stateModelDef fully initialized state model definition
    * @return true if the model is persisted, false otherwise
    */
-  public boolean addStateModelDefinition(StateModelDefinition stateModelDef) {
+  public boolean setStateModelDefinition(StateModelDefinition stateModelDef) {
     return _accessor.setProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
index 5165ba7..3925c2b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -13,9 +13,8 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.testng.collections.Maps;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -62,7 +61,7 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
 
   /**
    * Get a map from partition id to partition
-   * @return partition map
+   * @return partition map (mutable)
    */
   public Map<PartitionId, Partition> getPartitionMap() {
     return _partitionMap;
@@ -73,7 +72,7 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
    * @param partitionMap partition map
    */
   public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
-    _partitionMap = ImmutableMap.copyOf(partitionMap);
+    _partitionMap = Maps.newHashMap(partitionMap);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index 26f134b..7d5fed2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -105,7 +105,9 @@ public final class RebalancerConfig {
       String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
       return _serializer.deserialize(contextClass, serialized);
     } catch (ClassNotFoundException e) {
-      LOG.info(className + " is not a valid class");
+      LOG.error(className + " is not a valid class");
+    } catch (ClassCastException e) {
+      LOG.error(className + " does not implement RebalancerContext");
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 8966434..c753f22 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -1,16 +1,10 @@
 package org.apache.helix.model;
 
-import java.util.List;
-
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceConfig.ResourceType;
 import org.apache.helix.api.ResourceId;
 
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -35,7 +29,7 @@ import com.google.common.collect.Lists;
  */
 public class ResourceConfiguration extends HelixProperty {
   public enum Fields {
-    PARTITION_LIST
+    TYPE
   }
 
   /**
@@ -63,29 +57,18 @@ public class ResourceConfiguration extends HelixProperty {
   }
 
   /**
-   * Set the partitions for this resource
-   * @param partitionIds list of partition ids
+   * Set the resource type
+   * @param type ResourceType type
    */
-  public void setPartitionIds(List<PartitionId> partitionIds) {
-    _record.setListField(Fields.PARTITION_LIST.toString(),
-        Lists.transform(partitionIds, Functions.toStringFunction()));
+  public void setType(ResourceType type) {
+    _record.setEnumField(Fields.TYPE.toString(), type);
   }
 
   /**
-   * Get the partitions for this resource
-   * @return list of partition ids
+   * Get the resource type
+   * @return ResourceType type
    */
-  public List<PartitionId> getPartitionIds() {
-    List<String> partitionNames = _record.getListField(Fields.PARTITION_LIST.toString());
-    if (partitionNames != null) {
-      return Lists.transform(partitionNames, new Function<String, PartitionId>() {
-        @Override
-        public PartitionId apply(String partitionName) {
-          return PartitionId.from(partitionName);
-        }
-      });
-    }
-    return null;
+  public ResourceType getType() {
+    return _record.getEnumField(Fields.TYPE.toString(), ResourceType.class, ResourceType.DATA);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 262f779..141f6b7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -34,6 +34,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceConfig.ResourceType;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.StateModelDefId;
@@ -170,8 +171,8 @@ public class BaseStageTest {
       ResourceId resourceId = idealState.getResourceId();
       RebalancerContext context = PartitionedRebalancerContext.from(idealState);
       Resource resource =
-          new Resource(resourceId, idealState, null, null, context, new UserConfig(
-              Scope.resource(resourceId)), idealState.getBucketSize(),
+          new Resource(resourceId, ResourceType.DATA, idealState, null, null, context,
+              new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),
               idealState.getBatchMessageMode());
       resourceMap.put(resourceId, resource.getConfig());
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index 6791bfd..72ea9f8 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -50,6 +50,9 @@ import com.google.common.collect.Lists;
  * under the License.
  */
 
+/**
+ * Example showing all major interactions with the new Helix logical model
+ */
 public class NewModelExample {
   private static final Logger LOG = Logger.getLogger(NewModelExample.class);
 
@@ -130,10 +133,10 @@ public class NewModelExample {
     ResourceId resourceId = ResourceId.from("exampleResource");
 
     // create a partition
-    Partition partition1 = new Partition(PartitionId.from("partition1"));
+    Partition partition1 = new Partition(PartitionId.from(resourceId, "1"));
 
     // create a second partition
-    Partition partition2 = new Partition(PartitionId.from("partition2"));
+    Partition partition2 = new Partition(PartitionId.from(resourceId, "2"));
 
     // specify the rebalancer configuration
     // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig


Mime
View raw message