helix-commits mailing list archives

From ka...@apache.org
Subject [1/3] Port recent task framework changes
Date Wed, 30 Apr 2014 18:39:33 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 8f0b7e4c6 -> 97ca4de4a


http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 902f616..5b27fb6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -19,45 +19,53 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+
+import org.apache.helix.task.beans.JobBean;
 import org.apache.helix.task.beans.TaskBean;
 import org.apache.helix.task.beans.WorkflowBean;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 /**
- * Houses a task dag and config set to fully describe a task workflow
+ * Houses a job dag and config set to fully describe a job workflow
  */
 public class Workflow {
   /** Default workflow name, useful constant for single-node workflows */
-  public static enum WorkflowEnum {
-    UNSPECIFIED;
-  }
+  public static final String UNSPECIFIED = "UNSPECIFIED";
 
   /** Workflow name */
-  private final String _name;
+  private String _name;
 
   /** Holds workflow-level configurations */
-  private final WorkflowConfig _workflowConfig;
+  private WorkflowConfig _workflowConfig;
+
+  /** Contains the per-job configurations for all jobs specified in the provided dag */
+  private Map<String, Map<String, String>> _jobConfigs;
 
-  /** Contains the per-task configurations for all tasks specified in the provided dag */
-  private final Map<String, Map<String, String>> _taskConfigs;
+  /** Contains the per-job configurations of all individually-specified tasks */
+  private Map<String, List<TaskConfig>> _taskConfigs;
 
   /** Constructs and validates a workflow against a provided dag and config set */
   private Workflow(String name, WorkflowConfig workflowConfig,
-      Map<String, Map<String, String>> taskConfigs) {
+      Map<String, Map<String, String>> jobConfigs, Map<String, List<TaskConfig>> taskConfigs) {
     _name = name;
     _workflowConfig = workflowConfig;
+    _jobConfigs = jobConfigs;
     _taskConfigs = taskConfigs;
-
     validate();
   }
 
@@ -65,13 +73,17 @@ public class Workflow {
     return _name;
   }
 
-  public Map<String, Map<String, String>> getTaskConfigs() {
+  public Map<String, Map<String, String>> getJobConfigs() {
+    return _jobConfigs;
+  }
+
+  public Map<String, List<TaskConfig>> getTaskConfigs() {
     return _taskConfigs;
   }
 
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getTaskDag().toJson());
+    cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson());
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
     cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
 
@@ -97,19 +109,19 @@ public class Workflow {
    * 
    * <pre>
    * name: MyFlow
-   * tasks:
-   *   - name : TaskA
+   * jobs:
+   *   - name : JobA
    *     command : SomeTask
    *     ...
-   *   - name : TaskB
-   *     parents : [TaskA]
+   *   - name : JobB
+   *     parents : [JobA]
    *     command : SomeOtherTask
    *     ...
-   *   - name : TaskC
+   *   - name : JobC
    *     command : AnotherTask
    *     ...
-   *   - name : TaskD
-   *     parents : [TaskB, TaskC]
+   *   - name : JobD
+   *     parents : [JobB, JobC]
    *     command : AnotherTask
    *     ...
    * </pre>
@@ -126,37 +138,44 @@ public class Workflow {
     WorkflowBean wf = (WorkflowBean) yaml.load(reader);
     Builder builder = new Builder(wf.name);
 
-    for (TaskBean task : wf.tasks) {
-      if (task.name == null) {
-        throw new IllegalArgumentException("A task must have a name.");
+    for (JobBean job : wf.jobs) {
+      if (job.name == null) {
+        throw new IllegalArgumentException("A job must have a name.");
       }
 
-      if (task.parents != null) {
-        for (String parent : task.parents) {
-          builder.addParentChildDependency(parent, task.name);
+      if (job.parents != null) {
+        for (String parent : job.parents) {
+          builder.addParentChildDependency(parent, job.name);
         }
       }
 
-      builder.addConfig(task.name, TaskConfig.WORKFLOW_ID, wf.name);
-      builder.addConfig(task.name, TaskConfig.COMMAND, task.command);
-      if (task.commandConfig != null) {
-        builder.addConfig(task.name, TaskConfig.COMMAND_CONFIG, task.commandConfig.toString());
+      builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
+      builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+      if (job.jobConfigMap != null) {
+        builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
+      }
+      builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+      if (job.targetPartitionStates != null) {
+        builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+            Joiner.on(",").join(job.targetPartitionStates));
       }
-      builder.addConfig(task.name, TaskConfig.TARGET_RESOURCE, task.targetResource);
-      if (task.targetPartitionStates != null) {
-        builder.addConfig(task.name, TaskConfig.TARGET_PARTITION_STATES,
-            Joiner.on(",").join(task.targetPartitionStates));
+      if (job.targetPartitions != null) {
+        builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+            Joiner.on(",").join(job.targetPartitions));
       }
-      if (task.targetPartitions != null) {
-        builder.addConfig(task.name, TaskConfig.TARGET_PARTITIONS,
-            Joiner.on(",").join(task.targetPartitions));
+      builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+          String.valueOf(job.maxAttemptsPerPartition));
+      builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+          String.valueOf(job.numConcurrentTasksPerInstance));
+      builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+          String.valueOf(job.timeoutPerPartition));
+      if (job.tasks != null) {
+        List<TaskConfig> taskConfigs = Lists.newArrayList();
+        for (TaskBean task : job.tasks) {
+          taskConfigs.add(TaskConfig.from(task));
+        }
+        builder.addTaskConfigs(job.name, taskConfigs);
       }
-      builder.addConfig(task.name, TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
-          String.valueOf(task.maxAttemptsPerPartition));
-      builder.addConfig(task.name, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
-          String.valueOf(task.numConcurrentTasksPerInstance));
-      builder.addConfig(task.name, TaskConfig.TIMEOUT_PER_PARTITION,
-          String.valueOf(task.timeoutPerPartition));
     }
 
     return builder.build();
@@ -168,47 +187,78 @@ public class Workflow {
    */
   public void validate() {
     // validate dag and configs
-    if (!_taskConfigs.keySet().containsAll(_workflowConfig.getTaskDag().getAllNodes())) {
+    if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) {
       throw new IllegalArgumentException("Nodes specified in DAG missing from config");
-    } else if (!_workflowConfig.getTaskDag().getAllNodes().containsAll(_taskConfigs.keySet())) {
+    } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) {
       throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
     }
 
-    _workflowConfig.getTaskDag().validate();
+    _workflowConfig.getJobDag().validate();
 
-    for (String node : _taskConfigs.keySet()) {
+    for (String node : _jobConfigs.keySet()) {
       buildConfig(node);
     }
   }
 
-  /** Builds a TaskConfig from config map. Useful for validating configs */
-  private TaskConfig buildConfig(String task) {
-    return TaskConfig.Builder.fromMap(_taskConfigs.get(task)).build();
+  /** Builds a JobConfig from config map. Useful for validating configs */
+  private JobConfig buildConfig(String job) {
+    JobConfig.Builder b = JobConfig.Builder.fromMap(_jobConfigs.get(job));
+    if (_taskConfigs != null && _taskConfigs.containsKey(job)) {
+      b.addTaskConfigs(_taskConfigs.get(job));
+    }
+    return b.build();
   }
 
   /** Build a workflow incrementally from dependencies and single configs, validate at build time */
   public static class Builder {
-    private final String _name;
-    private final TaskDag _dag;
-    private final Map<String, Map<String, String>> _taskConfigs;
+    private String _name;
+    private JobDag _dag;
+    private Map<String, Map<String, String>> _jobConfigs;
+    private Map<String, List<TaskConfig>> _taskConfigs;
     private long _expiry;
 
     public Builder(String name) {
       _name = name;
-      _dag = new TaskDag();
-      _taskConfigs = new TreeMap<String, Map<String, String>>();
+      _dag = new JobDag();
+      _jobConfigs = new TreeMap<String, Map<String, String>>();
+      _taskConfigs = new TreeMap<String, List<TaskConfig>>();
       _expiry = -1;
     }
 
     public Builder addConfig(String node, String key, String val) {
       node = namespacify(node);
       _dag.addNode(node);
+      if (!_jobConfigs.containsKey(node)) {
+        _jobConfigs.put(node, new TreeMap<String, String>());
+      }
+      _jobConfigs.get(node).put(key, val);
+      return this;
+    }
 
-      if (!_taskConfigs.containsKey(node)) {
-        _taskConfigs.put(node, new TreeMap<String, String>());
+    public Builder addJobConfigMap(String node, Map<String, String> jobConfigMap) {
+      return addConfig(node, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+    }
+
+    public Builder addJobConfig(String node, JobConfig jobConfig) {
+      for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet()) {
+        String key = e.getKey();
+        String val = e.getValue();
+        addConfig(node, key, val);
       }
-      _taskConfigs.get(node).put(key, val);
+      addTaskConfigs(node, jobConfig.getTaskConfigMap().values());
+      return this;
+    }
 
+    public Builder addTaskConfigs(String node, Collection<TaskConfig> taskConfigs) {
+      node = namespacify(node);
+      _dag.addNode(node);
+      if (!_taskConfigs.containsKey(node)) {
+        _taskConfigs.put(node, new ArrayList<TaskConfig>());
+      }
+      if (!_jobConfigs.containsKey(node)) {
+        _jobConfigs.put(node, new TreeMap<String, String>());
+      }
+      _taskConfigs.get(node).addAll(taskConfigs);
       return this;
     }
 
@@ -226,13 +276,13 @@ public class Workflow {
     }
 
     public String namespacify(String task) {
-      return TaskUtil.getNamespacedTaskName(_name, task);
+      return TaskUtil.getNamespacedJobName(_name, task);
     }
 
     public Workflow build() {
-      for (String task : _taskConfigs.keySet()) {
+      for (String task : _jobConfigs.keySet()) {
         // addConfig(task, TaskConfig.WORKFLOW_ID, _name);
-        _taskConfigs.get(task).put(TaskConfig.WORKFLOW_ID, _name);
+        _jobConfigs.get(task).put(JobConfig.WORKFLOW_ID, _name);
       }
 
       WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
@@ -242,7 +292,8 @@ public class Workflow {
         builder.setExpiry(_expiry);
       }
 
-      return new Workflow(_name, builder.build(), _taskConfigs); // calls validate internally
+      return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate
+                                                                              // internally
     }
   }
 }
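
A minimal usage sketch of the reworked builder API, pieced together from the hunks above. The workflow/job names and the "Reindex" command are placeholders, and the two-argument TaskConfig constructor is the one exercised by the tests further below:

    Workflow.Builder builder = new Workflow.Builder("MyFlow");
    builder.addConfig("MyJob", JobConfig.COMMAND, "Reindex");
    builder.addTaskConfigs("MyJob",
        Lists.newArrayList(new TaskConfig("Reindex", null)));
    Workflow workflow = builder.build(); // build() runs validate() internally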

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index bb88be7..6f10955 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -34,18 +34,18 @@ public class WorkflowConfig {
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
 
   /* Member variables */
-  private final TaskDag _taskDag;
-  private final TargetState _targetState;
-  private final long _expiry;
+  private JobDag _jobDag;
+  private TargetState _targetState;
+  private long _expiry;
 
-  private WorkflowConfig(TaskDag taskDag, TargetState targetState, long expiry) {
-    _taskDag = taskDag;
+  private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry) {
+    _jobDag = jobDag;
     _targetState = targetState;
     _expiry = expiry;
   }
 
-  public TaskDag getTaskDag() {
-    return _taskDag;
+  public JobDag getJobDag() {
+    return _jobDag;
   }
 
   public TargetState getTargetState() {
@@ -57,7 +57,7 @@ public class WorkflowConfig {
   }
 
   public static class Builder {
-    private TaskDag _taskDag = TaskDag.EMPTY_DAG;
+    private JobDag _taskDag = JobDag.EMPTY_DAG;
     private TargetState _targetState = TargetState.START;
     private long _expiry = DEFAULT_EXPIRY;
 
@@ -71,7 +71,7 @@ public class WorkflowConfig {
       return new WorkflowConfig(_taskDag, _targetState, _expiry);
     }
 
-    public Builder setTaskDag(TaskDag v) {
+    public Builder setTaskDag(JobDag v) {
       _taskDag = v;
       return this;
     }
@@ -93,7 +93,7 @@ public class WorkflowConfig {
         b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
       }
       if (cfg.containsKey(DAG)) {
-        b.setTaskDag(TaskDag.fromJson(cfg.get(DAG)));
+        b.setTaskDag(JobDag.fromJson(cfg.get(DAG)));
       }
       if (cfg.containsKey(TARGET_STATE)) {
         b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
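
A short sketch of rebuilding a WorkflowConfig from its serialized fields, mirroring fromMap above; dagJson stands in for a DAG serialized via JobDag.toJson(), and note that the builder setter keeps its old setTaskDag name:

    WorkflowConfig.Builder b = new WorkflowConfig.Builder();
    b.setTaskDag(JobDag.fromJson(dagJson)); // setter not yet renamed to match JobDag
    b.setTargetState(TargetState.START);
    b.setExpiry(WorkflowConfig.DEFAULT_EXPIRY);
    WorkflowConfig cfg = b.build();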

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index cd30860..4feda1b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -19,35 +19,21 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
 /**
  * Typed interface to the workflow context information stored by {@link TaskRebalancer} in the Helix
  * property store
  */
 public class WorkflowContext extends HelixProperty {
-
-  enum WorkflowContextEnum {
-    WORKFLOW_STATE("STATE"),
-    START_TIME("START_TIME"),
-    FINISH_TIME("FINISH_TIME"),
-    TASK_STATES("TASK_STATES");
-
-    final String _value;
-
-    private WorkflowContextEnum(String value) {
-      _value = value;
-    }
-
-    public String value() {
-      return _value;
-    }
-  }
-
+  public static final String WORKFLOW_STATE = "STATE";
+  public static final String START_TIME = "START_TIME";
+  public static final String FINISH_TIME = "FINISH_TIME";
+  public static final String TASK_STATES = "TASK_STATES";
   public static final int UNFINISHED = -1;
 
   public WorkflowContext(ZNRecord record) {
@@ -55,18 +41,16 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setWorkflowState(TaskState s) {
-    if (_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()) == null) {
-      _record.setSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value(), s.name());
-    } else if (!_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()).equals(
-        TaskState.FAILED.name())
-        && !_record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value()).equals(
-            TaskState.COMPLETED.name())) {
-      _record.setSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value(), s.name());
+    if (_record.getSimpleField(WORKFLOW_STATE) == null) {
+      _record.setSimpleField(WORKFLOW_STATE, s.name());
+    } else if (!_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.FAILED.name())
+        && !_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.COMPLETED.name())) {
+      _record.setSimpleField(WORKFLOW_STATE, s.name());
     }
   }
 
   public TaskState getWorkflowState() {
-    String s = _record.getSimpleField(WorkflowContextEnum.WORKFLOW_STATE.value());
+    String s = _record.getSimpleField(WORKFLOW_STATE);
     if (s == null) {
       return null;
     }
@@ -74,22 +58,22 @@ public class WorkflowContext extends HelixProperty {
     return TaskState.valueOf(s);
   }
 
-  public void setTaskState(String taskResource, TaskState s) {
-    Map<String, String> states = _record.getMapField(WorkflowContextEnum.TASK_STATES.value());
+  public void setJobState(String jobResource, TaskState s) {
+    Map<String, String> states = _record.getMapField(TASK_STATES);
     if (states == null) {
       states = new TreeMap<String, String>();
-      _record.setMapField(WorkflowContextEnum.TASK_STATES.value(), states);
+      _record.setMapField(TASK_STATES, states);
     }
-    states.put(taskResource, s.name());
+    states.put(jobResource, s.name());
   }
 
-  public TaskState getTaskState(String taskResource) {
-    Map<String, String> states = _record.getMapField(WorkflowContextEnum.TASK_STATES.value());
+  public TaskState getJobState(String jobResource) {
+    Map<String, String> states = _record.getMapField(TASK_STATES);
     if (states == null) {
       return null;
     }
 
-    String s = states.get(taskResource);
+    String s = states.get(jobResource);
     if (s == null) {
       return null;
     }
@@ -98,11 +82,11 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setStartTime(long t) {
-    _record.setSimpleField(WorkflowContextEnum.START_TIME.value(), String.valueOf(t));
+    _record.setSimpleField(START_TIME, String.valueOf(t));
   }
 
   public long getStartTime() {
-    String tStr = _record.getSimpleField(WorkflowContextEnum.START_TIME.value());
+    String tStr = _record.getSimpleField(START_TIME);
     if (tStr == null) {
       return -1;
     }
@@ -111,11 +95,11 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setFinishTime(long t) {
-    _record.setSimpleField(WorkflowContextEnum.FINISH_TIME.value(), String.valueOf(t));
+    _record.setSimpleField(FINISH_TIME, String.valueOf(t));
   }
 
   public long getFinishTime() {
-    String tStr = _record.getSimpleField(WorkflowContextEnum.FINISH_TIME.value());
+    String tStr = _record.getSimpleField(FINISH_TIME);
     if (tStr == null) {
       return UNFINISHED;
     }
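
With the enum flattened into plain string constants, context access becomes direct simple-field reads and writes; a sketch, assuming record is a ZNRecord in scope and using a placeholder namespaced job name:

    WorkflowContext ctx = new WorkflowContext(record);
    ctx.setStartTime(System.currentTimeMillis());
    ctx.setJobState("MyFlow_MyJob", TaskState.IN_PROGRESS);
    if (ctx.getFinishTime() == WorkflowContext.UNFINISHED) {
      // the workflow has not finished yet
    }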

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
new file mode 100644
index 0000000..5e12f19
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -0,0 +1,42 @@
+package org.apache.helix.task.beans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+
+/**
+ * Bean class used for parsing job definitions from YAML.
+ */
+public class JobBean {
+  public String name;
+  public List<String> parents;
+  public String targetResource;
+  public List<String> targetPartitionStates;
+  public List<String> targetPartitions;
+  public String command;
+  public Map<String, String> jobConfigMap;
+  public List<TaskBean> tasks;
+  public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK;
+  public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  public int maxAttemptsPerPartition = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK;
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
index 9481c6e..eedccb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/TaskBean.java
@@ -19,22 +19,14 @@ package org.apache.helix.task.beans;
  * under the License.
  */
 
-import java.util.List;
 import java.util.Map;
-import org.apache.helix.task.TaskConfig;
 
 /**
- * Bean class used for parsing task definitions from YAML.
+ * Describes task-specific configuration, including an arbitrary map of
+ * key-value pairs to pass to the task.
  */
 public class TaskBean {
-  public String name;
-  public List<String> parents;
-  public String targetResource;
-  public List<String> targetPartitionStates;
-  public List<Integer> targetPartitions;
   public String command;
-  public Map<String, Object> commandConfig;
-  public long timeoutPerPartition = TaskConfig.DEFAULT_TIMEOUT_PER_PARTITION;
-  public int numConcurrentTasksPerInstance = TaskConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-  public int maxAttemptsPerPartition = TaskConfig.DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+  public Map<String, String> taskConfigMap;
 }
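
The slimmed-down bean maps onto a TaskConfig through the TaskConfig.from(TaskBean) factory used in Workflow.parse above; a sketch with placeholder values:

    TaskBean bean = new TaskBean();
    bean.command = "Reindex";
    bean.taskConfigMap = ImmutableMap.of("Timeout", "1000");
    TaskConfig taskConfig = TaskConfig.from(bean);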

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index 4e64692..76da4c8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -27,5 +27,5 @@ import java.util.List;
 public class WorkflowBean {
   public String name;
   public String expiry;
-  public List<TaskBean> tasks;
+  public List<JobBean> jobs;
 }
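
These beans are bound directly from YAML; a sketch of the SnakeYAML loading step behind Workflow.parse, where yamlText is a placeholder and the Constructor argument is inferred from the imports in Workflow.java:

    Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
    WorkflowBean wf = (WorkflowBean) yaml.load(new StringReader(yamlText));
    for (JobBean job : wf.jobs) {
      System.out.println(job.name + " <- parents: " + job.parents);
    }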

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
new file mode 100644
index 0000000..1ee3991
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -0,0 +1,171 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+  private Set<String> _invokedClasses = Sets.newHashSet();
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set task callbacks
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("TaskOne", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new TaskOne(context);
+      }
+    });
+    taskFactoryReg.put("TaskTwo", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new TaskTwo(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory(StateModelDefId.from("Task"),
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  @BeforeMethod
+  public void beforeMethod() {
+    _invokedClasses.clear();
+  }
+
+  @Test
+  public void testDifferentTasks() throws Exception {
+    // Create a job with two different tasks
+    String jobName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+    TaskConfig taskConfig1 = new TaskConfig("TaskOne", null);
+    TaskConfig taskConfig2 = new TaskConfig("TaskTwo", null);
+    taskConfigs.add(taskConfig1);
+    taskConfigs.add(taskConfig2);
+    workflowBuilder.addTaskConfigs(jobName, taskConfigs);
+    workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
+    Map<String, String> jobConfigMap = Maps.newHashMap();
+    jobConfigMap.put("Timeout", "1000");
+    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    _driver.start(workflowBuilder.build());
+
+    // Ensure the job completes
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+
+    // Ensure that each class was invoked
+    Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
+    Assert.assertTrue(_invokedClasses.contains(TaskTwo.class.getName()));
+  }
+
+  private class TaskOne extends ReindexTask {
+    public TaskOne(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public TaskResult run() {
+      _invokedClasses.add(getClass().getName());
+      return super.run();
+    }
+  }
+
+  private class TaskTwo extends TaskOne {
+    public TaskTwo(TaskCallbackContext context) {
+      super(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 1c83291..0a59ee1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,9 +19,9 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -34,10 +34,11 @@ import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
-import org.apache.helix.task.TaskContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
@@ -56,11 +57,13 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class TestTaskRebalancer extends ZkIntegrationTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -92,8 +95,8 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config) {
-        return new ReindexTask(config);
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ReindexTask(context);
       }
     });
 
@@ -152,29 +155,30 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
   @Test
   public void testExpiry() throws Exception {
-    String taskName = "Expiry";
+    String jobName = "Expiry";
     long expiry = 1000;
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
         WorkflowGenerator
-            .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
-                TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+            .generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobName, commandConfig)
+            .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
+    PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(jobName);
     String workflowPropStoreKey =
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobName);
 
     // Ensure context and config exist
     Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
         AccessOption.PERSISTENT));
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
-    // Wait for task to finish and expire
-    TestUtil.pollForWorkflowState(_manager, taskName, TaskState.COMPLETED);
+    // Wait for job to finish and expire
+    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
     _driver.invokeRebalance();
     Thread.sleep(expiry);
@@ -185,25 +189,26 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertEquals(accessor.getProperty(workflowCfgKey), null);
   }
 
-  private void basic(long taskCompletionTime) throws Exception {
+  private void basic(long jobCompletionTime) throws Exception {
     // We use a different resource name in each test method as a work around for a helix participant
     // bug where it does
     // not clear locally cached state when a resource partition is dropped. Once that is fixed we
     // should change these
     // tests to use the same resource name and implement a beforeMethod that deletes the task
     // resource.
-    final String taskResource = "basic" + taskCompletionTime;
+    final String jobResource = "basic" + jobCompletionTime;
+    Map<String, String> commandConfig =
+        ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            commandConfig).build();
     _driver.start(flow);
 
-    // Wait for task completion
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+    // Wait for job completion
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
@@ -212,29 +217,31 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
   @Test
   public void partitionSet() throws Exception {
-    final String taskResource = "partitionSet";
-    ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
+    final String jobResource = "partitionSet";
+    ImmutableList<String> targetPartitions =
+        ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
 
     // construct and submit our basic workflow
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
-            String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
-            .build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            commandConfig, JobConfig.MAX_ATTEMPTS_PER_TASK, String.valueOf(1),
+            JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
     _driver.start(flow);
 
-    // wait for task completeness/timeout
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
+    // wait for job completeness/timeout
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
-    String namespacedName = TaskUtil.getNamespacedTaskName(taskResource);
+    String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
 
-    TaskContext ctx = TaskUtil.getTaskContext(_manager, namespacedName);
-    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, taskResource);
+    JobContext ctx = TaskUtil.getJobContext(_manager, namespacedName);
+    WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, jobResource);
     Assert.assertNotNull(ctx);
     Assert.assertNotNull(workflowContext);
-    Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
-    for (int i : targetPartitions) {
+    Assert.assertEquals(workflowContext.getJobState(namespacedName), TaskState.COMPLETED);
+    for (String pName : targetPartitions) {
+      int i = ctx.getPartitionsByTarget().get(pName).get(0);
       Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
       Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
     }
@@ -244,33 +251,32 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
   public void testRepeatedWorkflow() throws Exception {
     String workflowName = "SomeWorkflow";
     Workflow flow =
-        WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+        WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder(workflowName).build();
     new TaskDriver(_manager).start(flow);
 
-    // Wait until the task completes
+    // Wait until the workflow completes
     TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
-    for (String task : flow.getTaskConfigs().keySet()) {
-      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
+    for (String task : flow.getJobConfigs().keySet()) {
+      TestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
     }
   }
 
   @Test
   public void timeouts() throws Exception {
-    final String taskResource = "timeouts";
+    final String jobResource = "timeouts";
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
-            TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
-            TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource,
+            WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK,
+            String.valueOf(2), JobConfig.TIMEOUT_PER_TASK, String.valueOf(100)).build();
     _driver.start(flow);
 
-    // Wait until the task reports failure.
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
+    // Wait until the job reports failure.
+    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+    JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     int maxAttempts = 0;
     for (int i = 0; i < NUM_PARTITIONS; i++) {
       TaskPartitionState state = ctx.getPartitionState(i);
@@ -282,57 +288,17 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertEquals(maxAttempts, 2);
   }
 
-  @Test
-  public void testIndependentTask() throws Exception {
-    final String taskResource = "independentTask";
-    Map<String, String> config = new TreeMap<String, String>();
-    config.put("TargetPartitions", "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19");
-    config.put("Command", "Reindex");
-    config.put("CommandConfig", String.valueOf(200));
-    config.put("TimeoutPerPartition", String.valueOf(10 * 1000));
-    Workflow flow =
-        WorkflowGenerator.generateSingleTaskWorkflowBuilder(taskResource, config).build();
-    _driver.start(flow);
-
-    // Wait for task completion
-    TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
-
-    // Ensure all partitions are completed individually
-    TaskContext ctx =
-        TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
-    for (int i = 0; i < NUM_PARTITIONS; i++) {
-      Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
-      Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
-    }
-  }
-
-  @Test
-  public void testIndependentRepeatedWorkflow() throws Exception {
-    final String workflowName = "independentTaskWorkflow";
-    Map<String, String> config = new TreeMap<String, String>();
-    config.put("TargetPartitions", "0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19");
-    config.put("Command", "Reindex");
-    config.put("CommandConfig", String.valueOf(200));
-    config.put("TimeoutPerPartition", String.valueOf(10 * 1000));
-    Workflow flow =
-        WorkflowGenerator.generateRepeatedTaskWorkflowBuilder(workflowName, config).build();
-    new TaskDriver(_manager).start(flow);
-
-    // Wait until the task completes
-    TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
-
-    // Assert completion for all tasks within two minutes
-    for (String task : flow.getTaskConfigs().keySet()) {
-      TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
-    }
-  }
-
   private static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg) {
-      _delay = Long.parseLong(cfg);
+    public ReindexTask(TaskCallbackContext context) {
+      JobConfig jobCfg = context.getJobConfig();
+      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      if (cfg == null) {
+        cfg = Collections.emptyMap();
+      }
+      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index bb490ea..e555468 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,8 +31,9 @@ import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
@@ -46,13 +48,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final String TGT_DB = "TestDB";
-  private static final String TASK_RESOURCE = "SomeTask";
+  private static final String JOB_RESOURCE = "SomeJob";
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
@@ -83,8 +88,8 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("Reindex", new TaskFactory() {
       @Override
-      public Task createNewTask(String config) {
-        return new ReindexTask(config);
+      public Task createNewTask(TaskCallbackContext context) {
+        return new ReindexTask(context);
       }
     });
 
@@ -137,27 +142,28 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
   @Test
   public void stopAndResume() throws Exception {
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
     Workflow flow =
-        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
-            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE,
+            commandConfig).build();
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
 
-    LOG.info("Pausing task");
-    _driver.stop(TASK_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.STOPPED);
+    LOG.info("Pausing job");
+    _driver.stop(JOB_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
 
-    LOG.info("Resuming task");
-    _driver.resume(TASK_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED);
+    LOG.info("Resuming job");
+    _driver.resume(JOB_RESOURCE);
+    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
   public void stopAndResumeWorkflow() throws Exception {
     String workflow = "SomeWorkflow";
-    Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
+    Workflow flow = WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder(workflow).build();
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
@@ -176,8 +182,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     private final long _delay;
     private volatile boolean _canceled;
 
-    public ReindexTask(String cfg) {
-      _delay = Long.parseLong(cfg);
+    public ReindexTask(TaskCallbackContext context) {
+      JobConfig jobCfg = context.getJobConfig();
+      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      if (cfg == null) {
+        cfg = Collections.emptyMap();
+      }
+      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 2cc6cb8..520d7c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -20,18 +20,17 @@ package org.apache.helix.integration.task;
  */
 
 import org.apache.helix.HelixManager;
-import org.apache.helix.task.*;
-import org.apache.log4j.Logger;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowContext;
 import org.testng.Assert;
 
 /**
  * Static test utility methods.
  */
 public class TestUtil {
-  private static final Logger LOG = Logger.getLogger(TestUtil.class);
-
   /**
-   * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is
+   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
    * reached.
    * If the task has not reached target state by then, an error is thrown
    * @param workflowResource Resource to poll for completeness
@@ -52,15 +51,15 @@ public class TestUtil {
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
-  public static void pollForTaskState(HelixManager manager, String workflowResource,
-      String taskName, TaskState state) throws InterruptedException {
+  public static void pollForJobState(HelixManager manager, String workflowResource,
+      String jobName, TaskState state) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
     do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
+    } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state)
         && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
 
     Assert.assertNotNull(ctx);

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 653d88a..921a5f9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,72 +19,95 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Workflow;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Convenience class for generating various test workflows
  */
 public class WorkflowGenerator {
+  private static final Logger LOG = Logger.getLogger(WorkflowGenerator.class);
+
   public static final String DEFAULT_TGT_DB = "TestDB";
-  private static final String TASK_NAME_1 = "SomeTask1";
-  private static final String TASK_NAME_2 = "SomeTask2";
+  public static final String JOB_NAME_1 = "SomeJob1";
+  public static final String JOB_NAME_2 = "SomeJob2";
 
-  private static final Map<String, String> DEFAULT_TASK_CONFIG;
+  public static final Map<String, String> DEFAULT_JOB_CONFIG;
   static {
     Map<String, String> tmpMap = new TreeMap<String, String>();
     tmpMap.put("TargetResource", DEFAULT_TGT_DB);
     tmpMap.put("TargetPartitionStates", "MASTER");
     tmpMap.put("Command", "Reindex");
-    tmpMap.put("CommandConfig", String.valueOf(2000));
     tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
-    DEFAULT_TASK_CONFIG = Collections.unmodifiableMap(tmpMap);
+    DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
+  }
+
+  public static final Map<String, String> DEFAULT_COMMAND_CONFIG;
+  static {
+    Map<String, String> tmpMap = new TreeMap<String, String>();
+    tmpMap.put("Timeout", String.valueOf(2000));
+    DEFAULT_COMMAND_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
-  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(
-      String taskName, String... cfgs) {
+  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(
+      String jobName, Map<String, String> commandConfig, String... cfgs) {
     if (cfgs.length % 2 != 0) {
       throw new IllegalArgumentException(
           "Additional configs should have even number of keys and values");
     }
-    Workflow.Builder bldr = generateDefaultSingleTaskWorkflowBuilder(taskName);
+    Workflow.Builder bldr = generateDefaultSingleJobWorkflowBuilder(jobName);
     for (int i = 0; i < cfgs.length; i += 2) {
-      bldr.addConfig(taskName, cfgs[i], cfgs[i + 1]);
+      bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
     }
 
     return bldr;
   }
 
-  public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilder(String taskName) {
-    return generateSingleTaskWorkflowBuilder(taskName, DEFAULT_TASK_CONFIG);
+  public static Workflow.Builder generateDefaultSingleJobWorkflowBuilder(String jobName) {
+    return generateSingleJobWorkflowBuilder(jobName, DEFAULT_COMMAND_CONFIG, DEFAULT_JOB_CONFIG);
   }
 
-  public static Workflow.Builder generateSingleTaskWorkflowBuilder(String taskName,
-      Map<String, String> config) {
-    Workflow.Builder builder = new Workflow.Builder(taskName);
+  public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
+      Map<String, String> commandConfig, Map<String, String> config) {
+    Workflow.Builder builder = new Workflow.Builder(jobName);
     for (String key : config.keySet()) {
-      builder.addConfig(taskName, key, config.get(key));
+      builder.addConfig(jobName, key, config.get(key));
+    }
+    if (commandConfig != null) {
+      ObjectMapper mapper = new ObjectMapper();
+      try {
+        String serializedMap = mapper.writeValueAsString(commandConfig);
+        builder.addConfig(jobName, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      } catch (IOException e) {
+        LOG.error("Error serializing " + commandConfig, e);
+      }
     }
     return builder;
   }
 
-  public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) {
-    return generateRepeatedTaskWorkflowBuilder(workflowName, DEFAULT_TASK_CONFIG);
-  }
-
-  public static Workflow.Builder generateRepeatedTaskWorkflowBuilder(String workflowName,
-      Map<String, String> config) {
+  public static Workflow.Builder generateDefaultRepeatedJobWorkflowBuilder(String workflowName) {
     Workflow.Builder builder = new Workflow.Builder(workflowName);
-    builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2);
+    builder.addParentChildDependency(JOB_NAME_1, JOB_NAME_2);
 
-    for (String key : config.keySet()) {
-      builder.addConfig(TASK_NAME_1, key, config.get(key));
-      builder.addConfig(TASK_NAME_2, key, config.get(key));
+    for (String key : DEFAULT_JOB_CONFIG.keySet()) {
+      builder.addConfig(JOB_NAME_1, key, DEFAULT_JOB_CONFIG.get(key));
+      builder.addConfig(JOB_NAME_2, key, DEFAULT_JOB_CONFIG.get(key));
+    }
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
+      builder.addConfig(JOB_NAME_1, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      builder.addConfig(JOB_NAME_2, JobConfig.JOB_CONFIG_MAP, serializedMap);
+    } catch (IOException e) {
+      LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
     }
-
     return builder;
   }
 }
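
For context on how the reworked generator is consumed, here is a minimal usage
sketch. The class name, job name, and the 5000 ms timeout are illustrative
assumptions; only the generator methods and the DEFAULT_JOB_CONFIG constant
come from the code above.

package org.apache.helix.integration.task;

import java.util.Map;
import java.util.TreeMap;

import org.apache.helix.task.Workflow;

public class WorkflowGeneratorExample {
  public static Workflow buildCustomWorkflow() {
    // Override the default 2000 ms command timeout (illustrative value).
    Map<String, String> commandConfig = new TreeMap<String, String>();
    commandConfig.put("Timeout", String.valueOf(5000));

    // Reuse the default job-level config; the builder serializes the command
    // config to JSON and stores it under JobConfig.JOB_CONFIG_MAP.
    Workflow.Builder builder = WorkflowGenerator.generateSingleJobWorkflowBuilder(
        "SomeJob1", commandConfig, WorkflowGenerator.DEFAULT_JOB_CONFIG);
    return builder.build();
  }
}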

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
deleted file mode 100644
index 437880e..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
+++ /dev/null
@@ -1,247 +0,0 @@
-package org.apache.helix.provisioning.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.HelixConnection;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixRole;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.Id;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState.IdealStateProperty;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Maps;
-
-public class TaskManager {
-  private static final Logger LOG = Logger.getLogger(TaskManager.class);
-
-  private final ClusterId _clusterId;
-  private final HelixConnection _connection;
-  private final HelixManager _manager;
-  private final TaskDriver _driver;
-
-  public TaskManager(final ClusterId clusterId, final HelixConnection connection) {
-    HelixRole dummyRole = new HelixRole() {
-      @Override
-      public HelixConnection getConnection() {
-        return connection;
-      }
-
-      @Override
-      public ClusterId getClusterId() {
-        return clusterId;
-      }
-
-      @Override
-      public Id getId() {
-        return clusterId;
-      }
-
-      @Override
-      public InstanceType getType() {
-        return InstanceType.ADMINISTRATOR;
-      }
-
-      @Override
-      public ClusterMessagingService getMessagingService() {
-        return null;
-      }
-    };
-    _manager = new HelixConnectionAdaptor(dummyRole);
-    _driver = new TaskDriver(_manager);
-    _clusterId = clusterId;
-    _connection = connection;
-  }
-
-  public boolean createTaskQueue(String queueName, boolean isParallel) {
-    Workflow.Builder builder = new Workflow.Builder(queueName);
-    builder.addConfig(queueName, TaskConfig.COMMAND, queueName);
-    builder.addConfig(queueName, TaskConfig.TARGET_PARTITIONS, "");
-    builder.addConfig(queueName, TaskConfig.COMMAND_CONFIG, "");
-    builder.addConfig(queueName, TaskConfig.LONG_LIVED + "", String.valueOf(true));
-    if (isParallel) {
-      builder.addConfig(queueName, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
-          String.valueOf(Integer.MAX_VALUE));
-    }
-    Workflow workflow = builder.build();
-    try {
-      _driver.start(workflow);
-    } catch (Exception e) {
-      LOG.error("Failed to start queue " + queueName, e);
-      return false;
-    }
-    return true;
-  }
-
-  public void addTaskToQueue(final String taskName, final String queueName) {
-    // Update the resource config with the new partition count
-    HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    final ResourceId resourceId = resourceId(queueName);
-    final int[] numPartitions = {
-      0
-    };
-    DataUpdater<ZNRecord> dataUpdater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        // Update the partition integers to add one to the end, and have that integer map to the
-        // task name
-        String current = currentData.getSimpleField(TaskConfig.TARGET_PARTITIONS);
-        int currentId = 0;
-        if (current == null || current.isEmpty()) {
-          currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, String.valueOf(currentId));
-        } else {
-          String[] parts = current.split(",");
-          currentId = parts.length;
-          numPartitions[0] = currentId + 1;
-          currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, current + "," + currentId);
-        }
-        Map<String, String> partitionMap = currentData.getMapField(TaskConfig.TASK_NAME_MAP);
-        if (partitionMap == null) {
-          partitionMap = Maps.newHashMap();
-          currentData.setMapField(TaskConfig.TASK_NAME_MAP, partitionMap);
-        }
-        partitionMap.put(resourceId.toString() + '_' + currentId, taskName);
-        return currentData;
-      }
-    };
-    String configPath = keyBuilder.resourceConfig(resourceId.toString()).getPath();
-    List<DataUpdater<ZNRecord>> dataUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
-    dataUpdaters.add(dataUpdater);
-    accessor.updateChildren(Arrays.asList(configPath), dataUpdaters, AccessOption.PERSISTENT);
-
-    // Update the ideal state with the proper partition count
-    DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        currentData.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
-            String.valueOf(numPartitions[0]));
-        return currentData;
-      }
-    };
-    String idealStatePath = keyBuilder.idealStates(queueName + "_" + queueName).getPath();
-    dataUpdaters.clear();
-    dataUpdaters.add(idealStateUpdater);
-    accessor.updateChildren(Arrays.asList(idealStatePath), dataUpdaters, AccessOption.PERSISTENT);
-  }
-
-  public void cancelTask(String queueName, String taskName) {
-    // Get the mapped task name
-    final ResourceId resourceId = resourceId(queueName);
-    HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    ResourceConfiguration resourceConfig =
-        accessor.getProperty(keyBuilder.resourceConfig(resourceId.stringify()));
-    if (resourceConfig == null) {
-      LOG.error("Queue " + queueName + " does not exist!");
-      return;
-    }
-    Map<String, String> taskMap = resourceConfig.getRecord().getMapField(TaskConfig.TASK_NAME_MAP);
-    if (taskMap == null) {
-      LOG.error("Task " + taskName + " in queue " + queueName + " does not exist!");
-      return;
-    }
-    String partitionName = null;
-    for (Map.Entry<String, String> e : taskMap.entrySet()) {
-      String possiblePartition = e.getKey();
-      String possibleTask = e.getValue();
-      if (taskName.equals(possibleTask)) {
-        partitionName = possiblePartition;
-        break;
-      }
-    }
-    if (partitionName == null) {
-      LOG.error("Task " + taskName + " in queue " + queueName + " does not exist!");
-      return;
-    }
-
-    // Now search the external view for who is running the task
-    ExternalView externalView =
-        accessor.getProperty(keyBuilder.externalView(resourceId.toString()));
-    if (externalView == null) {
-      LOG.error("Queue " + queueName + " was never started!");
-      return;
-    }
-    PartitionId partitionId = PartitionId.from(partitionName);
-    Map<ParticipantId, State> stateMap = externalView.getStateMap(partitionId);
-    if (stateMap == null || stateMap.isEmpty()) {
-      LOG.warn("Task " + taskName + " in queue " + queueName + " is not currently running");
-      return;
-    }
-    ParticipantId targetParticipant = null;
-    for (ParticipantId participantId : stateMap.keySet()) {
-      targetParticipant = participantId;
-    }
-    if (targetParticipant == null) {
-      LOG.warn("Task " + taskName + " in queue " + queueName + " is not currently running");
-      return;
-    }
-
-    // Send a request to stop to the appropriate live instance
-    LiveInstance liveInstance =
-        accessor.getProperty(keyBuilder.liveInstance(targetParticipant.toString()));
-    if (liveInstance == null) {
-      LOG.error("Task " + taskName + " in queue " + queueName
-          + " is assigned to a non-running participant");
-      return;
-    }
-    SessionId sessionId = liveInstance.getTypedSessionId();
-    TaskUtil.setRequestedState(accessor, targetParticipant.toString(), sessionId.toString(),
-        resourceId.toString(), partitionId.toString(), TaskPartitionState.STOPPED);
-    LOG.info("Task" + taskName + " for queue " + queueName + " instructed to stop");
-  }
-
-  public void shutdownQueue(String queueName) {
-    // Check if tasks are complete, then set task and workflows to complete
-
-    // Otherwise, send a stop for everybody
-    _driver.stop(resourceId(queueName).toString());
-  }
-
-  private ResourceId resourceId(String queueName) {
-    return ResourceId.from(queueName + '_' + queueName);
-  }
-}
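
The heart of the deleted addTaskToQueue is a ZooKeeper read-modify-write
through DataUpdater. The sketch below shows that pattern in isolation; the
accessor and configPath parameters and the literal field name
"TargetPartitions" are assumptions for illustration (the deleted class
resolved them via TaskConfig.TARGET_PARTITIONS and PropertyKey.Builder).

import java.util.Arrays;
import java.util.Collections;

import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.ZNRecord;

public class QueueAppendSketch {
  // Atomically appends the next partition id to a comma-separated simple
  // field, the same read-modify-write the deleted addTaskToQueue performed.
  public static void appendPartition(HelixDataAccessor accessor, String configPath) {
    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
      @Override
      public ZNRecord update(ZNRecord currentData) {
        String current = currentData.getSimpleField("TargetPartitions");
        if (current == null || current.isEmpty()) {
          currentData.setSimpleField("TargetPartitions", "0");
        } else {
          int nextId = current.split(",").length;
          currentData.setSimpleField("TargetPartitions", current + "," + nextId);
        }
        return currentData;
      }
    };
    // PERSISTENT matches the ZNode type used for resource configs.
    accessor.updateChildren(Arrays.asList(configPath),
        Collections.<DataUpdater<ZNRecord>> singletonList(updater), AccessOption.PERSISTENT);
  }
}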

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
deleted file mode 100644
index 7d46cff..0000000
--- a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package org.apache.helix.provisioning.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixConnection;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.TestHelixConnection;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZkHelixConnection;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.testng.annotations.Test;
-
-public class TestTaskManager extends ZkUnitTestBase {
-  @Test
-  public void testBasic() throws Exception {
-    final int NUM_PARTICIPANTS = 3;
-    final int NUM_PARTITIONS = 1;
-    final int NUM_REPLICAS = 1;
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-
-    // Set up cluster
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestService", // resource name prefix
-        1, // resources
-        NUM_PARTITIONS, // partitions per resource
-        NUM_PARTICIPANTS, // number of nodes
-        NUM_REPLICAS, // replicas
-        "StatelessService", RebalanceMode.FULL_AUTO, // just get everything up
-        true); // do rebalance
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("mytask1", new TaskFactory() {
-      @Override
-      public Task createNewTask(String config) {
-        return new MyTask(1);
-      }
-    });
-    taskFactoryReg.put("mytask2", new TaskFactory() {
-      @Override
-      public Task createNewTask(String config) {
-        return new MyTask(2);
-      }
-    });
-    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
-    for (int i = 0; i < participants.length; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
-      participants[i].getStateMachineEngine()
-          .registerStateModelFactory(StateModelDefId.from("StatelessService"),
-              new TestHelixConnection.MockStateModelFactory());
-      participants[i].getStateMachineEngine().registerStateModelFactory(
-          StateModelDefId.from("Task"), new TaskStateModelFactory(participants[i], taskFactoryReg));
-      participants[i].syncStart();
-    }
-
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_1");
-    controller.syncStart();
-
-    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
-    connection.connect();
-    ClusterId clusterId = ClusterId.from(clusterName);
-    TaskManager taskManager = new TaskManager(clusterId, connection);
-    taskManager.createTaskQueue("myqueue", true);
-    taskManager.addTaskToQueue("mytask1", "myqueue");
-    Thread.sleep(5000);
-    taskManager.addTaskToQueue("mytask2", "myqueue");
-    taskManager.cancelTask("myqueue", "mytask1");
-
-    controller.syncStop();
-    for (MockParticipantManager participant : participants) {
-      participant.syncStop();
-    }
-  }
-
-  public static class MyTask implements Task {
-    private final int _id;
-    private Thread _t;
-    private TaskResult.Status _status = null;
-
-    public MyTask(int id) {
-      _id = id;
-    }
-
-    @Override
-    public TaskResult run() {
-      _t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            Thread.sleep(60000);
-            _status = TaskResult.Status.COMPLETED;
-            System.err.println("task complete for " + _id);
-          } catch (InterruptedException e) {
-            _status = TaskResult.Status.CANCELED;
-            System.err.println("task canceled for " + _id);
-            interrupt();
-          }
-        }
-      };
-      _t.start();
-      try {
-        _t.join();
-      } catch (InterruptedException e) {
-        _status = TaskResult.Status.CANCELED;
-      }
-      return new TaskResult(_status, "");
-    }
-
-    @Override
-    public void cancel() {
-      if (_t != null && _t.isAlive()) {
-        _t.interrupt();
-      }
-    }
-  }
-}
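
The deleted MyTask ran its sleep on a second thread and joined on it. The same
cancellable behavior can be expressed more directly by sleeping on the calling
thread and using interruption for cancel(); a minimal sketch against the Task
interface, with an illustrative class name and the same 60 s delay as the
deleted test.

import org.apache.helix.task.Task;
import org.apache.helix.task.TaskResult;

public class InterruptibleSleepTask implements Task {
  private volatile Thread _runner;

  @Override
  public TaskResult run() {
    _runner = Thread.currentThread();
    try {
      // Simulate long-running work; mirrors the 60 s sleep in the deleted test.
      Thread.sleep(60000);
      return new TaskResult(TaskResult.Status.COMPLETED, "");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore interrupt status
      return new TaskResult(TaskResult.Status.CANCELED, "");
    }
  }

  @Override
  public void cancel() {
    Thread runner = _runner;
    if (runner != null) {
      runner.interrupt();
    }
  }
}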

