helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [42/50] [abbrv] git commit: Task framework recipe runs on distributed YARN
Date Thu, 10 Jul 2014 17:05:25 GMT
Task framework recipe runs on distributed YARN


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/feaea562
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/feaea562
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/feaea562

Branch: refs/heads/master
Commit: feaea562f2b52ebad5cfd6aba92864cd411a582f
Parents: 99f5ff7
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Jul 3 16:47:50 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Thu Jul 3 16:47:50 2014 -0700

----------------------------------------------------------------------
 .../helix/model/ClusterConfiguration.java       | 31 +++++++++------
 .../java/org/apache/helix/model/IdealState.java | 16 +++++---
 .../org/apache/helix/model/InstanceConfig.java  | 41 ++++++++++++--------
 .../helix/model/ResourceConfiguration.java      | 31 +++++++++------
 .../java/org/apache/helix/task/Workflow.java    | 16 ++++++++
 .../apache/helix/provisioning/TaskConfig.java   | 17 ++++++++
 .../helix/provisioning/yarn/AppLauncher.java    | 23 ++++++++++-
 .../provisioning/yarn/AppMasterConfig.java      | 17 ++++++--
 .../provisioning/yarn/AppMasterLauncher.java    | 28 +++++++++++--
 .../src/main/resources/job_runner_app_spec.yaml |  8 +---
 10 files changed, 167 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 1e9c205..63f5776 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -25,11 +25,14 @@ import org.apache.helix.api.config.NamespacedConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.log4j.Logger;
 
 /**
  * Persisted configuration properties for a cluster
  */
 public class ClusterConfiguration extends HelixProperty {
+  private static final Logger LOG = Logger.getLogger(ClusterConfiguration.class);
+
   /**
    * Instantiate for an id
    * @param id cluster id
@@ -76,21 +79,25 @@ public class ClusterConfiguration extends HelixProperty {
    */
   public UserConfig getUserConfig() {
     UserConfig userConfig = UserConfig.from(this);
-    for (String simpleField : _record.getSimpleFields().keySet()) {
-      if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "")
-          && !simpleField.equals(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)) {
-        userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+    try {
+      for (String simpleField : _record.getSimpleFields().keySet()) {
+        if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "")
+            && !simpleField.equals(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)) {
+          userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+        }
       }
-    }
-    for (String listField : _record.getListFields().keySet()) {
-      if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
-        userConfig.setListField(listField, _record.getListField(listField));
+      for (String listField : _record.getListFields().keySet()) {
+        if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+          userConfig.setListField(listField, _record.getListField(listField));
+        }
       }
-    }
-    for (String mapField : _record.getMapFields().keySet()) {
-      if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
-        userConfig.setMapField(mapField, _record.getMapField(mapField));
+      for (String mapField : _record.getMapFields().keySet()) {
+        if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+          userConfig.setMapField(mapField, _record.getMapField(mapField));
+        }
       }
+    } catch (NoSuchMethodError e) {
+      LOG.error("Could not parse ClusterConfiguration", e);
     }
     return userConfig;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 173e251..cc8fc4b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -58,6 +58,8 @@ import com.google.common.collect.Sets;
  * The ideal states of all partitions in a resource
  */
 public class IdealState extends HelixProperty {
+  private static final Logger LOG = Logger.getLogger(IdealState.class);
+
   /**
    * Properties that are persisted and are queryable for an ideal state
    */
@@ -760,12 +762,16 @@ public class IdealState extends HelixProperty {
    * @param userConfig the user config to update
    */
   public void updateUserConfig(UserConfig userConfig) {
-    for (String simpleField : _record.getSimpleFields().keySet()) {
-      Optional<IdealStateProperty> enumField =
-          Enums.getIfPresent(IdealStateProperty.class, simpleField);
-      if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
-        userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+    try {
+      for (String simpleField : _record.getSimpleFields().keySet()) {
+        Optional<IdealStateProperty> enumField =
+            Enums.getIfPresent(IdealStateProperty.class, simpleField);
+        if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
+          userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+        }
       }
+    } catch (NoSuchMethodError e) {
+      LOG.error("Could not update user config", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 5f27b05..2dde23e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -34,6 +34,7 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.controller.provisioner.ContainerId;
 import org.apache.helix.controller.provisioner.ContainerSpec;
 import org.apache.helix.controller.provisioner.ContainerState;
+import org.apache.log4j.Logger;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
@@ -42,6 +43,8 @@ import com.google.common.base.Optional;
  * Instance configurations
  */
 public class InstanceConfig extends HelixProperty {
+  private static final Logger LOG = Logger.getLogger(InstanceConfig.class);
+
   /**
    * Configurable characteristics of an instance
    */
@@ -279,26 +282,30 @@ public class InstanceConfig extends HelixProperty {
    */
   public UserConfig getUserConfig() {
     UserConfig userConfig = UserConfig.from(this);
-    for (String simpleField : _record.getSimpleFields().keySet()) {
-      Optional<InstanceConfigProperty> enumField =
-          Enums.getIfPresent(InstanceConfigProperty.class, simpleField);
-      if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
-        userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+    try {
+      for (String simpleField : _record.getSimpleFields().keySet()) {
+        Optional<InstanceConfigProperty> enumField =
+            Enums.getIfPresent(InstanceConfigProperty.class, simpleField);
+        if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
+          userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+        }
       }
-    }
-    for (String listField : _record.getListFields().keySet()) {
-      Optional<InstanceConfigProperty> enumField =
-          Enums.getIfPresent(InstanceConfigProperty.class, listField);
-      if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
-        userConfig.setListField(listField, _record.getListField(listField));
+      for (String listField : _record.getListFields().keySet()) {
+        Optional<InstanceConfigProperty> enumField =
+            Enums.getIfPresent(InstanceConfigProperty.class, listField);
+        if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
+          userConfig.setListField(listField, _record.getListField(listField));
+        }
       }
-    }
-    for (String mapField : _record.getMapFields().keySet()) {
-      Optional<InstanceConfigProperty> enumField =
-          Enums.getIfPresent(InstanceConfigProperty.class, mapField);
-      if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
-        userConfig.setMapField(mapField, _record.getMapField(mapField));
+      for (String mapField : _record.getMapFields().keySet()) {
+        Optional<InstanceConfigProperty> enumField =
+            Enums.getIfPresent(InstanceConfigProperty.class, mapField);
+        if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
+          userConfig.setMapField(mapField, _record.getMapField(mapField));
+        }
       }
+    } catch (NoSuchMethodError e) {
+      LOG.error("Could not parse InstanceConfig", e);
     }
     return userConfig;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 65762cf..46d7ed7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -9,6 +9,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.log4j.Logger;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
@@ -36,6 +37,8 @@ import com.google.common.base.Optional;
  * Persisted configuration properties for a resource
  */
 public class ResourceConfiguration extends HelixProperty {
+  private static final Logger LOG = Logger.getLogger(ResourceConfiguration.class);
+
   public enum Fields {
     TYPE
   }
@@ -86,21 +89,25 @@ public class ResourceConfiguration extends HelixProperty {
    */
   public UserConfig getUserConfig() {
     UserConfig userConfig = UserConfig.from(this);
-    for (String simpleField : _record.getSimpleFields().keySet()) {
-      Optional<Fields> enumField = Enums.getIfPresent(Fields.class, simpleField);
-      if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
-        userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+    try {
+      for (String simpleField : _record.getSimpleFields().keySet()) {
+        Optional<Fields> enumField = Enums.getIfPresent(Fields.class, simpleField);
+        if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent())
{
+          userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+        }
       }
-    }
-    for (String listField : _record.getListFields().keySet()) {
-      if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
-        userConfig.setListField(listField, _record.getListField(listField));
+      for (String listField : _record.getListFields().keySet()) {
+        if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+          userConfig.setListField(listField, _record.getListField(listField));
+        }
       }
-    }
-    for (String mapField : _record.getMapFields().keySet()) {
-      if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
-        userConfig.setMapField(mapField, _record.getMapField(mapField));
+      for (String mapField : _record.getMapFields().keySet()) {
+        if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+          userConfig.setMapField(mapField, _record.getMapField(mapField));
+        }
       }
+    } catch (NoSuchMethodError e) {
+      LOG.error("Could not parse ResourceConfiguration", e);
     }
     return userConfig;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 383180e..1a41e06 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
@@ -132,10 +133,25 @@ public class Workflow {
     return parse(new StringReader(yaml));
   }
 
+  /**
+   * Read a workflow from an open input stream
+   * @param inputStream the stream
+   * @return Workflow
+   */
+  public static Workflow parse(InputStream inputStream) {
+    Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
+    WorkflowBean wf = (WorkflowBean) yaml.load(inputStream);
+    return parse(wf);
+  }
+
   /** Helper function to parse workflow from a generic {@link Reader} */
   private static Workflow parse(Reader reader) throws Exception {
     Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
+    return parse(wf);
+  }
+
+  private static Workflow parse(WorkflowBean wf) {
     Builder builder = new Builder(wf.name);
 
     for (JobBean job : wf.jobs) {

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
index 283538d..442d074 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
@@ -1,10 +1,27 @@
 package org.apache.helix.provisioning;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 public class TaskConfig {
+  private static final Logger LOG = Logger.getLogger(TaskConfig.class);
+
   public Map<String, String> config = new HashMap<String, String>();
+  public String yamlFile;
+  public String name;
+
+  public URI getYamlURI() {
+    try {
+      return yamlFile != null ? new URI(yamlFile) : null;
+    } catch (URISyntaxException e) {
+      LOG.error("Error parsing URI for task config", e);
+    }
+    return null;
+  }
 
   public String getValue(String key) {
     return (config != null ? config.get(key) : null);

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 76b7877..2db4afb 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -53,6 +53,7 @@ import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.provisioning.ApplicationSpec;
 import org.apache.helix.provisioning.ApplicationSpecFactory;
 import org.apache.helix.provisioning.HelixYarnUtil;
+import org.apache.helix.provisioning.TaskConfig;
 
 /**
  * Main class to launch the job.
@@ -151,6 +152,19 @@ public class AppLauncher {
         _appMasterConfig.setMainClass(name, serviceMainClass);
       }
     }
+
+    // Get YAML files describing all workflows to immediately start
+    Map<String, URI> workflowFiles = new HashMap<String, URI>();
+    List<TaskConfig> taskConfigs = _applicationSpec.getTaskConfigs();
+    if (taskConfigs != null) {
+      for (TaskConfig taskConfig : taskConfigs) {
+        URI configUri = taskConfig.getYamlURI();
+        if (taskConfig.name != null && configUri != null) {
+          workflowFiles.put(taskConfig.name, taskConfig.getYamlURI());
+        }
+      }
+    }
+
     // set local resources for the application master
     // local files or archives as needed
     // In this scenario, the jar file for the application master is part of the local resources
@@ -163,6 +177,13 @@ public class AppLauncher {
             hdfsDest.get(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString()));
     localResources.put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterPkg);
     localResources.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), appSpecFile);
+    for (String name : workflowFiles.keySet()) {
+      URI uri = workflowFiles.get(name);
+      Path dst = copyToHDFS(fs, name, uri);
+      LocalResource taskLocalResource = setupLocalResource(fs, dst);
+      localResources.put(AppMasterConfig.AppEnvironment.TASK_CONFIG_FILE.toString() + "_"
+ name,
+          taskLocalResource);
+    }
 
     // Set local resource info into app master container launch context
     amContainer.setLocalResources(localResources);
@@ -393,7 +414,7 @@ public class AppLauncher {
         prevReport = reportMessage;
         Thread.sleep(10000);
       } catch (Exception e) {
-        LOG.error("Exception while getting info ");
+        LOG.error("Exception while getting info ", e);
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
index 9dcabc2..38a0dd1 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
@@ -19,7 +19,8 @@ public class AppMasterConfig {
     APP_SPEC_FILE("APP_SPEC_FILE"),
     APP_NAME("APP_NAME"),
     APP_ID("APP_ID"),
-    APP_SPEC_FACTORY("APP_SPEC_FACTORY");
+    APP_SPEC_FACTORY("APP_SPEC_FACTORY"),
+    TASK_CONFIG_FILE("TASK_CONFIG_FILE");
     String _name;
 
     private AppEnvironment(String name) {
@@ -37,8 +38,8 @@ public class AppMasterConfig {
 
   private String get(String key) {
     String value = (_envs.containsKey(key)) ? _envs.get(key) : System.getenv().get(key);
-    LOG.info("Returning value:"+ value +" for key:'"+ key + "'");
-    
+    LOG.info("Returning value:" + value + " for key:'" + key + "'");
+
     return value;
   }
 
@@ -83,6 +84,14 @@ public class AppMasterConfig {
     _envs.put(serviceName + "_classpath", classpath);
   }
 
+  public void setTaskConfigFile(String configName, String path) {
+    _envs.put(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName, path);
+  }
+
+  public String getTaskConfigFile(String configName) {
+    return get(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName);
+  }
+
   public String getApplicationSpecConfigFile() {
     return get(AppEnvironment.APP_SPEC_FILE.toString());
   }
@@ -97,6 +106,6 @@ public class AppMasterConfig {
   }
 
   public void setMainClass(String serviceName, String serviceMainClass) {
-    _envs.put(serviceName + "_mainClass", serviceMainClass);    
+    _envs.put(serviceName + "_mainClass", serviceMainClass);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 523fee0..e7a0f61 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -3,6 +3,8 @@ package org.apache.helix.provisioning.yarn;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
@@ -11,8 +13,12 @@ import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.commons.cli.Options;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.helix.HelixController;
 import org.apache.helix.api.accessor.ClusterAccessor;
@@ -150,11 +156,16 @@ public class AppMasterLauncher {
     // Start any pre-specified jobs
     List<TaskConfig> taskConfigs = applicationSpec.getTaskConfigs();
     if (taskConfigs != null) {
+      YarnConfiguration conf = new YarnConfiguration();
+      FileSystem fs;
+      fs = FileSystem.get(conf);
       for (TaskConfig taskConfig : taskConfigs) {
-        String yamlFile = taskConfig.getValue("yamlFile");
-        if (yamlFile != null) {
-          File file = new File(yamlFile);
-          Workflow workflow = Workflow.parse(file);
+        URI yamlUri = taskConfig.getYamlURI();
+        if (yamlUri != null && taskConfig.name != null) {
+          InputStream is =
+              readFromHDFS(fs, taskConfig.name, yamlUri, applicationSpec,
+                  appAttemptID.getApplicationId());
+          Workflow workflow = Workflow.parse(is);
           TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
           taskDriver.start(workflow);
         }
@@ -171,4 +182,13 @@ public class AppMasterLauncher {
     Thread.sleep(10000);
 
   }
+
+  private static InputStream readFromHDFS(FileSystem fs, String name, URI uri,
+      ApplicationSpec appSpec, ApplicationId appId) throws Exception {
+    // will throw exception if the file name is without extension
+    String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+    String pathSuffix = appSpec.getAppName() + "/" + appId.getId() + "/" + name + "." + extension;
+    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+    return fs.open(dst).getWrappedStream();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
index ad62ffc..0945690 100755
--- a/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
+++ b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
@@ -19,9 +19,5 @@ servicePackageURIMap: {
 services: [
   JobRunner]
 taskConfigs:
-  - config: {
-      yamlFile: '/Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml'
-    }
-
-
-
+  - name: JobRunnerWorkflow
+    yamlFile: 'file:///Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml'


Mime
View raw message