helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: Complete job runner recipe
Date Thu, 01 May 2014 01:28:18 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 97ca4de4a -> 785bb9fbb


Complete job runner recipe


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/785bb9fb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/785bb9fb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/785bb9fb

Branch: refs/heads/helix-provisioning
Commit: 785bb9fbbab2d82532a26ed253e6a72dffaa9849
Parents: 97ca4de
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Apr 30 18:28:08 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Apr 30 18:28:08 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobContext.java  |  23 ++-
 .../java/org/apache/helix/task/TaskDriver.java  |  60 ++++---
 .../org/apache/helix/task/TaskRebalancer.java   |   5 +
 .../java/org/apache/helix/task/Workflow.java    |   2 +-
 .../helix/provisioning/yarn/AppLauncher.java    |  75 +++++++--
 .../provisioning/yarn/AppMasterLauncher.java    |  50 +++---
 recipes/jobrunner-yarn/pom.xml                  | 159 +++++++++++++++++++
 recipes/jobrunner-yarn/run.sh                   |   6 +
 .../jobrunner-yarn/src/assemble/assembly.xml    |  60 +++++++
 .../src/main/config/log4j.properties            |  31 ++++
 .../yarn/example/JobRunnerMain.java             | 127 +++++++++++++++
 .../helix/provisioning/yarn/example/MyTask.java |  53 +++++++
 .../yarn/example/MyTaskAppSpec.java             | 148 +++++++++++++++++
 .../yarn/example/MyTaskAppSpecFactory.java      |  28 ++++
 .../yarn/example/MyTaskService.java             |  62 ++++++++
 .../src/main/resources/dummy_job.yaml           |  18 +++
 .../src/main/resources/job_runner_app_spec.yaml |  27 ++++
 recipes/jobrunner-yarn/src/test/conf/testng.xml |  27 ++++
 recipes/pom.xml                                 |   1 +
 19 files changed, 909 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 7742c67..c10173d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -43,7 +43,8 @@ public class JobContext extends HelixProperty {
     NUM_ATTEMPTS,
     FINISH_TIME,
     TARGET,
-    TASK_ID
+    TASK_ID,
+    ASSIGNED_PARTICIPANT
   }
 
   public JobContext(ZNRecord record) {
@@ -224,4 +225,24 @@ public class JobContext extends HelixProperty {
     }
     return partitionMap;
   }
+
+  public void setAssignedParticipant(int p, String participantName) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+  }
+
+  public String getAssignedParticipant(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      return null;
+    } else {
+      return map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index ada2f99..193b78e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -46,7 +45,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.log4j.Logger;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
 
 /**
  * CLI for scheduling/canceling workflows
@@ -233,36 +232,59 @@ public class TaskDriver {
     WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
 
-    LOG.info("Workflow " + resource + " consists of the following tasks: "
+    System.out.println("Workflow " + resource + " consists of the following tasks: "
         + wCfg.getJobDag().getAllNodes());
-    LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
-    LOG.info("Job states are: ");
-    LOG.info("-------");
+    System.out.println("Current state of workflow is " + wCtx.getWorkflowState().name());
+    System.out.println("Job states are: ");
+    System.out.println("-------");
     for (String job : wCfg.getJobDag().getAllNodes()) {
-      LOG.info("Task " + job + " is " + wCtx.getJobState(job));
+      System.out.println("Job " + job + " is " + wCtx.getJobState(job));
 
       // fetch task information
+      JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
       JobContext jCtx = TaskUtil.getJobContext(_manager, job);
 
       // calculate taskPartitions
       List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
       Collections.sort(partitions);
 
-      // group partitions by status
-      Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
-      for (Integer i : partitions) {
-        TaskPartitionState s = jCtx.getPartitionState(i);
-        if (!statusCount.containsKey(s)) {
-          statusCount.put(s, 0);
+      // report status
+      for (Integer partition : partitions) {
+        String taskId = jCtx.getTaskIdForPartition(partition);
+        taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
+        System.out.println("Task: " + taskId);
+        TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
+        if (taskConfig != null) {
+          System.out.println("Configuration: " + taskConfig.getConfigMap());
         }
-        statusCount.put(s, statusCount.get(s) + 1);
-      }
-
-      for (TaskPartitionState s : statusCount.keySet()) {
-        LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+        TaskPartitionState state = jCtx.getPartitionState(partition);
+        if (state == null) {
+          state = TaskPartitionState.INIT;
+        }
+        System.out.println("State: " + state);
+        String assignedParticipant = jCtx.getAssignedParticipant(partition);
+        if (assignedParticipant != null) {
+          System.out.println("Assigned participant: " + assignedParticipant);
+        }
+        System.out.println("-------");
       }
 
-      LOG.info("-------");
+      // group partitions by status
+      /*
+       * Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
+       * for (Integer i : partitions) {
+       * TaskPartitionState s = jCtx.getPartitionState(i);
+       * if (!statusCount.containsKey(s)) {
+       * statusCount.put(s, 0);
+       * }
+       * statusCount.put(s, statusCount.get(s) + 1);
+       * }
+       * for (TaskPartitionState s : statusCount.keySet()) {
+       * LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+       * }
+       */
+
+      System.out.println("-------");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 829f0c4..e9f60f9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -227,6 +227,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
       // TASK_ERROR, ERROR.
       Set<Integer> donePartitions = new TreeSet<Integer>();
       for (int pId : pSet) {
+        jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
         final String pName = pName(jobResource, pId);
 
         // Check for pending state transitions on this (partition, instance).
@@ -289,6 +290,8 @@ public abstract class TaskRebalancer implements HelixRebalancer {
             nextState = TaskPartitionState.STOPPED;
           }
 
+          jobCtx.setPartitionState(pId, currState);
+
           paMap.put(pId, new PartitionAssignment(instance.toString(), nextState.name()));
           assignedPartitions.add(pId);
           LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
@@ -378,6 +381,8 @@ public abstract class TaskRebalancer implements HelixRebalancer {
             paMap.put(pId,
                 new PartitionAssignment(instance.toString(), TaskPartitionState.RUNNING.name()));
             excludeSet.add(pId);
+            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
+            jobCtx.setAssignedParticipant(pId, instance.toString());
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
                 TaskPartitionState.RUNNING, instance));
           }

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 5b27fb6..383180e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -152,7 +152,7 @@ public class Workflow {
       builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
       builder.addConfig(job.name, JobConfig.COMMAND, job.command);
       if (job.jobConfigMap != null) {
-        builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
+        builder.addJobConfigMap(job.name, job.jobConfigMap);
       }
       builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
       if (job.targetPartitionStates != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 4b77105..9a19842 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -2,7 +2,6 @@ package org.apache.helix.provisioning.yarn;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -93,6 +92,10 @@ public class AppLauncher {
     yarnClient.init(_conf);
   }
 
+  public ApplicationSpec getApplicationSpec() {
+    return _applicationSpec;
+  }
+
   public boolean launch() throws Exception {
     LOG.info("Running Client");
     yarnClient.start();
@@ -189,7 +192,7 @@ public class AppLauncher {
       classPathEnv.append(':');
       classPathEnv.append(System.getProperty("java.class.path"));
     }
-    LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n" );
+    LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n");
     // Set the env variables to be setup in the env where the application master will be run
     Map<String, String> env = new HashMap<String, String>(_appMasterConfig.getEnv());
     env.put("CLASSPATH", classPathEnv.toString());
@@ -268,12 +271,11 @@ public class AppLauncher {
     // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue(amQueue);
 
-
     LOG.info("Submitting application to YARN Resource Manager");
 
     ApplicationId applicationId = yarnClient.submitApplication(appContext);
 
-    LOG.info("Submitted application with applicationId:" + applicationId );
+    LOG.info("Submitted application with applicationId:" + applicationId);
 
     return true;
   }
@@ -352,6 +354,52 @@ public class AppLauncher {
         || path.endsWith("zip");
   }
 
+  public HelixConnection pollForConnection() {
+    String prevReport = "";
+    HelixConnection connection = null;
+
+    while (true) {
+      try {
+        // Get application report for the appId we are interested in
+        ApplicationReport report = yarnClient.getApplicationReport(_appId);
+
+        String reportMessage = generateReport(report);
+        if (!reportMessage.equals(prevReport)) {
+          LOG.info(reportMessage);
+        }
+        YarnApplicationState state = report.getYarnApplicationState();
+        if (YarnApplicationState.RUNNING == state) {
+          if (connection == null) {
+            String hostName = null;
+            int ind = report.getHost().indexOf('/');
+            if (ind > -1) {
+              hostName = report.getHost().substring(ind + 1);
+            } else {
+              hostName = report.getHost();
+            }
+            connection = new ZkHelixConnection(hostName + ":2181");
+
+            try {
+              connection.connect();
+            } catch (Exception e) {
+              LOG.warn("AppMaster started but not yet initialized");
+              connection = null;
+            }
+          }
+          if (connection.isConnected()) {
+            return connection;
+          }
+        }
+        prevReport = reportMessage;
+        Thread.sleep(10000);
+      } catch (Exception e) {
+        LOG.error("Exception while getting info ");
+        break;
+      }
+    }
+    return null;
+  }
+
   /**
    * @return true if successfully completed, it will print status every X seconds
    */
@@ -434,7 +482,7 @@ public class AppLauncher {
         + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser();
   }
 
-  protected void cleanup() {
+  public void cleanup() {
     LOG.info("Cleaning up");
     try {
       ApplicationReport applicationReport = yarnClient.getApplicationReport(_appId);
@@ -446,23 +494,28 @@ public class AppLauncher {
   }
 
   /**
-   * Launches the application on a YARN cluster. Once launched, it will display (periodically) the status of the containers in the application.
+   * Launches the application on a YARN cluster. Once launched, it will display (periodically) the
+   * status of the containers in the application.
    * @param args app_spec_provider and app_config_spec
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
 
     Options opts = new Options();
-    opts.addOption(new Option("app_spec_provider",true, "Application Spec Factory Class that will parse the app_config_spec file"));
-    opts.addOption(new Option("app_config_spec",true, "YAML config file that provides the app specifications"));
+    opts.addOption(new Option("app_spec_provider", true,
+        "Application Spec Factory Class that will parse the app_config_spec file"));
+    opts.addOption(new Option("app_config_spec", true,
+        "YAML config file that provides the app specifications"));
     CommandLine cliParser = new GnuParser().parse(opts, args);
     String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
     String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
 
-    ApplicationSpecFactory applicationSpecFactory = HelixYarnUtil.createInstance(appSpecFactoryClass);
+    ApplicationSpecFactory applicationSpecFactory =
+        HelixYarnUtil.createInstance(appSpecFactoryClass);
     File yamlConfigFile = new File(yamlConfigFileName);
-    if(!yamlConfigFile.exists()){
-      throw new IllegalArgumentException("YAML app_config_spec file: '"+ yamlConfigFileName + "' does not exist");
+    if (!yamlConfigFile.exists()) {
+      throw new IllegalArgumentException("YAML app_config_spec file: '" + yamlConfigFileName
+          + "' does not exist");
     }
     final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
     launcher.launch();

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 72d6ea9..523fee0 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -3,21 +3,16 @@ package org.apache.helix.provisioning.yarn;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.helix.HelixController;
 import org.apache.helix.api.accessor.ClusterAccessor;
@@ -26,15 +21,18 @@ import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.provisioning.ApplicationSpec;
 import org.apache.helix.provisioning.ApplicationSpecFactory;
 import org.apache.helix.provisioning.HelixYarnUtil;
 import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.provisioning.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.apache.log4j.Logger;
 
@@ -50,8 +48,7 @@ import org.apache.log4j.Logger;
 public class AppMasterLauncher {
   public static Logger LOG = Logger.getLogger(AppMasterLauncher.class);
 
-  @SuppressWarnings("unchecked")
-  public static void main(String[] args) throws Exception{
+  public static void main(String[] args) throws Exception {
     Map<String, String> env = System.getenv();
     LOG.info("Starting app master with the following environment variables");
     for (String key : env.keySet()) {
@@ -61,11 +58,6 @@ public class AppMasterLauncher {
     Options opts;
     opts = new Options();
     opts.addOption("num_containers", true, "Number of containers");
-    try {
-      CommandLine cliParser = new GnuParser().parse(opts, args);
-    } catch (Exception e) {
-      LOG.error("Error parsing input arguments" + Arrays.toString(args), e);
-    }
 
     // START ZOOKEEPER
     String dataDir = "dataDir";
@@ -94,7 +86,7 @@ public class AppMasterLauncher {
 
     String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString();
     String className = appMasterConfig.getApplicationSpecFactory();
-   
+
     GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
     try {
       genericApplicationMaster.start();
@@ -102,8 +94,8 @@ public class AppMasterLauncher {
       LOG.error("Unable to start application master: ", e);
     }
     ApplicationSpecFactory factory = HelixYarnUtil.createInstance(className);
-    
-    //TODO: Avoid setting static variable.
+
+    // TODO: Avoid setting static variable.
     YarnProvisioner.applicationMaster = genericApplicationMaster;
     YarnProvisioner.applicationMasterConfig = appMasterConfig;
     ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile));
@@ -121,17 +113,19 @@ public class AppMasterLauncher {
     ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
     StateModelDefinition statelessService =
         new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
-    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
-        statelessService).build());
+    StateModelDefinition taskStateModel =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel());
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId)
+        .addStateModelDefinition(statelessService).addStateModelDefinition(taskStateModel).build());
     for (String service : applicationSpec.getServices()) {
       String resourceName = service;
       // add the resource with the local provisioner
       ResourceId resourceId = ResourceId.from(resourceName);
-      
+
       ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName);
       serviceConfig.setSimpleField("service_name", service);
       int numContainers = serviceConfig.getIntField("num_containers", 1);
-      
+
       YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
       provisionerConfig.setNumContainers(numContainers);
 
@@ -153,6 +147,20 @@ public class AppMasterLauncher {
     HelixController controller = connection.createController(clusterId, controllerId);
     controller.start();
 
+    // Start any pre-specified jobs
+    List<TaskConfig> taskConfigs = applicationSpec.getTaskConfigs();
+    if (taskConfigs != null) {
+      for (TaskConfig taskConfig : taskConfigs) {
+        String yamlFile = taskConfig.getValue("yamlFile");
+        if (yamlFile != null) {
+          File file = new File(yamlFile);
+          Workflow workflow = Workflow.parse(file);
+          TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
+          taskDriver.start(workflow);
+        }
+      }
+    }
+
     Thread shutdownhook = new Thread(new Runnable() {
       @Override
       public void run() {

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/pom.xml b/recipes/jobrunner-yarn/pom.xml
new file mode 100644
index 0000000..f067a56
--- /dev/null
+++ b/recipes/jobrunner-yarn/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.helix.recipes</groupId>
+    <artifactId>recipes</artifactId>
+    <version>0.7.1-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>jobrunner-yarn</artifactId>
+  <packaging>bundle</packaging>
+  <name>Apache Helix :: Recipes :: Provisioning :: YARN :: Job Runner</name>
+
+  <properties>
+    <osgi.import>
+      org.apache.helix*,
+      org.apache.log4j,
+      *
+    </osgi.import>
+    <osgi.export>org.apache.helix.provisioning.yarn.example*;version="${project.version};-noimport:=true</osgi.export>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <version>6.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-provisioning</artifactId>
+      <version>0.7.1-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>appassembler-maven-plugin</artifactId>
+          <configuration>
+            <!-- Set the target configuration directory to be used in the bin scripts -->
+            <!-- <configurationDirectory>conf</configurationDirectory> -->
+            <!-- Copy the contents from "/src/main/config" to the target configuration
+              directory in the assembled application -->
+            <!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+            <!-- Include the target configuration directory in the beginning of
+              the classpath declaration in the bin scripts -->
+            <includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+            <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+            <!-- Extra JVM arguments that will be included in the bin scripts -->
+            <extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+            <!-- Generate bin scripts for windows and unix pr default -->
+            <platforms>
+              <platform>windows</platform>
+              <platform>unix</platform>
+            </platforms>
+          </configuration>
+          <executions>
+            <execution>
+              <phase>package</phase>
+              <goals>
+                <goal>assemble</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.rat</groupId>
+          <artifactId>apache-rat-plugin</artifactId>
+            <configuration>
+              <excludes combine.children="append">
+              </excludes>
+            </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <programs>
+             <program>
+              <mainClass>org.apache.helix.provisioning.yarn.AppLauncher</mainClass>
+              <name>app-launcher</name>
+            </program>
+             <program>
+              <mainClass>org.apache.helix.provisioning.yarn.example.JobRunnerMain</mainClass>
+              <name>job-runner</name>
+            </program>
+          </programs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/assemble/assembly.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/run.sh
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/run.sh b/recipes/jobrunner-yarn/run.sh
new file mode 100755
index 0000000..07448bb
--- /dev/null
+++ b/recipes/jobrunner-yarn/run.sh
@@ -0,0 +1,6 @@
+#cd ../../
+#mvn clean install -DskipTests
+#cd recipes/helloworld-provisioning-yarn
+mvn clean package -DskipTests
+chmod +x target/helloworld-provisioning-yarn-pkg/bin/app-launcher.sh
+target/helloworld-provisioning-yarn/pkg/bin/app-launcher.sh org.apache.helix.provisioning.yarn.example.HelloWordAppSpecFactory /Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/assemble/assembly.xml b/recipes/jobrunner-yarn/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+  <id>pkg</id>
+  <formats>
+    <format>tar</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+      <outputDirectory>repo</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <excludes>
+        <exclude>**/*.xml</exclude>
+      </excludes>
+    </fileSet>
+     <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>LICENSE</include>
+        <include>NOTICE</include>
+        <include>DISCLAIMER</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/config/log4j.properties b/recipes/jobrunner-yarn/src/main/config/log4j.properties
new file mode 100644
index 0000000..91fac03
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
new file mode 100644
index 0000000..623854f
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
@@ -0,0 +1,127 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRole;
+import org.apache.helix.InstanceType;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.apache.helix.provisioning.HelixYarnUtil;
+import org.apache.helix.provisioning.TaskConfig;
+import org.apache.helix.provisioning.yarn.AppLauncher;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+
+public class JobRunnerMain {
+  public static void main(String[] args) throws Exception {
+    Options opts = new Options();
+    opts.addOption(new Option("app_spec_provider", true,
+        "Application Spec Factory Class that will parse the app_config_spec file"));
+    opts.addOption(new Option("app_config_spec", true,
+        "YAML config file that provides the app specifications"));
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
+    String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
+
+    ApplicationSpecFactory applicationSpecFactory =
+        HelixYarnUtil.createInstance(appSpecFactoryClass);
+    File yamlConfigFile = new File(yamlConfigFileName);
+    if (!yamlConfigFile.exists()) {
+      throw new IllegalArgumentException("YAML app_config_spec file: '" + yamlConfigFileName
+          + "' does not exist");
+    }
+    final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
+    launcher.launch();
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        launcher.cleanup();
+      }
+    }));
+
+    final ApplicationSpec appSpec = launcher.getApplicationSpec();
+
+    // Repeatedly print status
+    final HelixConnection connection = launcher.pollForConnection();
+    final ClusterId clusterId = ClusterId.from(appSpec.getAppName());
+    // TODO: this is a hack -- TaskDriver should accept a connection instead of a manager
+    HelixManager manager = new HelixConnectionAdaptor(new HelixRole() {
+      @Override
+      public HelixConnection getConnection() {
+        return connection;
+      }
+
+      @Override
+      public ClusterId getClusterId() {
+        return clusterId;
+      }
+
+      @Override
+      public Id getId() {
+        return null;
+      }
+
+      @Override
+      public InstanceType getType() {
+        return InstanceType.ADMINISTRATOR;
+      }
+
+      @Override
+      public ClusterMessagingService getMessagingService() {
+        return null;
+      }
+    });
+
+    // Get all submitted jobs
+    String workflow = null;
+    List<TaskConfig> taskConfigs = appSpec.getTaskConfigs();
+    if (taskConfigs != null) {
+      for (TaskConfig taskConfig : taskConfigs) {
+        String yamlFile = taskConfig.getValue("yamlFile");
+        if (yamlFile != null) {
+          Workflow flow = Workflow.parse(new File(yamlFile));
+          workflow = flow.getName();
+        }
+      }
+    }
+
+    // Repeatedly poll for status
+    if (workflow != null) {
+      ClusterAccessor accessor = connection.createClusterAccessor(clusterId);
+      TaskDriver driver = new TaskDriver(manager);
+      while (true) {
+        System.out.println("CONTAINER STATUS");
+        System.out.println("----------------");
+        Collection<Participant> participants = accessor.readParticipants().values();
+        for (Participant participant : participants) {
+          ContainerConfig containerConfig = participant.getContainerConfig();
+          if (containerConfig != null) {
+            System.out.println(participant.getId() + "[" + containerConfig.getId() + "]: "
+                + containerConfig.getState());
+          }
+        }
+        System.out.println("----------------");
+        System.out.println("TASK STATUS");
+        System.out.println("----------------");
+        driver.list(workflow);
+        Thread.sleep(5000);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java
new file mode 100644
index 0000000..584550d
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java
@@ -0,0 +1,53 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.Logger;
+
+/**
+ * Callbacks for task execution - THIS INTERFACE IS SUBJECT TO CHANGE
+ */
+public class MyTask implements Task {
+  private static final Logger LOG = Logger.getLogger(MyTask.class);
+  private static final long DEFAULT_DELAY = 60000L;
+  private final long _delay;
+  private volatile boolean _canceled;
+
+  public MyTask(TaskCallbackContext context) {
+    LOG.info("Job config" + context.getJobConfig().getJobConfigMap());
+    if (context.getTaskConfig() != null) {
+      LOG.info("Task config: " + context.getTaskConfig().getConfigMap());
+    }
+    _delay = DEFAULT_DELAY;
+  }
+
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    long timeLeft;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled) {
+        timeLeft = expiry - System.currentTimeMillis();
+        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+            : timeLeft));
+      }
+      sleep(50);
+    }
+    timeLeft = expiry - System.currentTimeMillis();
+    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+  }
+
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  private static void sleep(long d) {
+    try {
+      Thread.sleep(d);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java
new file mode 100644
index 0000000..a20994c
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java
@@ -0,0 +1,148 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.provisioning.AppConfig;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.provisioning.TaskConfig;
+
+import com.google.common.collect.Maps;
+
+public class MyTaskAppSpec implements ApplicationSpec {
+
+  public String _appName;
+
+  public AppConfig _appConfig;
+
+  public List<String> _services;
+
+  private String _appMasterPackageUri;
+
+  private Map<String, String> _servicePackageURIMap;
+
+  private Map<String, String> _serviceMainClassMap;
+
+  private Map<String, ServiceConfig> _serviceConfigMap;
+
+  private List<TaskConfig> _taskConfigs;
+
+  public AppConfig getAppConfig() {
+    return _appConfig;
+  }
+
+  public void setAppConfig(AppConfig appConfig) {
+    _appConfig = appConfig;
+  }
+
+  public String getAppMasterPackageUri() {
+    return _appMasterPackageUri;
+  }
+
+  public void setAppMasterPackageUri(String appMasterPackageUri) {
+    _appMasterPackageUri = appMasterPackageUri;
+  }
+
+  public Map<String, String> getServicePackageURIMap() {
+    return _servicePackageURIMap;
+  }
+
+  public void setServicePackageURIMap(Map<String, String> servicePackageURIMap) {
+    _servicePackageURIMap = servicePackageURIMap;
+  }
+
+  public Map<String, String> getServiceMainClassMap() {
+    return _serviceMainClassMap;
+  }
+
+  public void setServiceMainClassMap(Map<String, String> serviceMainClassMap) {
+    _serviceMainClassMap = serviceMainClassMap;
+  }
+
+  public Map<String, Map<String, String>> getServiceConfigMap() {
+    Map<String, Map<String, String>> map = Maps.newHashMap();
+    for (String service : _serviceConfigMap.keySet()) {
+      map.put(service, _serviceConfigMap.get(service).getSimpleFields());
+    }
+    return map;
+  }
+
+  public void setServiceConfigMap(Map<String, Map<String, Object>> map) {
+    _serviceConfigMap = Maps.newHashMap();
+
+    for (String service : map.keySet()) {
+      ServiceConfig serviceConfig = new ServiceConfig(Scope.resource(ResourceId.from(service)));
+      Map<String, Object> simpleFields = map.get(service);
+      for (String key : simpleFields.keySet()) {
+        serviceConfig.setSimpleField(key, simpleFields.get(key).toString());
+      }
+      _serviceConfigMap.put(service, serviceConfig);
+    }
+  }
+
+  public void setAppName(String appName) {
+    _appName = appName;
+  }
+
+  public void setServices(List<String> services) {
+    _services = services;
+  }
+
+  public void setTaskConfigs(List<TaskConfig> taskConfigs) {
+    _taskConfigs = taskConfigs;
+  }
+
+  @Override
+  public String getAppName() {
+    return _appName;
+  }
+
+  @Override
+  public AppConfig getConfig() {
+    return _appConfig;
+  }
+
+  @Override
+  public List<String> getServices() {
+    return _services;
+  }
+
+  @Override
+  public URI getAppMasterPackage() {
+    try {
+      return new URI(_appMasterPackageUri);
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public URI getServicePackage(String serviceName) {
+    try {
+      return new URI(_servicePackageURIMap.get(serviceName));
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public String getServiceMainClass(String service) {
+    return _serviceMainClassMap.get(service);
+  }
+
+  @Override
+  public ServiceConfig getServiceConfig(String serviceName) {
+    return _serviceConfigMap.get(serviceName);
+  }
+
+  @Override
+  public List<TaskConfig> getTaskConfigs() {
+    return _taskConfigs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java
new file mode 100644
index 0000000..17601ba
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java
@@ -0,0 +1,28 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.io.InputStream;
+
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.yaml.snakeyaml.Yaml;
+
+public class MyTaskAppSpecFactory implements ApplicationSpecFactory {
+
+  @Override
+  public ApplicationSpec fromYaml(InputStream inputstream) {
+    return (ApplicationSpec) new Yaml().load(inputstream);
+    // return data;
+  }
+
+  public static void main(String[] args) {
+
+    Yaml yaml = new Yaml();
+    InputStream resourceAsStream =
+        ClassLoader.getSystemClassLoader().getResourceAsStream("job_runner_app_spec.yaml");
+    MyTaskAppSpec spec = yaml.loadAs(resourceAsStream, MyTaskAppSpec.class);
+    String dump = yaml.dump(spec);
+    System.out.println(dump);
+    System.out.println(spec.getServiceConfig("JobRunner").getStringField("num_containers", "1"));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
new file mode 100644
index 0000000..22c3ab0
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
@@ -0,0 +1,62 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.participant.AbstractParticipantService;
+import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.provisioning.participant.StatelessParticipantService;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.Logger;
+
+/**
+ * A simple "service" for task callback registration.
+ */
+public class MyTaskService extends StatelessParticipantService {
+
+  private static Logger LOG = Logger.getLogger(AbstractParticipantService.class);
+
+  static String SERVICE_NAME = "JobRunner";
+
+  public MyTaskService(HelixConnection connection, ClusterId clusterId,
+      ParticipantId participantId) {
+    super(connection, clusterId, participantId, SERVICE_NAME);
+  }
+
+  @Override
+  protected void init(ServiceConfig serviceConfig) {
+    LOG.info("Initialized service with config " + serviceConfig);
+
+    // Register for callbacks for tasks
+    HelixManager manager = new HelixConnectionAdaptor(getParticipant());
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("RunTask", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new MyTask(context);
+      }
+    });
+    getParticipant().getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("Task"), new TaskStateModelFactory(manager, taskFactoryReg));
+  }
+
+  @Override
+  protected void goOnline() {
+    LOG.info("JobRunner service is told to go online");
+  }
+
+  @Override
+  protected void goOffine() {
+    LOG.info("JobRunner service is told to go offline");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml b/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml
new file mode 100644
index 0000000..0187fd1
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml
@@ -0,0 +1,18 @@
+name: myJob1234
+jobs:
+    - name: myJob1234
+      command: RunTask
+      jobConfigMap: {
+        k1: "v1",
+        k2: "v2"
+      }
+      tasks:
+        - taskConfigMap: {
+            k3: "v3"
+          }
+        - taskConfigMap: {
+            k4: "v4"
+          }
+        - taskConfigMap: {
+            k5: "v5"
+          }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
new file mode 100755
index 0000000..ad62ffc
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
@@ -0,0 +1,27 @@
+!!org.apache.helix.provisioning.yarn.example.MyTaskAppSpec
+appConfig:
+  config: {
+    k1: v1
+  }
+appMasterPackageUri: 'file:///Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/target/jobrunner-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
+appName: testApp
+serviceConfigMap:
+  JobRunner: {
+    num_containers: 3,
+    memory: 1024
+  }
+serviceMainClassMap: {
+  JobRunner: org.apache.helix.provisioning.yarn.example.MyTaskService
+}
+servicePackageURIMap: {
+  JobRunner: 'file:///Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/target/jobrunner-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
+}
+services: [
+  JobRunner]
+taskConfigs:
+  - config: {
+      yamlFile: '/Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml'
+    }
+
+
+

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/test/conf/testng.xml b/recipes/jobrunner-yarn/src/test/conf/testng.xml
new file mode 100644
index 0000000..37bccf3
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="Suite" parallel="none">
+  <test name="Test" preserve-order="false">
+    <packages>
+      <package name="org.apache.helix.agent"/>
+    </packages>
+  </test>
+</suite>

http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/pom.xml b/recipes/pom.xml
index 5d137c2..3fcaf42 100644
--- a/recipes/pom.xml
+++ b/recipes/pom.xml
@@ -37,6 +37,7 @@ under the License.
     <module>task-execution</module>
     <module>service-discovery</module>
     <module>helloworld-provisioning-yarn</module>
+    <module>jobrunner-yarn</module>
   </modules>
 
   <build>


Mime
View raw message