helix-commits mailing list archives

From kisho...@apache.org
Subject [1/3] helix git commit: [HELIX-601] Allow work flow to schedule dependency jobs in parallel
Date Tue, 28 Jul 2015 21:42:40 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x a23beb7cf -> 8006547ba


[HELIX-601] Allow work flow to schedule dependency jobs in parallel

Currently, Helix won't schedule dependent jobs in the same workflow in parallel. For example,
if Job2 depends on Job1, Job2 won't be scheduled until every partition of Job1 is completed.
As a result, if one participant is very slow, every dependent job ends up waiting on that
single participant.
Helix should be able to schedule multiple jobs concurrently, controlled by a parameter.
A.C.
1. Introduce a parallel-count parameter on workflows and job queues (usage sketch below).
2. Dependent jobs can be scheduled in parallel up to the parameter's value (it defaults to 1,
i.e. no parallelism).
3. If Job2 depends on Job1, Job1 is still scheduled before Job2.
4. No parallel jobs on the same instance: if an instance is running Job1, it won't run Job2
until Job1 is finished.
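
For illustration, a minimal usage sketch of the new knob, built only from the Builder and
TaskDriver calls added in this commit (queue name, job names, and the JobConfig builders are
placeholders; see TestTaskRebalancerParallel below for a complete example):

    JobQueue queue = new JobQueue.Builder("myQueue")
        .parallelJobs(2)   // allow up to 2 unfinished jobs in this queue to run concurrently
        .build();
    taskDriver.createQueue(queue);
    taskDriver.enqueueJob("myQueue", "job_1", jobConfigBuilder1);
    taskDriver.enqueueJob("myQueue", "job_2", jobConfigBuilder2); // may start while job_1 runs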


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/88192207
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/88192207
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/88192207

Branch: refs/heads/helix-0.6.x
Commit: 8819220738b18c54652e4b32b9677ea78d585da2
Parents: 589c96c
Author: Congrui Ji <cji@linkedin.com>
Authored: Fri Jun 19 11:51:19 2015 -0700
Committer: Congrui Ji <cji@linkedin.com>
Committed: Fri Jun 19 11:51:19 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/task/JobDag.java |  17 ++
 .../java/org/apache/helix/task/JobQueue.java    |   9 +-
 .../org/apache/helix/task/TaskRebalancer.java   |  52 ++++-
 .../org/apache/helix/task/WorkflowConfig.java   |  31 ++-
 .../task/TestTaskRebalancerParallel.java        | 195 +++++++++++++++++++
 .../apache/helix/integration/task/TestUtil.java |  83 ++++++++
 .../integration/task/WorkflowGenerator.java     |   2 +-
 7 files changed, 377 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/main/java/org/apache/helix/task/JobDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDag.java b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
index 3564f19..f708e91 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDag.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDag.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -124,6 +125,22 @@ public class JobDag {
     return _childrenToParents.get(node);
   }
 
+  public Set<String> getAncestors(String node) {
+    Set<String> ret = new TreeSet<String>();
+    Set<String> current = Collections.singleton(node);
+
+    while (!current.isEmpty()) {
+      Set<String> next = new TreeSet<String>();
+      for (String currentNode : current) {
+        next.addAll(getDirectParents(currentNode));
+      }
+      ret.addAll(next);
+      current = next;
+    }
+
+    return ret;
+  }
+
   public String toJson() throws Exception {
     return new ObjectMapper().writeValueAsString(this);
   }
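
The new getAncestors above is a level-by-level BFS over _childrenToParents. For reference, a
self-contained sketch of the same traversal over a plain map (illustrative only, not the Helix
class; the getOrDefault fallback for parentless nodes is an addition):

    import java.util.*;

    static Set<String> ancestors(Map<String, Set<String>> childrenToParents, String node) {
      Set<String> ret = new TreeSet<String>();
      Set<String> current = Collections.singleton(node);
      while (!current.isEmpty()) {
        Set<String> next = new TreeSet<String>();
        for (String n : current) {
          next.addAll(childrenToParents.getOrDefault(n, Collections.<String>emptySet()));
        }
        next.removeAll(ret); // skip nodes already collected when ancestors are shared
        ret.addAll(next);
        current = next;
      }
      // e.g. with job1 -> job2 -> job3: ancestors(dag, "job3") == {"job1", "job2"}
      return ret;
    }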

http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
index 39dc84c..bca5911 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -32,8 +32,8 @@ public class JobQueue extends WorkflowConfig {
   private final int _capacity;
 
   private JobQueue(String name, int capacity, WorkflowConfig config) {
-    super(config.getJobDag(), config.getTargetState(), config.getExpiry(), config.isTerminable(),
-        config.getScheduleConfig());
+    super(config.getJobDag(), config.getParallelJobs(), config.getTargetState(), config.getExpiry(),
+        config.isTerminable(), config.getScheduleConfig());
     _name = name;
     _capacity = capacity;
   }
@@ -72,6 +72,11 @@ public class JobQueue extends WorkflowConfig {
       _name = name;
     }
 
+    public Builder parallelJobs(int parallelJobs) {
+      _builder.setParallelJobs(parallelJobs);
+      return this;
+    }
+
     public Builder expiry(long expiry) {
       _builder.setExpiry(expiry);
       return this;

http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index fe3f496..963247d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -134,14 +134,22 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       workflowCtx.setStartTime(System.currentTimeMillis());
     }
 
-    // Check parent dependencies
-    for (String parent : workflowCfg.getJobDag().getDirectParents(resourceName)) {
-      if (workflowCtx.getJobState(parent) == null
-          || !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) {
-        return emptyAssignment(resourceName, currStateOutput);
+    // check ancestor job status
+    int unStartCount = 0;
+    int inCompleteCount = 0;
+    for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
+      TaskState jobState = workflowCtx.getJobState(ancestor);
+      if (jobState == null || jobState == TaskState.NOT_STARTED) {
+        ++unStartCount;
+      } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
+        ++inCompleteCount;
       }
     }
 
+    if (unStartCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
     // Clean up if workflow marked for deletion
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
@@ -219,6 +227,32 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return newAssignment;
   }
 
+  private Set<String> getWorkflowAssignedInstances(String currentJobName,
+      WorkflowConfig workflowCfg) {
+
+    Set<String> ret = new HashSet<String>();
+
+    for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
+      if (jobName.equals(currentJobName)) {
+        continue;
+      }
+
+      JobContext jobContext = TaskUtil.getJobContext(_manager, jobName);
+      if (jobContext == null) {
+        continue;
+      }
+      for (int partition : jobContext.getPartitionSet()) {
+        TaskPartitionState partitionState = jobContext.getPartitionState(partition);
+        if (partitionState == TaskPartitionState.INIT ||
+            partitionState == TaskPartitionState.RUNNING) {
+          ret.add(jobContext.getAssignedParticipant(partition));
+        }
+      }
+    }
+
+    return ret;
+  }
+
   private ResourceAssignment computeResourceMapping(String jobResource,
       WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
       Collection<String> liveInstances, CurrentStateOutput currStateOutput,
@@ -248,6 +282,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Keeps a mapping of (partition) -> (instance, state)
     Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
 
+    Set<String> excludedInstances = getWorkflowAssignedInstances(jobResource, workflowConfig);
+
     // Process all the current assignments of tasks.
     Set<Integer> allPartitions =
         getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
@@ -255,6 +291,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
     long currentTime = System.currentTimeMillis();
     for (String instance : taskAssignments.keySet()) {
+      if (excludedInstances.contains(instance)) {
+        continue;
+      }
+
       Set<Integer> pSet = taskAssignments.get(instance);
       // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
       // TASK_ERROR, ERROR.
@@ -430,7 +470,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
               workflowConfig, workflowCtx, allPartitions, cache);
       for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
         String instance = entry.getKey();
-        if (!tgtPartitionAssignments.containsKey(instance)) {
+        if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
           continue;
         }
         // Contains the set of task partitions currently assigned to the instance.
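
Restating the checks added in this file: a job becomes assignable only once every ancestor has
at least started and fewer than getParallelJobs() ancestors remain unfinished, and instances
already running another job's tasks are excluded. A standalone sketch of the scheduling
predicate (a hypothetical helper that mirrors, but is not part of, TaskRebalancer):

    import org.apache.helix.task.TaskState;

    static boolean canSchedule(Iterable<TaskState> ancestorStates, int parallelJobs) {
      int unStarted = 0;   // ancestors with no recorded state yet
      int unfinished = 0;  // ancestors started but not yet completed
      for (TaskState s : ancestorStates) {
        if (s == null || s == TaskState.NOT_STARTED) {
          ++unStarted;
        } else if (s == TaskState.IN_PROGRESS || s == TaskState.STOPPED) {
          ++unfinished;
        }
      }
      // the inverse of the emptyAssignment condition above
      return unStarted == 0 && unfinished < parallelJobs;
    }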

http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 06d92fc..f15f235 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 public class WorkflowConfig {
   /* Config fields */
   public static final String DAG = "Dag";
+  public static final String PARALLEL_JOBS = "ParallelJobs";
   public static final String TARGET_STATE = "TargetState";
   public static final String EXPIRY = "Expiry";
   public static final String START_TIME = "StartTime";
@@ -44,14 +45,19 @@ public class WorkflowConfig {
 
   /* Member variables */
   private final JobDag _jobDag;
+  // _parallelJobs deliberately relaxes strict job dependencies:
+  // e.g. if job1 -> job2 but _parallelJobs == 2,
+  // then job1 and job2 could be scheduled at the same time
+  private final int _parallelJobs;
   private final TargetState _targetState;
   private final long _expiry;
   private final boolean _terminable;
   private final ScheduleConfig _scheduleConfig;
 
-  protected WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry, boolean terminable,
-      ScheduleConfig scheduleConfig) {
+  protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
+      boolean terminable, ScheduleConfig scheduleConfig) {
     _jobDag = jobDag;
+    _parallelJobs = parallelJobs;
     _targetState = targetState;
     _expiry = expiry;
     _terminable = terminable;
@@ -62,6 +68,10 @@ public class WorkflowConfig {
     return _jobDag;
   }
 
+  public int getParallelJobs() {
+    return _parallelJobs;
+  }
+
   public TargetState getTargetState() {
     return _targetState;
   }
@@ -93,6 +103,7 @@ public class WorkflowConfig {
   public Map<String, String> getResourceConfigMap() throws Exception {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+    cfgMap.put(WorkflowConfig.PARALLEL_JOBS, String.valueOf(getParallelJobs()));
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
     cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
     cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable()));
@@ -116,6 +127,7 @@ public class WorkflowConfig {
 
   public static class Builder {
     private JobDag _taskDag = JobDag.EMPTY_DAG;
+    private int _parallelJobs = 1;
     private TargetState _targetState = TargetState.START;
     private long _expiry = DEFAULT_EXPIRY;
     private boolean _isTerminable = true;
@@ -124,7 +136,7 @@ public class WorkflowConfig {
     public WorkflowConfig build() {
       validate();
 
-      return new WorkflowConfig(_taskDag, _targetState, _expiry, _isTerminable, _scheduleConfig);
+      return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _isTerminable, _scheduleConfig);
     }
 
     public Builder setJobDag(JobDag v) {
@@ -132,6 +144,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setParallelJobs(int parallelJobs) {
+      _parallelJobs = parallelJobs;
+      return this;
+    }
+
     public Builder setExpiry(long v, TimeUnit unit) {
       _expiry = unit.toMillis(v);
       return this;
@@ -171,6 +188,14 @@ public class WorkflowConfig {
       if (cfg.containsKey(TERMINABLE)) {
         b.setTerminable(Boolean.parseBoolean(cfg.get(TERMINABLE)));
       }
+      if (cfg.containsKey(PARALLEL_JOBS)) {
+        String value = cfg.get(PARALLEL_JOBS);
+        if (value == null) {
+          b.setParallelJobs(1);
+        } else {
+          b.setParallelJobs(Integer.parseInt(value));
+        }
+      }
 
       // Parse schedule-specific configs, if they exist
       ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg);
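
The new field also round-trips through the resource config map. A quick sketch, assuming the
hunk above belongs to the Builder.fromMap factory (the value 2 is arbitrary):

    WorkflowConfig original = new WorkflowConfig.Builder().setParallelJobs(2).build();
    Map<String, String> cfg = original.getResourceConfigMap(); // contains "ParallelJobs" -> "2"
    WorkflowConfig restored = WorkflowConfig.Builder.fromMap(cfg).build();
    assert restored.getParallelJobs() == 2;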

http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
new file mode 100644
index 0000000..368ef9f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -0,0 +1,195 @@
+package org.apache.helix.integration.task;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+
+public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
+  private static final int n = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final List<String> testDbNames =
+      Arrays.asList("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_4");
+
+
+  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
+  private ClusterControllerManager _controller;
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < n; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    for (String testDbName : testDbNames) {
+      setupTool.addResourceToCluster(CLUSTER_NAME, testDbName, NUM_PARTITIONS,
+          MASTER_SLAVE_STATE_MODEL);
+      setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDbName, NUM_REPLICAS);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      final long delay = (i + 1) * 1000L;
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+      taskFactoryReg.put("Reindex", new TaskFactory() {
+        @Override
+        public Task createNewTask(TaskCallbackContext context) {
+          return new ReindexTask(delay);
+        }
+      });
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+            ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    // _controller = null;
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+      // _participants[i] = null;
+    }
+
+    _manager.disconnect();
+  }
+
+  @Test
+  public void test() throws Exception {
+    final int PARALLEL_COUNT = 2;
+
+    String queueName = TestHelper.getTestMethodName();
+
+    JobQueue queue = new JobQueue.Builder(queueName).parallelJobs(PARALLEL_COUNT).build();
+    _driver.createQueue(queue);
+
+    List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
+    for (String testDbName : testDbNames) {
+      jobConfigBuilders.add(
+          new JobConfig.Builder().setCommand("Reindex").setTargetResource(testDbName)
+              .setTargetPartitionStates(Collections.singleton("SLAVE")));
+    }
+
+    for (int i = 0; i < jobConfigBuilders.size(); ++i) {
+      _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i));
+    }
+
+    Assert.assertTrue(TestUtil.pollForWorkflowParallelState(_manager, queueName));
+  }
+
+  public static class ReindexTask implements Task {
+    private final long _delay;
+    private volatile boolean _canceled;
+
+    public ReindexTask(long delay) {
+      _delay = delay;
+    }
+
+    @Override
+    public TaskResult run() {
+      long expiry = System.currentTimeMillis() + _delay;
+      long timeLeft;
+      while (System.currentTimeMillis() < expiry) {
+        if (_canceled) {
+          timeLeft = expiry - System.currentTimeMillis();
+          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+              : timeLeft));
+        }
+        sleep(50);
+      }
+      timeLeft = expiry - System.currentTimeMillis();
+      return new TaskResult(TaskResult.Status.COMPLETED,
+          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+    }
+
+    @Override
+    public void cancel() {
+      _canceled = true;
+    }
+
+    private static void sleep(long d) {
+      try {
+        Thread.sleep(d);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 43c5783..5c081a4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -19,8 +19,15 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.helix.HelixManager;
 import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
@@ -112,4 +119,80 @@ public class TestUtil {
     Assert.assertNotNull(ctx);
     return ctx;
   }
+
+  // 1. Different jobs in the same workflow can be RUNNING at the same time
+  // 2. No two jobs in the same workflow are RUNNING on the same instance
+  public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
+      throws InterruptedException {
+
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
+    Assert.assertNotNull(workflowConfig);
+
+    WorkflowContext workflowContext = null;
+    while (workflowContext == null) {
+      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      Thread.sleep(100);
+    }
+
+    int maxRunningCount = 0;
+    boolean finished = false;
+
+    while (!finished) {
+      finished = true;
+      int runningCount = 0;
+
+      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+        TaskState jobState = workflowContext.getJobState(jobName);
+        if (jobState == TaskState.IN_PROGRESS) {
+          ++runningCount;
+          finished = false;
+        }
+      }
+
+      if (runningCount > maxRunningCount ) {
+        maxRunningCount = runningCount;
+      }
+
+      List<JobContext> jobContextList = new ArrayList<JobContext>();
+      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+        JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
+        if (jobContext != null) {
+          jobContextList.add(TaskUtil.getJobContext(manager, jobName));
+        }
+      }
+
+      Set<String> instances = new HashSet<String>();
+      for (JobContext jobContext : jobContextList) {
+        for (int partition : jobContext.getPartitionSet()) {
+          String instance = jobContext.getAssignedParticipant(partition);
+          TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
+
+          if (instance == null) {
+            continue;
+          }
+          if (taskPartitionState != TaskPartitionState.INIT &&
+              taskPartitionState != TaskPartitionState.RUNNING) {
+            continue;
+          }
+          if (instances.contains(instance)) {
+            return false;
+          }
+
+          TaskPartitionState state = jobContext.getPartitionState(partition);
+          if (state != TaskPartitionState.COMPLETED) {
+            instances.add(instance);
+          }
+        }
+      }
+
+      Thread.sleep(100);
+    }
+
+    return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
+  }
+
+  public static boolean pollForParticipantParallelState() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/88192207/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 318673f..a414f5c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -62,7 +62,7 @@ public class WorkflowGenerator {
       throw new IllegalArgumentException(
           "Additional configs should have even number of keys and values");
     }
-    Workflow.Builder bldr = generateDefaultSingleJobWorkflowBuilder(jobName);
+    Workflow.Builder bldr = generateSingleJobWorkflowBuilder(jobName, commandConfig, DEFAULT_JOB_CONFIG);
     for (int i = 0; i < cfgs.length; i += 2) {
       bldr.addConfig(jobName, cfgs[i], cfgs[i + 1]);
     }

