helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [06/50] [abbrv] git commit: Almost complete working example of Helloworld
Date Thu, 10 Jul 2014 17:04:49 GMT
Almost complete working example of Helloworld


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8b19cfc7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8b19cfc7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8b19cfc7

Branch: refs/heads/master
Commit: 8b19cfc77b0ddd6bc90dcb034cfbd9b983ff2932
Parents: 57b4b18
Author: Kishore Gopalakrishna <g.kishore@gmail.com>
Authored: Thu Feb 20 22:08:18 2014 -0800
Committer: Kishore Gopalakrishna <g.kishore@gmail.com>
Committed: Thu Feb 20 22:08:18 2014 -0800

----------------------------------------------------------------------
 .../controller/provisioner/ContainerSpec.java   | 19 ++++--
 .../stages/ContainerProvisioningStage.java      | 23 ++++---
 .../manager/zk/AbstractParticipantService.java  | 68 ++++++++++++++-----
 .../integration/TestLocalContainerProvider.java |  4 +-
 .../provisioning/yarn/ApplicationSpec.java      |  4 +-
 .../yarn/HelixYarnApplicationMasterMain.java    | 40 ++++++-----
 .../helix/provisioning/yarn/ServiceConfig.java  | 14 ++--
 .../yarn/YamlApplicationSpecFactory.java        | 70 --------------------
 .../provisioning/yarn/YarnProvisioner.java      | 53 ++++++---------
 .../yarn/example/HelloWorldService.java         | 40 +++++++----
 .../yarn/example/HelloworldAppSpec.java         | 23 +++----
 .../main/resources/hello_world_app_spec.yaml    |  3 +-
 12 files changed, 177 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index 4d3a521..ab3c46a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@ -1,5 +1,7 @@
 package org.apache.helix.controller.provisioner;
 
+import org.apache.helix.api.id.ParticipantId;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -27,8 +29,10 @@ public class ContainerSpec {
   
   int _memory;
 
-  public ContainerSpec(ContainerId containerId) {
-    this._containerId = containerId;
+  private ParticipantId _participantId;
+
+  public ContainerSpec(ParticipantId _participantId) {
+    this._participantId = _participantId;
   }
 
   public ContainerId getContainerId() {
@@ -37,7 +41,7 @@ public class ContainerSpec {
 
   @Override
   public String toString() {
-    return _containerId.toString();
+    return _participantId.toString();
   }
   
   public void setMemory(int memory){
@@ -49,6 +53,13 @@ public class ContainerSpec {
   }
   
   public static ContainerSpec from(String serialized) {
-    return new ContainerSpec(ContainerId.from(serialized));
+    //todo 
+    return null;
+    //return new ContainerSpec(ContainerId.from(serialized));
   }
+
+  public ParticipantId getParticipantId() {
+    return _participantId;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 42c8218..f7105d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixAdmin;
@@ -121,16 +122,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
 
         // allocate new containers
         for (final ContainerSpec spec : response.getContainersToAcquire()) {
-          // random participant id
-          final ParticipantId participantId = ParticipantId.from(spec.getContainerId().stringify());
-          // create a new Participant, attach the container spec
-          InstanceConfig instanceConfig = new InstanceConfig(participantId);
-          instanceConfig.setContainerSpec(spec);
-          // create a helix_participant in ACQUIRING state
-          instanceConfig.setContainerState(ContainerState.ACQUIRING);
-          // create the helix participant and add it to cluster
-          helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
-
+          final ParticipantId participantId = spec.getParticipantId();
+          List<String> instancesInCluster = helixAdmin.getInstancesInCluster(cluster.getId().stringify());
+          if (!instancesInCluster.contains(participantId.stringify())) {
+            // create a new Participant, attach the container spec
+            InstanceConfig instanceConfig = new InstanceConfig(participantId);
+            instanceConfig.setContainerSpec(spec);
+            // create a helix_participant in ACQUIRING state
+            instanceConfig.setContainerState(ContainerState.ACQUIRING);
+            // create the helix participant and add it to cluster
+            helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
+          }
           ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
           FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>()
{
             @Override
@@ -160,7 +162,6 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
                   .toString());
           final ContainerId containerId = existingInstance.getContainerId();
-          existingInstance.setContainerId(containerId);
           existingInstance.setContainerState(ContainerState.CONNECTING);
           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
index 2e5eafa..f515092 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
@@ -35,6 +35,7 @@ public abstract class AbstractParticipantService extends AbstractService
{
   private final ParticipantId _participantId;
   private HelixParticipant _participant;
   private HelixConnection _connection;
+  boolean initialized;
 
   /**
    * Initialize the service.
@@ -50,20 +51,22 @@ public abstract class AbstractParticipantService extends AbstractService
{
   }
 
   @Override
-  protected void doStart() {
+  protected final void doStart() {
     _participant = _connection.createParticipant(_clusterId, _participantId);
 
     // add a preconnect callback
     _participant.addPreConnectCallback(new PreConnectCallback() {
       @Override
       public void onPreConnect() {
-        onPreJoinCluster();
+        if (initialized) {
+          onReconnect();
+        } else {
+          init();
+          initialized = true;
+        }
       }
     });
 
-    // register state machine and other initialization
-    init();
-
     // start and notify
     if (!_connection.isConnected()) {
       _connection.connect();
@@ -73,34 +76,67 @@ public abstract class AbstractParticipantService extends AbstractService
{
   }
 
   @Override
-  protected void doStop() {
+  protected final void doStop() {
     _participant.stop();
     notifyStopped();
   }
 
   /**
-   * Initialize the participant. For example, here is where you can register a state machine:
<br/>
+   * Invoked when connection is re-established to zookeeper. Typical scenario this is invoked
is
+   * when there is a long GC pause that causes the node to disconnect from the cluster and
+   * reconnects. NOTE: When the service disconnects all its states are reset to initial state.
+   */
+  protected void onReconnect() {
+    // default implementation does nothing.
+  }
+
+  /**
+   * Initialize the participant. For example, here is where you can
+   * <ul>
+   * <li>Read configuration of the cluster,resource, node</li>
+   * <li>Read configuration of the cluster,resource, node register a state machine:
<br/>
    * <br/>
    * <code>
    * HelixParticipant participant = getParticipant();
    * participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId, factory);
    * </code><br/>
    * <br/>
-   * This code is called prior to starting the participant.
+   * </li>
+   * </ul>
+   * This code is called after connecting to zookeeper but before creating the liveinstance.
    */
-  public abstract void init();
-
-  /**
-   * Complete any tasks that require a live Helix connection. This function is called before
the
-   * participant declares itself ready to receive state transitions.
-   */
-  public abstract void onPreJoinCluster();
+  protected abstract void init();
 
   /**
    * Get an instantiated participant instance.
    * @return HelixParticipant
    */
-  public HelixParticipant getParticipant() {
+  protected HelixParticipant getParticipant() {
     return _participant;
   }
+
+  /**
+   * @return ClusterId
+   * @see {@link ClusterId}
+   */
+  public ClusterId getClusterId() {
+    return _clusterId;
+  }
+
+  /**
+   * @see {@link ParticipantId}
+   * @return
+   */
+  public ParticipantId getParticipantId() {
+    return _participantId;
+  }
+
+  /**
+   * @see {@link HelixConnection}
+   * @return HelixConnection
+   */
+  public HelixConnection getConnection() {
+    return _connection;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 0f7be64..f4153cc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -277,8 +277,8 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
       List<ContainerSpec> containersToAcquire = Lists.newArrayList();
       boolean asked = false;
       if (_askCount < MAX_PARTICIPANTS) {
-        containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + _askCount)));
-        containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + (_askCount
+ 1))));
+        containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + _askCount)));
+        containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + (_askCount
+ 1))));
         asked = true;
       }
       List<Participant> containersToStart = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
index e104578..285d036 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
@@ -3,8 +3,6 @@ package org.apache.helix.provisioning.yarn;
 import java.net.URI;
 import java.util.List;
 
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.id.ParticipantId;
 
 public interface ApplicationSpec {
   /**
@@ -23,7 +21,7 @@ public interface ApplicationSpec {
   
   String getServiceMainClass(String service);
 
-  ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId);
+  ServiceConfig getServiceConfig(String serviceName);
 
   List<TaskConfig> getTaskConfigs();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 058b384..33183c7 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -46,7 +46,7 @@ public class HelixYarnApplicationMasterMain {
   public static void main(String[] args) throws Exception {
     Map<String, String> env = System.getenv();
     LOG.info("Starting app master with the following environment variables");
-    for(String key: env.keySet()){
+    for (String key : env.keySet()) {
       LOG.info(key + "\t\t=" + env.get(key));
     }
     int numContainers = 1;
@@ -93,11 +93,11 @@ public class HelixYarnApplicationMasterMain {
 
     YarnProvisioner.applicationMaster = genericApplicationMaster;
     YarnProvisioner.applicationMasterConfig = appMasterConfig;
-    YarnProvisioner.applicationSpec = factory.fromYaml(new FileInputStream(configFile));
+    ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile));
+    YarnProvisioner.applicationSpec = applicationSpec;
     String zkAddress = appMasterConfig.getZKAddress();
     String clusterName = appMasterConfig.getAppName();
-    
-    String resourceName = "HelloWorld";
+
     // CREATE CLUSTER and setup the resources
     // connect
     ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
@@ -110,17 +110,27 @@ public class HelixYarnApplicationMasterMain {
         new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
     clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
         statelessService).build());
-
-    // add the resource with the local provisioner
-    ResourceId resourceId = ResourceId.from(resourceName);
-    YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
-    provisionerConfig.setNumContainers(numContainers);
-    RebalancerConfig rebalancerConfig =
-        new FullAutoRebalancerConfig.Builder(resourceId).stateModelDefId(
-            statelessService.getStateModelDefId()).build();
-    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName))
-        .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build());
-
+    for (String service : applicationSpec.getServices()) {
+      String resourceName = service;
+      // add the resource with the local provisioner
+      ResourceId resourceId = ResourceId.from(resourceName);
+      YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+      ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName);
+      provisionerConfig.setNumContainers(serviceConfig.getIntField("num_containers", 1));
+      serviceConfig.setSimpleField("service_name", service);
+      FullAutoRebalancerConfig.Builder rebalancerConfigBuilder =
+          new FullAutoRebalancerConfig.Builder(resourceId);
+      RebalancerConfig rebalancerConfig =
+          rebalancerConfigBuilder.stateModelDefId(statelessService.getStateModelDefId())//
+              .build();
+      ResourceConfig.Builder resourceConfigBuilder =
+          new ResourceConfig.Builder(ResourceId.from(resourceName));
+      ResourceConfig resourceConfig = resourceConfigBuilder.provisionerConfig(provisionerConfig)
//
+          .rebalancerConfig(rebalancerConfig) //
+          .userConfig(serviceConfig) //
+          .build();
+      clusterAccessor.addResourceToCluster(resourceConfig);
+    }
     // start controller
     ControllerId controllerId = ControllerId.from("controller1");
     HelixController controller = connection.createController(clusterId, controllerId);

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
index 4d9173e..87b5f12 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
@@ -3,11 +3,15 @@ package org.apache.helix.provisioning.yarn;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ServiceConfig {
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ResourceId;
+
+public class ServiceConfig extends UserConfig{
 	public Map<String, String> config = new HashMap<String, String>();
 	
-	public String getValue(String key) {
-		return (config != null ? config.get(key) : null);
-	}
-
+	public ServiceConfig(Scope<ResourceId> scope) {
+	  super(scope);
+  }
+ 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
deleted file mode 100644
index e87a5c2..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.io.InputStream;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.yaml.snakeyaml.Yaml;
-
-class DefaultApplicationSpec implements ApplicationSpec {
-	public String appName;
-	public Integer minContainers;
-	public Integer maxContainers;
-	
-	public AppConfig appConfig;
-
-	public List<String> services;
-	public Map<String, ServiceConfig> serviceConfigMap;
-
-	@Override
-	public String getAppName() {
-		return appName;
-	}
-
-	@Override
-	public AppConfig getConfig() {
-		return appConfig;
-	}
-
-	@Override
-	public List<String> getServices() {
-		return services;
-	}
-
-  @Override
-  public URI getServicePackage(String serviceName) {
-    return null;
-  }
-
-  @Override
-  public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId)
{
-    return null;
-  }
-
-  @Override
-  public List<TaskConfig> getTaskConfigs() {
-    return null;
-  }
-
-  @Override
-  public URI getAppMasterPackage() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public String getServiceMainClass(String service) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-}
-
-public class YamlApplicationSpecFactory {
-	ApplicationSpec fromYaml(InputStream input) {
-	    Yaml yaml = new Yaml();
-	    return yaml.loadAs(input, DefaultApplicationSpec.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index daac87b..8fd308e 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -210,8 +210,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     vargs.add("--cluster " + appName);
     vargs.add("--participantId " + participant.getId().stringify());
     vargs.add("--participantClass " + mainClass);
-    ;
-
+    
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
 
@@ -262,13 +261,13 @@ public class YarnProvisioner implements Provisioner, TargetProvider,
ContainerPr
     int targetNumContainers = provisionerConfig.getNumContainers();
 
     // Any container that is in a state should be put in this set
-    Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>();
+    Set<ParticipantId> existingContainersIdSet = new HashSet<ParticipantId>();
 
     // Cache halted containers to determine which to restart and which to release
-    Map<ContainerId, Participant> excessHaltedContainers = Maps.newHashMap();
+    Map<ParticipantId, Participant> excessHaltedContainers = Maps.newHashMap();
 
     // Cache participants to ensure that excess participants are stopped
-    Map<ContainerId, Participant> excessActiveContainers = Maps.newHashMap();
+    Map<ParticipantId, Participant> excessActiveContainers = Maps.newHashMap();
 
     for (Participant participant : participants) {
       ContainerConfig containerConfig = participant.getContainerConfig();
@@ -276,35 +275,35 @@ public class YarnProvisioner implements Provisioner, TargetProvider,
ContainerPr
         ContainerState state = containerConfig.getState();
         switch (state) {
         case ACQUIRING:
-          existingContainersIdSet.add(containerConfig.getId());
+          existingContainersIdSet.add(participant.getId());
           break;
         case ACQUIRED:
           // acquired containers are ready to start
-          existingContainersIdSet.add(containerConfig.getId());
+          existingContainersIdSet.add(participant.getId());
           containersToStart.add(participant);
           break;
         case CONNECTING:
-          existingContainersIdSet.add(containerConfig.getId());
+          existingContainersIdSet.add(participant.getId());
           break;
         case CONNECTED:
           // active containers can be stopped or kept active
-          existingContainersIdSet.add(containerConfig.getId());
-          excessActiveContainers.put(containerConfig.getId(), participant);
+          existingContainersIdSet.add(participant.getId());
+          excessActiveContainers.put(participant.getId(), participant);
           break;
         case DISCONNECTED:
           // disconnected containers must be stopped
-          existingContainersIdSet.add(containerConfig.getId());
+          existingContainersIdSet.add(participant.getId());
           containersToStop.add(participant);
         case HALTING:
-          existingContainersIdSet.add(containerConfig.getId());
+          existingContainersIdSet.add(participant.getId());
           break;
         case HALTED:
           // halted containers can be released or restarted
-          existingContainersIdSet.add(containerConfig.getId());
-          excessHaltedContainers.put(containerConfig.getId(), participant);
+          existingContainersIdSet.add(participant.getId());
+          excessHaltedContainers.put(participant.getId(), participant);
           break;
         case FINALIZING:
-          existingContainersIdSet.add(containerConfig.getId());
+          existingContainersIdSet.add(participant.getId());
           break;
         case FINALIZED:
           break;
@@ -316,29 +315,21 @@ public class YarnProvisioner implements Provisioner, TargetProvider,
ContainerPr
         default:
           break;
         }
-        ContainerId containerId = containerConfig.getId();
-        if (containerId != null) {
-          // _containerParticipants.put(containerId, participant.getId());
-          // _states.put(containerId, state);
-        }
       }
     }
 
     for (int i = 0; i < targetNumContainers; i++) {
-      ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i));
-      excessActiveContainers.remove(containerId); // don't stop this container if active
-      if (excessHaltedContainers.containsKey(containerId)) {
+      ParticipantId participantId = ParticipantId.from(resourceId + "_container_" + (i));
+      excessActiveContainers.remove(participantId); // don't stop this container if active
+      if (excessHaltedContainers.containsKey(participantId)) {
         // Halted containers can be restarted if necessary
-        Participant participant = excessHaltedContainers.get(containerId);
+        Participant participant = excessHaltedContainers.get(participantId);
         containersToStart.add(participant);
-        excessHaltedContainers.remove(containerId); // don't release this container
-      } else if (!existingContainersIdSet.contains(containerId)) {
+        excessHaltedContainers.remove(participantId); // don't release this container
+      } else if (!existingContainersIdSet.contains(participantId)) {
         // Unallocated containers must be allocated
-        ContainerSpec containerSpec = new ContainerSpec(containerId);
-        ParticipantId participantId = ParticipantId.from(containerId.stringify());
-        ParticipantConfig participantConfig =
-            applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
-        containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024));
+        ContainerSpec containerSpec = new ContainerSpec(participantId);
+        containerSpec.setMemory(_resourceConfig.getUserConfig().getIntField("memory", 1024));
         containersToAcquire.add(containerSpec);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
index 614be36..f65fd5d 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
@@ -1,28 +1,40 @@
 package org.apache.helix.provisioning.yarn.example;
 
 import org.apache.helix.HelixConnection;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.AbstractParticipantService;
-
+import org.apache.log4j.Logger;
 
 public class HelloWorldService extends AbstractParticipantService {
 
-	public HelloWorldService(HelixConnection connection, ClusterId clusterId,
-			ParticipantId participantId) {
-		super(connection, clusterId, participantId);
-	}
-	
-	@Override
-	public void init() {
-		HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory();
-		getParticipant().getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("StatelessService"),
stateModelFactory);
-	}
+  private static Logger LOG = Logger.getLogger(AbstractParticipantService.class);
+
+  static String SERVICE_NAME = "HelloWorld";
+
+  public HelloWorldService(HelixConnection connection, ClusterId clusterId,
+      ParticipantId participantId) {
+    super(connection, clusterId, participantId);
+  }
 
+  /**
+   * init method to setup appropriate call back handlers.
+   */
   @Override
-  public void onPreJoinCluster() {
-    //this will be invoked prior to 
+  public void init() {
+    ClusterId clusterId = getClusterId();
+    ResourceAccessor resourceAccessor = getConnection().createResourceAccessor(clusterId);
+    UserConfig serviceConfig = resourceAccessor.readUserConfig(ResourceId.from(SERVICE_NAME));
+    LOG.info("Starting service:" + SERVICE_NAME + " with configuration:" + serviceConfig);
+
+    HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory();
+    getParticipant().getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("StatelessService"), stateModelFactory);
+
   }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
index 2e4cd75..e22c7b2 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
@@ -7,27 +7,31 @@ import java.util.Map;
 
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.ResourceConfig.Builder;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.provisioning.yarn.AppConfig;
 import org.apache.helix.provisioning.yarn.ApplicationSpec;
+import org.apache.helix.provisioning.yarn.ServiceConfig;
 import org.apache.helix.provisioning.yarn.TaskConfig;
 
 public class HelloworldAppSpec implements ApplicationSpec {
 
-  private String _appName;
+  public String _appName;
 
-  private AppConfig _appConfig;
+  public AppConfig _appConfig;
 
-  private List<String> _services;
+  public List<String> _services;
 
   private String _appMasterPackageUri;
-  
+
   private Map<String, String> _servicePackageURIMap;
 
   private Map<String, String> _serviceMainClassMap;
 
-  private Map<String,Map<String,String>> _serviceConfigMap;
+  private Map<String, Map<String, String>> _serviceConfigMap;
 
   private List<TaskConfig> _taskConfigs;
 
@@ -122,13 +126,8 @@ public class HelloworldAppSpec implements ApplicationSpec {
   }
 
   @Override
-  public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId)
{
-    ParticipantConfig.Builder builder = new ParticipantConfig.Builder(participantId);
-    Scope<ParticipantId> scope = Scope.participant(participantId);
-    UserConfig userConfig = new UserConfig(scope);
-    Map<String, String> map = _serviceConfigMap.get(serviceName);
-    userConfig.setSimpleFields(map);
-    return builder.addTag(serviceName).userConfig(userConfig ).build();
+  public ServiceConfig getServiceConfig(String serviceName) {
+    return new ServiceConfig(Scope.resource(ResourceId.from(serviceName)));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
index 648104a..1d4f1b7 100644
--- a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
+++ b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
@@ -7,7 +7,8 @@ appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/
 appName: testApp
 serviceConfigMap:
   HelloWorld: {
-    k1: v1
+    num_containers: 3,
+    memory: 1024
   }
 serviceMainClassMap: {
   HelloWorld: org.apache.helix.provisioning.yarn.example.HelloWorldService


Mime
View raw message