helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: Local provisioning
Date Fri, 13 Dec 2013 18:30:04 GMT
Updated Branches:
  refs/heads/helix-provisioning 1022d0733 -> 3153c5e95


Local provisioning


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/3153c5e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/3153c5e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/3153c5e9

Branch: refs/heads/helix-provisioning
Commit: 3153c5e9543446c900f4b231395bcbca290ceecd
Parents: 1022d07
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Fri Dec 13 10:29:59 2013 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Dec 13 10:29:59 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/api/Participant.java  |  13 +-
 .../java/org/apache/helix/api/Resource.java     |  18 +-
 .../helix/api/accessor/ClusterAccessor.java     |  32 +-
 .../helix/api/accessor/ParticipantAccessor.java |  15 +-
 .../helix/api/accessor/ResourceAccessor.java    |   7 +-
 .../helix/api/config/ContainerConfig.java       |  66 ++++
 .../apache/helix/api/config/ResourceConfig.java |  32 +-
 .../controller/GenericHelixController.java      |   2 +
 .../controller/provisioner/ContainerState.java  |   4 +-
 .../provisioner/ProvisionerConfig.java          |   7 +
 .../provisioner/TargetProviderResponse.java     |  19 ++
 .../stages/ContainerProvisioningStage.java      |  53 +--
 .../stages/ResourceComputationStage.java        |   1 +
 .../helix/model/ClusterConfiguration.java       |  14 +-
 .../org/apache/helix/model/InstanceConfig.java  |   9 +-
 .../helix/model/ProvisionerConfigHolder.java    | 184 +++++++++++
 .../helix/model/ResourceConfiguration.java      |  11 +
 .../helix/controller/stages/BaseStageTest.java  |   2 +-
 .../stages/TestMsgSelectionStage.java           |   8 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |   2 +-
 .../integration/TestLocalContainerProvider.java | 321 +++++++++++++++++++
 21 files changed, 753 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 53f4038..fc20968 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -22,6 +22,7 @@ package org.apache.helix.api;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.api.config.ContainerConfig;
 import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.MessageId;
@@ -38,6 +39,7 @@ import com.google.common.collect.ImmutableMap;
  */
 public class Participant {
   private final ParticipantConfig _config;
+  private final ContainerConfig _containerConfig;
 
   private final RunningInstance _runningInstance;
 
@@ -58,10 +60,11 @@ public class Participant {
   public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
       Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
       Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap,
-      UserConfig userConfig) {
+      UserConfig userConfig, ContainerConfig containerConfig) {
     _config =
         new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags,
             userConfig);
+    _containerConfig = containerConfig;
     _runningInstance = runningInstance;
     _currentStateMap = ImmutableMap.copyOf(currentStateMap);
     _messageMap = ImmutableMap.copyOf(messageMap);
@@ -171,4 +174,12 @@ public class Participant {
   public ParticipantConfig getConfig() {
     return _config;
   }
+
+  /**
+   * Get the container config (if any)
+   * @return ContainerConfig for the container associated with this participant
+   */
+  public ContainerConfig getContainerConfig() {
+    return _containerConfig;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 0726510..c9b329e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,6 +30,7 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -52,18 +53,19 @@ public class Resource {
    * @param externalView external view of the resource
    * @param resourceAssignment current resource assignment of the cluster
    * @param rebalancerConfig parameters that the rebalancer should be aware of
+   * @param provisionerConfig parameters that the provisioner should be aware of
    * @param userConfig any resource user-defined configuration
    * @param bucketSize the bucket size to use for physically saved state
    * @param batchMessageMode true if batch messaging allowed, false otherwise
    */
   public Resource(ResourceId id, ResourceType type, IdealState idealState,
       ResourceAssignment resourceAssignment, ExternalView externalView,
-      RebalancerConfig rebalancerConfig, UserConfig userConfig, int bucketSize,
-      boolean batchMessageMode) {
+      RebalancerConfig rebalancerConfig, ProvisionerConfig provisionerConfig,
+      UserConfig userConfig, int bucketSize, boolean batchMessageMode) {
     SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
     _config =
-        new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
-            batchMessageMode);
+        new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, provisionerConfig,
+            userConfig, bucketSize, batchMessageMode);
     _externalView = externalView;
     _resourceAssignment = resourceAssignment;
   }
@@ -200,6 +202,14 @@ public class Resource {
   }
 
   /**
+   * Get the properties configuring provisioning
+   * @return ProvisionerConfig properties
+   */
+  public ProvisionerConfig getProvisionerConfig() {
+    return _config.getProvisionerConfig();
+  }
+
+  /**
    * Get the configuration of this resource
    * @return ResourceConfig that backs this Resource
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 36c7aaa..cacdf6c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -54,6 +54,7 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
@@ -70,6 +71,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.PersistentStats;
+import org.apache.helix.model.ProvisionerConfigHolder;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
@@ -673,21 +675,23 @@ public class ClusterAccessor {
       return false;
     }
 
-    // Add resource user config
-    if (resource.getUserConfig() != null) {
-      ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
-      configuration.setType(resource.getType());
-      configuration.addNamespacedConfig(resource.getUserConfig());
-      PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-      if (partitionedConfig == null
-          || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
-        // only persist if this is not easily convertible to an ideal state
-        configuration
-            .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
-                .toNamespacedConfig());
-      }
-      _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    // Add resource config ZNode
+    ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+    configuration.setType(resource.getType());
+    configuration.addNamespacedConfig(resource.getUserConfig());
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig == null
+        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      // only persist if this is not easily convertible to an ideal state
+      configuration.addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+          .toNamespacedConfig());
+    }
+    ProvisionerConfig provisionerConfig = resource.getProvisionerConfig();
+    if (provisionerConfig != null) {
+      configuration.addNamespacedConfig(new ProvisionerConfigHolder(provisionerConfig)
+          .toNamespacedConfig());
     }
+    _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
     IdealState idealState =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index c3deafe..cf4c549 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -43,6 +43,7 @@ import org.apache.helix.api.Resource;
 import org.apache.helix.api.RunningInstance;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
+import org.apache.helix.api.config.ContainerConfig;
 import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.MessageId;
@@ -51,6 +52,9 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerSpec;
+import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.CurrentState;
@@ -547,8 +551,17 @@ public class ParticipantAccessor {
       }
     }
 
+    // set up the container config if it exists
+    ContainerConfig containerConfig = null;
+    ContainerSpec containerSpec = instanceConfig.getContainerSpec();
+    ContainerState containerState = instanceConfig.getContainerState();
+    ContainerId containerId = instanceConfig.getContainerId();
+    if (containerSpec != null || containerState != null || containerId != null) {
+      containerConfig = new ContainerConfig(containerId, containerSpec, containerState);
+    }
+
     return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
-        runningInstance, curStateMap, msgMap, userConfig);
+        runningInstance, curStateMap, msgMap, userConfig, containerConfig);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 80c5b16..8359da5 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@ -440,6 +441,7 @@ public class ResourceAccessor {
       ResourceConfiguration resourceConfiguration, IdealState idealState,
       ExternalView externalView, ResourceAssignment resourceAssignment) {
     UserConfig userConfig;
+    ProvisionerConfig provisionerConfig = null;
     RebalancerConfig rebalancerConfig = null;
     ResourceType type = ResourceType.DATA;
     if (resourceConfiguration != null) {
@@ -472,8 +474,11 @@ public class ResourceAccessor {
     if (rebalancerConfig == null) {
       rebalancerConfig = new PartitionedRebalancerConfig();
     }
+    if (resourceConfiguration != null) {
+      provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class);
+    }
     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
-        rebalancerConfig, userConfig, bucketSize, batchMessageMode);
+        rebalancerConfig, provisionerConfig, userConfig, bucketSize, batchMessageMode);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/config/ContainerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ContainerConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ContainerConfig.java
new file mode 100644
index 0000000..e4b97cf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ContainerConfig.java
@@ -0,0 +1,66 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerSpec;
+import org.apache.helix.controller.provisioner.ContainerState;
+
+public class ContainerConfig {
+  private final ContainerId _id;
+  private final ContainerSpec _spec;
+  private final ContainerState _state;
+
+  /**
+   * Instantiate a ContainerConfig
+   * @param id the container id
+   * @param spec spec of the container
+   * @param state state that the container is in
+   */
+  public ContainerConfig(ContainerId id, ContainerSpec spec, ContainerState state) {
+    _id = id;
+    _spec = spec;
+    _state = state;
+  }
+
+  /**
+   * Get the container id
+   * @return ContainerId
+   */
+  public ContainerId getId() {
+    return _id;
+  }
+
+  /**
+   * Get the spec for the container
+   * @return ContainerSpec
+   */
+  public ContainerSpec getSpec() {
+    return _spec;
+  }
+
+  /**
+   * Get the state of the container
+   * @return ContainerState
+   */
+  public ContainerState getState() {
+    return _state;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 0aedfaf..0b2df4a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -48,6 +48,7 @@ public class ResourceConfig {
   private final ResourceId _id;
   private final RebalancerConfig _rebalancerConfig;
   private final SchedulerTaskConfig _schedulerTaskConfig;
+  private final ProvisionerConfig _provisionerConfig;
   private final UserConfig _userConfig;
   private final int _bucketSize;
   private final boolean _batchMessageMode;
@@ -59,17 +60,20 @@ public class ResourceConfig {
    * @param partitionMap map of partition identifiers to partition objects
    * @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
    * @param rebalancerConfig configuration for rebalancing the resource
+   * @param provisionerConfig configuration for provisioning for the resource
    * @param userConfig user-defined resource properties
    * @param bucketSize bucket size for this resource
    * @param batchMessageMode whether or not batch messaging is allowed
    */
   public ResourceConfig(ResourceId id, ResourceType resourceType,
       SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig,
-      UserConfig userConfig, int bucketSize, boolean batchMessageMode) {
+      ProvisionerConfig provisionerConfig, UserConfig userConfig, int bucketSize,
+      boolean batchMessageMode) {
     _id = id;
     _resourceType = resourceType;
     _schedulerTaskConfig = schedulerTaskConfig;
     _rebalancerConfig = rebalancerConfig;
+    _provisionerConfig = provisionerConfig;
     _userConfig = userConfig;
     _bucketSize = bucketSize;
     _batchMessageMode = batchMessageMode;
@@ -133,6 +137,14 @@ public class ResourceConfig {
   }
 
   /**
+   * Get the properties configuring the provisioner
+   * @return ProvisionerConfig properties
+   */
+  public ProvisionerConfig getProvisionerConfig() {
+    return _provisionerConfig;
+  }
+
+  /**
    * Get user-specified configuration properties of this resource
    * @return UserConfig properties
    */
@@ -283,6 +295,7 @@ public class ResourceConfig {
     private ResourceType _type;
     private RebalancerConfig _rebalancerConfig;
     private SchedulerTaskConfig _schedulerTaskConfig;
+    private ProvisionerConfig _provisionerConfig;
     private UserConfig _userConfig;
     private int _bucketSize;
     private boolean _batchMessageMode;
@@ -339,6 +352,15 @@ public class ResourceConfig {
     }
 
     /**
+     * @param schedulerTaskConfig
+     * @return
+     */
+    public Builder provisionerConfig(ProvisionerConfig provisionerConfig) {
+      _provisionerConfig = provisionerConfig;
+      return this;
+    }
+
+    /**
      * Set the bucket size
      * @param bucketSize the size to use
      * @return Builder
@@ -363,12 +385,8 @@ public class ResourceConfig {
      * @return instantiated Resource
      */
     public ResourceConfig build() {
-      return new ResourceConfig(_id, _type, _schedulerTaskConfig, _rebalancerConfig, _userConfig,
-          _bucketSize, _batchMessageMode);
+      return new ResourceConfig(_id, _type, _schedulerTaskConfig, _rebalancerConfig,
+          _provisionerConfig, _userConfig, _bucketSize, _batchMessageMode);
     }
   }
-
-  public ProvisionerConfig getProvisionerConfig() {
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 96be0fa..b5fb23e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -47,6 +47,7 @@ import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
+import org.apache.helix.controller.stages.ContainerProvisioningStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.MessageGenerationStage;
@@ -182,6 +183,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       Pipeline rebalancePipeline = new Pipeline();
       rebalancePipeline.addStage(new CompatibilityCheckStage());
       rebalancePipeline.addStage(new ResourceComputationStage());
+      rebalancePipeline.addStage(new ContainerProvisioningStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationStage());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
index 64c8ad5..c2e5649 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
@@ -21,10 +21,12 @@ package org.apache.helix.controller.provisioner;
 
 public enum ContainerState {
   ACQUIRING,
+  ACQUIRED,
   CONNECTING,
   ACTIVE,
   TEARDOWN,
   FAILED,
   HALTED,
-  FINALIZING
+  FINALIZING,
+  FINALIZED
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/controller/provisioner/ProvisionerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ProvisionerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ProvisionerConfig.java
index 54fd492..0f8839a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ProvisionerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ProvisionerConfig.java
@@ -19,6 +19,13 @@ package org.apache.helix.controller.provisioner;
  * under the License.
  */
 
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.serializer.StringSerializer;
+
 public interface ProvisionerConfig {
   ProvisionerRef getProvisionerRef();
+
+  ResourceId getResourceId();
+
+  Class<? extends StringSerializer> getSerializerClass();
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProviderResponse.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProviderResponse.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProviderResponse.java
index 1e27525..bcb129b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProviderResponse.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProviderResponse.java
@@ -1,5 +1,24 @@
 package org.apache.helix.controller.provisioner;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.List;
 
 import org.apache.helix.api.Participant;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 5b0671c..c4cc5d8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -89,7 +89,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           }
         }
 
-        Cluster cluster = event.getAttribute("clusterDataCache");
+        Cluster cluster = event.getAttribute("ClusterDataCache");
         Collection<Participant> participants = cluster.getParticipantMap().values();
 
         // Participants registered in helix
@@ -106,11 +106,10 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
         TargetProviderResponse response =
             provisioner.evaluateExistingContainers(cluster, resourceId, participants);
 
-        // random participant id
-        ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString());
-
         // allocate new containers
         for (ContainerSpec spec : response.getContainersToAcquire()) {
+          // random participant id
+          ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString());
           // create a new Participant, attach the container spec
           InstanceConfig instanceConfig = new InstanceConfig(participantId);
           instanceConfig.setContainerSpec(spec);
@@ -118,63 +117,71 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           instanceConfig.setContainerState(ContainerState.ACQUIRING);
           // create the helix participant and add it to cluster
           helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
-
           ContainerId containerId = provisioner.allocateContainer(spec);
           InstanceConfig existingInstance =
               helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
           existingInstance.setContainerId(containerId);
+          existingInstance.setContainerState(ContainerState.ACQUIRED);
           accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
               existingInstance);
         }
 
         // start new containers
         for (Participant participant : response.getContainersToStart()) {
-          String containerIdStr = participant.getUserConfig().getSimpleField("ContainerId");
-          ContainerId containerId = ContainerId.from(containerIdStr);
           InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                  .toString());
+          ContainerId containerId = existingInstance.getContainerId();
+          existingInstance.setContainerId(containerId);
           existingInstance.setContainerState(ContainerState.CONNECTING);
-          accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
           provisioner.startContainer(containerId);
           existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                  .toString());
           existingInstance.setContainerState(ContainerState.ACTIVE);
-          accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
         }
 
         // release containers
         for (Participant participant : response.getContainersToRelease()) {
-          String containerIdStr = participant.getUserConfig().getSimpleField("ContainerId");
-          ContainerId containerId = ContainerId.from(containerIdStr);
           // this will change the container state
+          InstanceConfig existingInstance =
+              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                  .toString());
+          ContainerId containerId = existingInstance.getContainerId();
+          existingInstance.setContainerState(ContainerState.FINALIZING);
+          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+              existingInstance);
           provisioner.deallocateContainer(containerId);
           // remove the participant
-          InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+          existingInstance =
+              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                  .toString());
           helixAdmin.dropInstance(cluster.getId().toString(), existingInstance);
         }
 
         // stop but don't remove
-        for (Participant participant : participants) {
-          String containerIdStr = participant.getUserConfig().getSimpleField("ContainerId");
+        for (Participant participant : response.getContainersToStop()) {
           // disable the node first
-          // TODO: get the participant id from the container id
           InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                  .toString());
+          ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setInstanceEnabled(false);
           existingInstance.setContainerState(ContainerState.TEARDOWN);
-          accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // stop the container
-          ContainerId containerId = ContainerId.from(containerIdStr);
           provisioner.stopContainer(containerId);
           existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
+                  .toString());
           existingInstance.setContainerState(ContainerState.HALTED);
-          accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 5b75535..b8f1954 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -72,6 +72,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
       resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
       resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
       resCfgBuilder.rebalancerConfig(rebalancerCfg);
+      resCfgBuilder.provisionerConfig(resource.getProvisionerConfig());
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 8386d6c..1e9c205 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -1,12 +1,5 @@
 package org.apache.helix.model;
 
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.api.config.UserConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.manager.zk.ZKHelixManager;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -26,6 +19,13 @@ import org.apache.helix.manager.zk.ZKHelixManager;
  * under the License.
  */
 
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
 /**
  * Persisted configuration properties for a cluster
  */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index c547fa2..5f27b05 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -304,7 +304,9 @@ public class InstanceConfig extends HelixProperty {
   }
 
   public void setContainerSpec(ContainerSpec spec) {
-    _record.setSimpleField(InstanceConfigProperty.CONTAINER_SPEC.toString(), spec.toString());
+    if (spec != null) {
+      _record.setSimpleField(InstanceConfigProperty.CONTAINER_SPEC.toString(), spec.toString());
+    }
   }
 
   public ContainerSpec getContainerSpec() {
@@ -322,7 +324,10 @@ public class InstanceConfig extends HelixProperty {
   }
 
   public void setContainerId(ContainerId containerId) {
-    _record.setSimpleField(InstanceConfigProperty.CONTAINER_ID.toString(), containerId.toString());
+    if (containerId != null) {
+      _record
+          .setSimpleField(InstanceConfigProperty.CONTAINER_ID.toString(), containerId.toString());
+    }
   }
 
   public ContainerId getContainerId() {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/model/ProvisionerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ProvisionerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/model/ProvisionerConfigHolder.java
new file mode 100644
index 0000000..caaae0e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ProvisionerConfigHolder.java
@@ -0,0 +1,184 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.provisioner.Provisioner;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for a resource provisioner. This contains a ProvisionerConfig, which contains
+ * information specific to each provisioner.
+ */
+public final class ProvisionerConfigHolder {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    PROVISIONER_CONFIG,
+    PROVISIONER_CONFIG_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(ProvisionerConfigHolder.class);
+  private StringSerializer _serializer;
+  private Provisioner _provisioner;
+  private final ProvisionerConfig _config;
+  private final NamespacedConfig _backingConfig;
+
+  /**
+   * Instantiate a ProvisionerConfigHolder
+   * @param config provisioner config
+   */
+  public ProvisionerConfigHolder(ProvisionerConfig config) {
+    _backingConfig =
+        new NamespacedConfig(Scope.resource(config.getResourceId()),
+            ProvisionerConfigHolder.class.getSimpleName());
+    _backingConfig.setSimpleField(Fields.SERIALIZER_CLASS.toString(), config.getSerializerClass()
+        .getName());
+    _backingConfig.setSimpleField(Fields.PROVISIONER_CONFIG_CLASS.toString(), config.getClass()
+        .getName());
+    _config = config;
+    try {
+      _serializer = config.getSerializerClass().newInstance();
+      _backingConfig.setSimpleField(Fields.PROVISIONER_CONFIG.toString(),
+          _serializer.serialize(config));
+    } catch (InstantiationException e) {
+      LOG.error("Error initializing the configuration", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Error initializing the configuration", e);
+    }
+  }
+
+  /**
+   * Instantiate from a physical ResourceConfiguration
+   * @param resourceConfiguration populated ResourceConfiguration
+   */
+  public ProvisionerConfigHolder(ResourceConfiguration resourceConfiguration) {
+    _backingConfig =
+        new NamespacedConfig(resourceConfiguration, ProvisionerConfigHolder.class.getSimpleName());
+    _serializer = getSerializer();
+    _config = getConfig();
+  }
+
+  /**
+   * Get the class that can serialize and deserialize the provisioner config
+   * @return StringSerializer
+   */
+  private StringSerializer getSerializer() {
+    String serializerClassName = _backingConfig.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+    if (serializerClassName != null) {
+      try {
+        return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  private ProvisionerConfig getConfig() {
+    String className = _backingConfig.getSimpleField(Fields.PROVISIONER_CONFIG_CLASS.toString());
+    if (className != null) {
+      try {
+        Class<? extends ProvisionerConfig> configClass =
+            HelixUtil.loadClass(getClass(), className).asSubclass(ProvisionerConfig.class);
+        String serialized = _backingConfig.getSimpleField(Fields.PROVISIONER_CONFIG.toString());
+        return _serializer.deserialize(configClass, serialized);
+      } catch (ClassNotFoundException e) {
+        LOG.error(className + " is not a valid class");
+      } catch (ClassCastException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a provisioner class instance
+   * @return Provisioner
+   */
+  public Provisioner getProvisioner() {
+    // cache the provisioner to avoid loading and instantiating it excessively
+    if (_provisioner == null) {
+      if (_config == null || _config.getProvisionerRef() == null) {
+        return null;
+      }
+      _provisioner = _config.getProvisionerRef().getProvisioner();
+    }
+    return _provisioner;
+  }
+
+  /**
+   * Get the instantiated ProvisionerConfig
+   * @param configClass specific class of the ProvisionerConfig
+   * @return ProvisionerConfig subclass instance, or null if conversion is not possible
+   */
+  public <T extends ProvisionerConfig> T getProvisionerConfig(Class<T> configClass) {
+    if (_config != null) {
+      try {
+        return configClass.cast(_config);
+      } catch (ClassCastException e) {
+        LOG.warn(configClass + " is incompatible with config class: " + _config.getClass());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the provisioner config serialized as a string
+   * @return string representing the config
+   */
+  public String getSerializedConfig() {
+    return _backingConfig.getSimpleField(Fields.PROVISIONER_CONFIG.toString());
+  }
+
+  /**
+   * Convert this to a namespaced config
+   * @return NamespacedConfig
+   */
+  public NamespacedConfig toNamespacedConfig() {
+    return _backingConfig;
+  }
+
+  /**
+   * Get a ProvisionerConfig from a physical resource config
+   * @param resourceConfiguration physical resource config
+   * @return ProvisionerConfig
+   */
+  public static ProvisionerConfigHolder from(ResourceConfiguration resourceConfiguration) {
+    return new ProvisionerConfigHolder(resourceConfiguration);
+  }
+
+  /**
+   * Get a ProvisionerConfigHolder from a ProvisionerConfig
+   * @param config instantiated ProvisionerConfig
+   * @return ProvisionerConfigHolder
+   */
+  public static ProvisionerConfigHolder from(ProvisionerConfig config) {
+    return new ProvisionerConfigHolder(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 8c5b863..3ead10a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -6,6 +6,7 @@ import org.apache.helix.api.config.NamespacedConfig;
 import org.apache.helix.api.config.ResourceConfig.ResourceType;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 
@@ -112,4 +113,14 @@ public class ResourceConfiguration extends HelixProperty {
     RebalancerConfigHolder config = new RebalancerConfigHolder(this);
     return config.getRebalancerConfig(clazz);
   }
+
+  /**
+   * Get a ProvisionerConfig, if available
+   * @param clazz the class to cast to
+   * @return ProvisionerConfig, or null
+   */
+  public ProvisionerConfig getProvisionerConfig(Class<? extends ProvisionerConfig> clazz) {
+    ProvisionerConfigHolder configHolder = new ProvisionerConfigHolder(this);
+    return configHolder.getProvisionerConfig(clazz);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 22c4168..8b26d86 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -171,7 +171,7 @@ public class BaseStageTest {
       ResourceId resourceId = idealState.getResourceId();
       RebalancerConfig context = PartitionedRebalancerConfig.from(idealState);
       Resource resource =
-          new Resource(resourceId, ResourceType.DATA, idealState, null, null, context,
+          new Resource(resourceId, ResourceType.DATA, idealState, null, null, context, null,
               new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),
               idealState.getBatchMessageMode());
       resourceMap.put(resourceId, resource.getConfig());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 4b33d8c..46c0fd9 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -65,11 +65,11 @@ public class TestMsgSelectionStage {
     liveInstances.put(ParticipantId.from("localhost_0"),
         new Participant(ParticipantId.from("localhost_0"), "localhost", 0, true,
             disabledPartitions, tags, runningInstance0, currentStateMap, messageMap,
-            new UserConfig(Scope.participant(ParticipantId.from("localhost_0")))));
+            new UserConfig(Scope.participant(ParticipantId.from("localhost_0"))), null));
     liveInstances.put(ParticipantId.from("localhost_1"),
         new Participant(ParticipantId.from("localhost_1"), "localhost", 1, true,
             disabledPartitions, tags, runningInstance1, currentStateMap, messageMap,
-            new UserConfig(Scope.participant(ParticipantId.from("localhost_1")))));
+            new UserConfig(Scope.participant(ParticipantId.from("localhost_1"))), null));
 
     Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
     currentStates.put(ParticipantId.from("localhost_0"), State.from("SLAVE"));
@@ -119,11 +119,11 @@ public class TestMsgSelectionStage {
     liveInstances.put(ParticipantId.from("localhost_0"),
         new Participant(ParticipantId.from("localhost_0"), "localhost", 0, true,
             disabledPartitions, tags, runningInstance0, currentStateMap, messageMap,
-            new UserConfig(Scope.participant(ParticipantId.from("localhost_0")))));
+            new UserConfig(Scope.participant(ParticipantId.from("localhost_0"))), null));
     liveInstances.put(ParticipantId.from("localhost_1"),
         new Participant(ParticipantId.from("localhost_1"), "localhost", 1, true,
             disabledPartitions, tags, runningInstance1, currentStateMap, messageMap,
-            new UserConfig(Scope.participant(ParticipantId.from("localhost_1")))));
+            new UserConfig(Scope.participant(ParticipantId.from("localhost_1"))), null));
 
     Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
     currentStates.put(ParticipantId.from("localhost_0"), State.from("SLAVE"));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 9f52866..2c26d5d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -246,7 +246,7 @@ public class TestNewAutoRebalanceStrategy {
           Participant participant =
               new Participant(participantId, "hostname", 0, true, disabledPartitionIdSet, tags,
                   null, currentStateMap, messageMap, new UserConfig(
-                      Scope.participant(participantId)));
+                      Scope.participant(participantId)), null);
           liveParticipantMap.put(participantId, participant);
         }
         List<ParticipantId> participantPreferenceList =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3153c5e9/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
new file mode 100644
index 0000000..6439fed
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -0,0 +1,321 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerSpec;
+import org.apache.helix.controller.provisioner.ContainerState;
+import org.apache.helix.controller.provisioner.Provisioner;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.provisioner.ProvisionerRef;
+import org.apache.helix.controller.provisioner.TargetProviderResponse;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.AbstractService;
+
+public class TestLocalContainerProvider extends ZkUnitTestBase {
+  private static final int MAX_PARTICIPANTS = 10;
+  static String clusterName = null;
+  static String resourceName = null;
+  static int allocated = 0;
+  static int started = 0;
+  static int stopped = 0;
+  static int deallocated = 0;
+  static HelixConnection connection = null;
+
+  @Test
+  public void testBasic() throws Exception {
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 2;
+    resourceName = "TestDB0";
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    allocated = 0;
+    started = 0;
+    stopped = 0;
+    deallocated = 0;
+
+    // connect
+    connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // create the cluster
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+        masterSlave).build());
+
+    // add the resource with the local provisioner
+    ResourceId resourceId = ResourceId.from(resourceName);
+    ProvisionerConfig provisionerConfig = new LocalProvisionerConfig(resourceId);
+    RebalancerConfig rebalancerConfig =
+        new FullAutoRebalancerConfig.Builder(resourceId).addPartitions(NUM_PARTITIONS)
+            .replicaCount(NUM_REPLICAS).stateModelDefId(masterSlave.getStateModelDefId()).build();
+    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName))
+        .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build());
+
+    // start controller
+    ControllerId controllerId = ControllerId.from("controller1");
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.startAsync(); // TODO: is this really async?
+
+    Thread.sleep(10000);
+
+    // clean up
+    controller.stopAsync(); // TODO: is this really async?
+    connection.disconnect();
+
+    Assert.assertEquals(allocated, MAX_PARTICIPANTS);
+    Assert.assertEquals(started, MAX_PARTICIPANTS);
+    Assert.assertEquals(stopped, MAX_PARTICIPANTS);
+    Assert.assertEquals(deallocated, MAX_PARTICIPANTS);
+  }
+
+  /**
+   * Use Guava's service to wrap a participant lifecycle
+   */
+  public static class ParticipantService extends AbstractService {
+    private final ClusterId _clusterId;
+    private final ParticipantId _participantId;
+    private HelixParticipant _participant;
+
+    public ParticipantService(ClusterId clusterId, ParticipantId participantId) {
+      // TODO: probably should pass a connection in here
+      _clusterId = clusterId;
+      _participantId = participantId;
+    }
+
+    @Override
+    protected void doStart() {
+      _participant = connection.createParticipant(_clusterId, _participantId);
+      _participant.getStateMachineEngine().registerStateModelFactory(
+          StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
+      _participant.startAsync();
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      _participant.stopAsync();
+      notifyStopped();
+    }
+
+  }
+
+  /**
+   * Bare-bones ProvisionerConfig
+   */
+  public static class LocalProvisionerConfig implements ProvisionerConfig {
+    private ResourceId _resourceId;
+    private Class<? extends StringSerializer> _serializerClass;
+    private ProvisionerRef _provisionerRef;
+
+    public LocalProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) {
+      _resourceId = resourceId;
+      _serializerClass = DefaultStringSerializer.class;
+      _provisionerRef = ProvisionerRef.from(LocalProvisioner.class.getName());
+    }
+
+    @Override
+    public ResourceId getResourceId() {
+      return _resourceId;
+    }
+
+    @Override
+    public ProvisionerRef getProvisionerRef() {
+      return _provisionerRef;
+    }
+
+    public void setProvisionerRef(ProvisionerRef provisionerRef) {
+      _provisionerRef = provisionerRef;
+    }
+
+    @Override
+    public Class<? extends StringSerializer> getSerializerClass() {
+      return _serializerClass;
+    }
+
+    public void setSerializerClass(Class<? extends StringSerializer> serializerClass) {
+      _serializerClass = serializerClass;
+    }
+  }
+
+  /**
+   * Provisioner that will start and stop participants locally
+   */
+  public static class LocalProvisioner implements Provisioner {
+    private HelixManager _helixManager;
+    private ClusterId _clusterId;
+    private int _askCount;
+    private Map<ContainerId, ContainerState> _states;
+    private Map<ContainerId, ParticipantId> _containerParticipants;
+    private Map<ContainerId, ParticipantService> _participants;
+
+    @Override
+    public void init(HelixManager helixManager) {
+      // TODO: would be nice to have a HelixConnection instead of a HelixManager
+      _helixManager = helixManager;
+      _clusterId = ClusterId.from(_helixManager.getClusterName());
+      _askCount = 0;
+      _states = Maps.newHashMap();
+      _containerParticipants = Maps.newHashMap();
+      _participants = Maps.newHashMap();
+    }
+
+    @Override
+    public ContainerId allocateContainer(ContainerSpec spec) {
+      // allocation is a no-op
+      ContainerId containerId = spec.getContainerId();
+      _states.put(containerId, ContainerState.ACQUIRED);
+      allocated++;
+      return containerId;
+    }
+
+    @Override
+    public boolean deallocateContainer(ContainerId containerId) {
+      // deallocation is a no-op
+      _states.put(containerId, ContainerState.FINALIZED);
+      deallocated++;
+      return true;
+    }
+
+    @Override
+    public boolean startContainer(ContainerId containerId) {
+      ParticipantService participant =
+          new ParticipantService(_clusterId, _containerParticipants.get(containerId));
+      participant.startAsync();
+      participant.awaitRunning();
+      _participants.put(containerId, participant);
+      _states.put(containerId, ContainerState.ACTIVE);
+      started++;
+      return true;
+    }
+
+    @Override
+    public boolean stopContainer(ContainerId containerId) {
+      ParticipantService participant = _participants.get(containerId);
+      participant.stopAsync();
+      participant.awaitTerminated();
+      _states.put(containerId, ContainerState.HALTED);
+      stopped++;
+      return true;
+    }
+
+    @Override
+    public ContainerState getContainerState(ContainerId containerId) {
+      return _states.get(containerId);
+    }
+
+    @Override
+    public TargetProviderResponse evaluateExistingContainers(Cluster cluster,
+        ResourceId resourceId, Collection<Participant> participants) {
+      TargetProviderResponse response = new TargetProviderResponse();
+      // ask for two containers at a time
+      List<ContainerSpec> containersToAcquire = Lists.newArrayList();
+      boolean asked = false;
+      if (_askCount < MAX_PARTICIPANTS) {
+        containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + _askCount)));
+        containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + (_askCount + 1))));
+        asked = true;
+      }
+      List<Participant> containersToStart = Lists.newArrayList();
+      List<Participant> containersToStop = Lists.newArrayList();
+      List<Participant> containersToRelease = Lists.newArrayList();
+      int stopCount = 0;
+      for (Participant participant : participants) {
+        ContainerConfig containerConfig = participant.getContainerConfig();
+        if (containerConfig != null && containerConfig.getState() != null) {
+          ContainerState state = containerConfig.getState();
+          switch (state) {
+          case ACQUIRED:
+            // acquired containers are ready to start
+            containersToStart.add(participant);
+            break;
+          case ACTIVE:
+            // stop at most two active at a time, wait for everything to be up first
+            if (stopCount < 2 && _askCount >= MAX_PARTICIPANTS) {
+              containersToStop.add(participant);
+              stopCount++;
+            }
+            break;
+          case HALTED:
+            // halted containers can be released
+            containersToRelease.add(participant);
+            break;
+          }
+          ContainerId containerId = containerConfig.getId();
+          if (containerId != null) {
+            _containerParticipants.put(containerId, participant.getId());
+            _states.put(containerId, state);
+          }
+        }
+      }
+      // update acquire request count
+      if (asked) {
+        _askCount += 2;
+      }
+      // set the response
+      response.setContainersToAcquire(containersToAcquire);
+      response.setContainersToStart(containersToStart);
+      response.setContainersToStop(containersToStop);
+      response.setContainersToRelease(containersToRelease);
+      return response;
+    }
+  }
+}


Mime
View raw message