helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [31/33] helix git commit: Add pollForJobState and pollForWorkflowState function in TaskDriver
Date Wed, 17 Aug 2016 04:27:27 GMT
Add pollForJobState and pollForWorkflowState function in TaskDriver

1. Add pollForJobState and pollForWorkflowState functions in TaskDriver
2. Add unit tests for poll completed states for those tasks.
3. Refactor all tests using new functions.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/183a26ae
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/183a26ae
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/183a26ae

Branch: refs/heads/helix-0.6.x
Commit: 183a26ae7857a7c35985b14df9742ba3f4a06566
Parents: 9a30df4
Author: Junkai Xue <jxue@linkedin.com>
Authored: Tue May 10 11:44:25 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Tue Jul 5 16:21:13 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 123 +++++++++++++++++++
 .../helix/integration/task/TaskTestUtil.java    |  72 +----------
 .../task/TestDisableJobExternalView.java        |   2 +-
 .../helix/integration/task/TestGenericJobs.java |   2 +-
 .../task/TestIndependentTaskRebalancer.java     |  22 ++--
 .../task/TestJobAndWorkflowType.java            |   2 +-
 .../task/TestJobFailureDependence.java          |  18 +--
 .../integration/task/TestRecurringJobQueue.java |  30 ++---
 .../integration/task/TestRetrieveWorkflows.java |   2 +-
 .../task/TestRunJobsWithMissingTarget.java      |  12 +-
 .../task/TestTaskConditionalRetry.java          |   2 +-
 .../task/TestTaskErrorReporting.java            |   2 +-
 .../integration/task/TestTaskRebalancer.java    |  18 +--
 .../task/TestTaskRebalancerFailover.java        |   6 +-
 .../task/TestTaskRebalancerRetryLimit.java      |   2 +-
 .../task/TestTaskRebalancerStopResume.java      |  61 +++++----
 .../integration/task/TestTaskRetryDelay.java    |   4 +-
 .../task/TestTaskWithInstanceDisabled.java      |   2 +-
 .../integration/task/TestUpdateWorkflow.java    |   8 +-
 .../task/TestWorkflowAndJobPoll.java            |  62 ++++++++++
 .../task/TestWorkflowJobDependency.java         |   2 +-
 21 files changed, 283 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index b3a0364..e62d15c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +85,10 @@ public class TaskDriver {
   /** Field for specifying a workflow file when starting a job */
   private static final String WORKFLOW_FILE_OPTION = "file";
 
+  /** Default time out for monitoring workflow or job state */
+  private final static int _defaultTimeout = 2 * 60 * 1000; /* 2 mins */
+
+
   private final HelixDataAccessor _accessor;
   private final ConfigAccessor _cfgAccessor;
   private final HelixPropertyStore<ZNRecord> _propertyStore;
@@ -824,6 +829,124 @@ public class TaskDriver {
     }
   }
 
+  /**
+   * Blocks until the workflow reaches one of the states specified in the arguments,
+   * or until the timeout elapses. On timeout, a HelixException is thrown;
+   * otherwise, the current workflow state is returned.
+   *
+   * @param workflowName The workflow to be monitored
+   * @param timeout The timeout, in milliseconds
+   * @param targetStates Specified states that user would like to stop monitoring
+   * @return A TaskState, which is current workflow state
+   * @throws InterruptedException
+   */
+  public TaskState pollForWorkflowState(String workflowName, long timeout,
+      TaskState... targetStates) throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
+
+    long timeToSleep = timeout > 100L ? 100L : timeout;
+    do {
+      Thread.sleep(timeToSleep);
+      ctx = getWorkflowContext(workflowName);
+    } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
+        .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + timeout);
+
+    if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
+      throw new HelixException(String
+          .format("Workflow \"%s\" context is empty or not in states: \"%s\"", workflowName,
+              targetStates));
+    }
+
+    return ctx.getWorkflowState();
+  }
+
+  /**
+   * Wrapper that monitors a workflow using the default timeout of 2 MINUTES.
+   * If the timeout elapses, a HelixException is thrown; otherwise, the current
+   * workflow state is returned.
+   *
+   * @param workflowName The workflow to be monitored
+   * @param targetStates Specified states that user would like to stop monitoring
+   * @return A TaskState, which is current workflow state
+   * @throws InterruptedException
+   */
+  public TaskState pollForWorkflowState(String workflowName, TaskState... targetStates)
+      throws InterruptedException {
+    return pollForWorkflowState(workflowName, _defaultTimeout, targetStates);
+  }
+
+  /**
+   * Blocks until the specified job reaches one of the states given in the arguments,
+   * or until the timeout elapses. On timeout, a HelixException is thrown;
+   * otherwise, the current job state is returned.
+   *
+   * @param workflowName The workflow that contains the job to monitor
+   * @param jobName The specified job to monitor
+   * @param timeout The timeout, in milliseconds
+   * @param states Specified states that user would like to stop monitoring
+   * @return A TaskState, which is current job state
+   * @throws InterruptedException
+   */
+  public TaskState pollForJobState(String workflowName, String jobName, long timeout,
+      TaskState... states) throws InterruptedException {
+    // Get workflow config
+    WorkflowConfig workflowConfig = getWorkflowConfig(workflowName);
+
+    if (workflowConfig == null) {
+      throw new HelixException(String.format("Workflow \"%s\" does not exists!", workflowName));
+    }
+
+    long timeToSleep = timeout > 100L ? 100L : timeout;
+
+    WorkflowContext ctx;
+    if (workflowConfig.isRecurring()) {
+      // if it's recurring, need to reconstruct workflow and job name
+      do {
+        Thread.sleep(timeToSleep);
+        ctx = getWorkflowContext(workflowName);
+      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
+
+      jobName = jobName.substring(workflowName.length() + 1);
+      workflowName = ctx.getLastScheduledSingleWorkflow();
+      jobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
+    }
+
+    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(states));
+    // Wait for state
+    long st = System.currentTimeMillis();
+    do {
+      Thread.sleep(timeToSleep);
+      ctx = getWorkflowContext(workflowName);
+    } while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates
+        .contains(ctx.getJobState(jobName))) && System.currentTimeMillis() < st + timeout);
+
+    if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
+      throw new HelixException(
+          String.format("Job \"%s\" context is null or not in states: \"%s\"", jobName, states));
+    }
+
+    return ctx.getJobState(jobName);
+  }
+
+  /**
+   * This is a wrapper function for monitoring job state with default timeout 2 MINUTES.
+   * If timeout happens, then it will throw a HelixException, Otherwise, it will return
+   * current job state
+   *
+   * @param workflowName The workflow that contains the job to monitor
+   * @param jobName The specified job to monitor
+   * @param states Specified states that user would like to stop monitoring
+   * @return A TaskState, which is current job state
+   * @throws InterruptedException
+   */
+  public TaskState pollForJobState(String workflowName, String jobName, TaskState... states)
+      throws InterruptedException {
+    return pollForJobState(workflowName, jobName, _defaultTimeout, states);
+  }
+
   /** Constructs options set for all basic control messages */
   private static Options constructOptions() {
     Options options = new Options();

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 3e5385c..2f8fa60 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -18,7 +18,7 @@ package org.apache.helix.integration.task;
  * specific language governing permissions and limitations
  * under the License.
  */
-import java.util.Arrays;
+
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
@@ -45,76 +45,6 @@ import org.testng.Assert;
 public class TaskTestUtil {
   private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
 
-  /**
-   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
-   * reached.
-   * If the task has not reached target state by then, an error is thrown
-   *
-   * @param workflowResource Resource to poll for completeness
-   * @throws InterruptedException
-   */
-  public static void pollForWorkflowState(TaskDriver driver, String workflowResource,
-      TaskState... targetStates) throws InterruptedException {
-    // Wait for completion.
-    long st = System.currentTimeMillis();
-    WorkflowContext ctx;
-    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
-    do {
-      Thread.sleep(100);
-      ctx = driver.getWorkflowContext(workflowResource);
-    } while ((ctx == null || ctx.getWorkflowState() == null || !allowedStates
-        .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + _default_timeout);
-
-    Assert.assertNotNull(ctx);
-    TaskState workflowState = ctx.getWorkflowState();
-    Assert.assertTrue(allowedStates.contains(workflowState),
-        "expect workflow states: " + allowedStates + " actual workflow state: " + workflowState);
-  }
-
-  /**
-   * poll for job until it is at either state in targetStates.
-   * @param driver
-   * @param workflowResource
-   * @param jobName
-   * @param targetStates
-   * @throws InterruptedException
-   */
-  public static void pollForJobState(TaskDriver driver, String workflowResource, String jobName,
-      TaskState... targetStates) throws InterruptedException {
-    // Get workflow config
-    WorkflowConfig wfCfg = driver.getWorkflowConfig(workflowResource);
-    Assert.assertNotNull(wfCfg);
-    WorkflowContext ctx;
-    if (wfCfg.isRecurring()) {
-      // if it's recurring, need to reconstruct workflow and job name
-      do {
-        Thread.sleep(100);
-        ctx = driver.getWorkflowContext(workflowResource);
-      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
-      Assert.assertNotNull(ctx);
-      Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
-      jobName = jobName.substring(workflowResource.length() + 1);
-      workflowResource = ctx.getLastScheduledSingleWorkflow();
-      jobName = String.format("%s_%s", workflowResource, jobName);
-    }
-
-    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
-    // Wait for state
-    long st = System.currentTimeMillis();
-    do {
-      Thread.sleep(100);
-      ctx = driver.getWorkflowContext(workflowResource);
-    }
-    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(
-        ctx.getJobState(jobName)))
-        && System.currentTimeMillis() < st + _default_timeout);
-    Assert.assertNotNull(ctx, "Empty job context");
-    TaskState jobState = ctx.getJobState(jobName);
-    Assert.assertTrue(allowedStates.contains(jobState),
-        "expect job " + jobName + " is in states: " + allowedStates + " actual job state: "
-            + jobState + " all other job states in the workflow: " + ctx.getJobStates().entrySet());
-  }
-
   public static void pollForEmptyJobState(final TaskDriver driver, final String workflowName,
       final String jobName) throws Exception {
     final String namespacedJobName = String.format("%s_%s", workflowName, jobName);

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
index 4563e70..061114f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDisableJobExternalView.java
@@ -72,7 +72,7 @@ public class TestDisableJobExternalView extends TaskTestBase {
 
     // ensure all jobs are completed
     String namedSpaceJob3 = String.format("%s_%s", queueName, "job3");
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob3, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namedSpaceJob3, TaskState.COMPLETED);
 
     Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
     String namedSpaceJob1 = String.format("%s_%s", queueName, "job1");

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
index 426bade..f376b85 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericJobs.java
@@ -65,7 +65,7 @@ public class TestGenericJobs extends TaskTestBase {
 
     String namedSpaceJob =
         String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namedSpaceJob, TaskState.COMPLETED);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 0e598c1..49b4bf4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -149,8 +149,9 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
+
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -176,8 +177,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -205,8 +206,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -233,8 +234,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -268,7 +269,8 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -300,7 +302,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure completion
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
 
     // Ensure a single retry happened
     JobContext jobCtx = _driver.getJobContext(jobName + "_" + jobName);

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
index 0b02085..6e74bcf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
@@ -48,7 +48,7 @@ public class TestJobAndWorkflowType extends TaskTestBase {
     // Start workflow
     _driver.start(builder.build());
 
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
     String fetchedJobType =
         _driver.getJobConfig(String.format("%s_%s", jobName, jobName)).getJobType();
     String fetchedWorkflowType =

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
index d4f6dbb..254a35d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
@@ -66,7 +66,7 @@ public class TestJobFailureDependence extends TaskTestBase {
     // all jobs after failed job should fail too.
     for (int i = 2; i < _numDbs; i++) {
       String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED);
+      _driver.pollForJobState(queueName, namedSpaceJob, TaskState.FAILED);
     }
   }
 
@@ -92,8 +92,8 @@ public class TestJobFailureDependence extends TaskTestBase {
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(2));
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
+    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
   }
 
   @Test
@@ -117,12 +117,12 @@ public class TestJobFailureDependence extends TaskTestBase {
     _driver.start(queueBuilder.build());
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(2));
     String namedSpaceJob2 = String.format("%s_%s", queueName, currentJobNames.get(2));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob2, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob2, TaskState.FAILED);
 
     // all jobs after failed job should complete.
     for (int i = 3; i < _numDbs; i++) {
       String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.COMPLETED);
+      _driver.pollForJobState(queueName, namedSpaceJob, TaskState.COMPLETED);
     }
   }
 
@@ -148,10 +148,10 @@ public class TestJobFailureDependence extends TaskTestBase {
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(1));
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
     String lastJob =
         String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
-    TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, lastJob, TaskState.COMPLETED);
 
     _driver.flushQueue(queueName);
 
@@ -174,8 +174,8 @@ public class TestJobFailureDependence extends TaskTestBase {
     _driver.resume(queueName);
 
     namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
+    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 65ec458..4d4d96a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -71,8 +71,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // ensure job 1 is started before stop it
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
-    TaskTestUtil
-        .pollForJobState(_driver, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
+    _driver.pollForJobState(scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
     _driver.delete(queueName);
@@ -100,13 +99,11 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // ensure jobs are started and completed
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
-    TaskTestUtil
-        .pollForJobState(_driver, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
 
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1));
-    TaskTestUtil
-        .pollForJobState(_driver, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
   }
 
   @Test
@@ -140,15 +137,14 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TaskTestUtil
-        .pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
-            TaskState.COMPLETED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+        TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -159,16 +155,16 @@ public class TestRecurringJobQueue extends TaskTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
+    _driver.pollForJobState(scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS,
         TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
+    _driver.pollForJobState(scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
@@ -189,7 +185,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      _driver.pollForJobState(scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = _driver.getJobContext(namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -238,7 +234,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -286,7 +282,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     // ensure current schedule is started
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.COMPLETED);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestRetrieveWorkflows.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRetrieveWorkflows.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRetrieveWorkflows.java
index ddd6c85..45fb278 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRetrieveWorkflows.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRetrieveWorkflows.java
@@ -43,7 +43,7 @@ public class TestRetrieveWorkflows extends TaskTestBase {
     }
 
     for (Workflow workflow : workflowList) {
-      TaskTestUtil.pollForWorkflowState(_driver, workflow.getName(), TaskState.COMPLETED);
+      _driver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED);
     }
 
     Map<String, WorkflowConfig> workflowConfigMap = _driver.getWorkflows();

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 5a07942..879bfea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -64,8 +64,8 @@ public class TestRunJobsWithMissingTarget extends TaskTestBase {
     _driver.start(queueBuilder.build());
 
     String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(1));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob, TaskState.FAILED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob, TaskState.FAILED);
+    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
 
     _driver.delete(queueName);
   }
@@ -91,10 +91,10 @@ public class TestRunJobsWithMissingTarget extends TaskTestBase {
     _driver.start(queueBuilder.build());
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
     String lastJob =
         String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
-    TaskTestUtil.pollForJobState(_driver, queueName, lastJob, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, lastJob, TaskState.COMPLETED);
 
     _driver.delete(queueName);
   }
@@ -121,8 +121,8 @@ public class TestRunJobsWithMissingTarget extends TaskTestBase {
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(0));
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJob1, TaskState.FAILED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.FAILED);
+    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
+    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
 
     _driver.delete(queueName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
index 5fa370d..b742eae 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
@@ -79,7 +79,7 @@ public class TestTaskConditionalRetry extends TaskTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < num_tasks; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
index 906dcff..80176da 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskErrorReporting.java
@@ -90,7 +90,7 @@ public class TestTaskErrorReporting extends TaskTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < num_tasks; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f5a3441..4725c20 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -72,7 +72,7 @@ public class TestTaskRebalancer extends TaskTestBase {
             .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -86,7 +86,7 @@ public class TestTaskRebalancer extends TaskTestBase {
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
     // Wait for job to finish and expire
-    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
     Thread.sleep(expiry + 100);
 
     // Ensure workflow config and context were cleaned up by now
@@ -114,7 +114,7 @@ public class TestTaskRebalancer extends TaskTestBase {
     _driver.start(flow);
 
     // Wait for job completion
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
@@ -141,7 +141,7 @@ public class TestTaskRebalancer extends TaskTestBase {
     _driver.start(flow);
 
     // wait for job completeness/timeout
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
     String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
@@ -166,11 +166,11 @@ public class TestTaskRebalancer extends TaskTestBase {
     new TaskDriver(_manager).start(flow);
 
     // Wait until the workflow completes
-    TaskTestUtil.pollForWorkflowState(_driver, workflowName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
     for (String task : flow.getJobConfigs().keySet()) {
-      TaskTestUtil.pollForJobState(_driver, workflowName, task, TaskState.COMPLETED);
+      _driver.pollForJobState(workflowName, task, TaskState.COMPLETED);
     }
   }
 
@@ -186,7 +186,7 @@ public class TestTaskRebalancer extends TaskTestBase {
     _driver.start(flow);
 
     // Wait until the job reports failure.
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.FAILED);
+    _driver.pollForWorkflowState(jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
@@ -224,8 +224,8 @@ public class TestTaskRebalancer extends TaskTestBase {
     // Ensure successful completion
     String namespacedJob1 = queueName + "_masterJob";
     String namespacedJob2 = queueName + "_slaveJob";
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob1, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob2, TaskState.COMPLETED);
     JobContext masterJobContext = _driver.getJobContext(namespacedJob1);
     JobContext slaveJobContext = _driver.getJobContext(namespacedJob2);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index 9d98ba9..0f87e1d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -60,7 +60,7 @@ public class TestTaskRebalancerFailover extends TaskTestBase {
 
     // check all tasks completed on MASTER
     String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob1, TaskState.COMPLETED);
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -87,9 +87,9 @@ public class TestTaskRebalancerFailover extends TaskTestBase {
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job);
 
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+    _driver.pollForJobState(queueName, namespacedJob2, TaskState.IN_PROGRESS);
     _participants[0].syncStop();
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob2, TaskState.COMPLETED);
 
     // tasks previously assigned to localhost_12918 should be re-scheduled on new master
     ctx = _driver.getJobContext(namespacedJob2);

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index d677920..7f3d9d3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -53,7 +53,7 @@ public class TestTaskRebalancerRetryLimit extends TaskTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < _numParitions; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index e92a129..8e1f47d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -66,15 +66,15 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(JOB_RESOURCE, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing job");
     _driver.stop(JOB_RESOURCE);
-    TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.STOPPED);
+    _driver.pollForWorkflowState(JOB_RESOURCE, TaskState.STOPPED);
 
     LOG.info("Resuming job");
     _driver.resume(JOB_RESOURCE);
-    TaskTestUtil.pollForWorkflowState(_driver, JOB_RESOURCE, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
@@ -84,15 +84,15 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
-    TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(workflow, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing workflow");
     _driver.stop(workflow);
-    TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.STOPPED);
+    _driver.pollForWorkflowState(workflow, TaskState.STOPPED);
 
     LOG.info("Resuming workflow");
     _driver.resume(workflow);
-    TaskTestUtil.pollForWorkflowState(_driver, workflow, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(workflow, TaskState.COMPLETED);
   }
 
   @Test
@@ -122,13 +122,13 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+    _driver.pollForJobState(queueName, namespacedJob1, TaskState.IN_PROGRESS);
 
     // stop job1
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED);
+    _driver.pollForJobState(queueName, namespacedJob1, TaskState.STOPPED);
+    _driver.pollForWorkflowState(queueName, TaskState.STOPPED);
 
     // Ensure job2 is not started
     TimeUnit.MILLISECONDS.sleep(200);
@@ -139,8 +139,8 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     _driver.resume(queueName);
 
     // Ensure successful completion
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob1, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob2, TaskState.COMPLETED);
     JobContext masterJobContext = _driver.getJobContext(namespacedJob1);
     JobContext slaveJobContext = _driver.getJobContext(namespacedJob2);
 
@@ -188,13 +188,13 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    _driver.pollForJobState(queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED);
+    _driver.pollForJobState(queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+    _driver.pollForWorkflowState(queueName, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -204,15 +204,15 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TaskTestUtil
-        .pollForJobState(_driver, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
+    _driver.pollForJobState(queueName, String.format("%s_%s", queueName, currentJobNames.get(1)),
+        TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil
-        .pollForJobState(_driver, queueName, String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, queueName, TaskState.STOPPED);
+    _driver.pollForJobState(queueName, String.format("%s_%s", queueName, currentJobNames.get(1)),
+        TaskState.STOPPED);
+    _driver.pollForWorkflowState(queueName, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
@@ -241,7 +241,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_driver, queueName, namedSpaceJobName, TaskState.COMPLETED);
+      _driver.pollForJobState(queueName, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = _driver.getJobContext(namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -295,14 +295,13 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TaskTestUtil
-        .pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    _driver.pollForJobState(scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -313,15 +312,15 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
+    _driver.pollForJobState(scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue,
+    _driver.pollForJobState(scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.STOPPED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
@@ -342,7 +341,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      _driver.pollForJobState(scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = _driver.getJobContext(namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -390,7 +389,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TaskTestUtil.pollForJobState(_driver, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    _driver.pollForJobState(scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -433,10 +432,10 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob1, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob1, TaskState.COMPLETED);
 
     String namespacedJob2 = String.format("%s_%s", queueName,  job2Name);
-    TaskTestUtil.pollForJobState(_driver, queueName, namespacedJob2, TaskState.COMPLETED);
+    _driver.pollForJobState(queueName, namespacedJob2, TaskState.COMPLETED);
 
     // Stop queue
     _driver.stop(queueName);

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
index 7d8ebff..47624e4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
@@ -49,7 +49,7 @@ public class TestTaskRetryDelay extends TaskTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     long startTime = _driver.getWorkflowContext(jobResource).getStartTime();
     long finishedTime = _driver.getWorkflowContext(jobResource).getFinishTime();
@@ -70,7 +70,7 @@ public class TestTaskRetryDelay extends TaskTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
 
     long startTime = _driver.getWorkflowContext(jobResource).getStartTime();
     long finishedTime = _driver.getWorkflowContext(jobResource).getFinishTime();

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
index 84e5168..919dc99 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
@@ -51,7 +51,7 @@ public class TestTaskWithInstanceDisabled extends TaskTestBase {
         WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
     _driver.start(flow);
 
-    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(jobResource, TaskState.COMPLETED);
     JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
     Assert.assertEquals(ctx.getAssignedParticipant(0), PARTICIPANT_PREFIX + "_" + (_startPort + 1));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
index b43c49e..73e6355 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUpdateWorkflow.java
@@ -63,12 +63,12 @@ public class TestUpdateWorkflow extends TaskTestBase {
 
     // ensure current schedule is started
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.IN_PROGRESS);
 
     _driver.updateWorkflow(queueName, configBuilder.build());
 
     // ensure current schedule is completed
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.COMPLETED);
 
     Thread.sleep(1000);
 
@@ -98,7 +98,7 @@ public class TestUpdateWorkflow extends TaskTestBase {
 
     // ensure current schedule is started
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.IN_PROGRESS);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
 
@@ -122,7 +122,7 @@ public class TestUpdateWorkflow extends TaskTestBase {
     _driver.resume(queueName);
 
     // ensure current schedule is completed
-    TaskTestUtil.pollForWorkflowState(_driver, scheduledQueue, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(scheduledQueue, TaskState.COMPLETED);
 
     Thread.sleep(1000);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
new file mode 100644
index 0000000..91b65a9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
@@ -0,0 +1,62 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWorkflowAndJobPoll extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numNodes = 1;
+    _numParitions = 1;
+    _numReplicas = 1;
+    super.beforeClass();
+  }
+
+  @Test public void testWorkflowPoll() throws InterruptedException {
+    String jobResource = TestHelper.getTestMethodName();
+    Workflow.Builder builder =
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(jobResource);
+    _driver.start(builder.build());
+
+    TaskState polledState =
+        _driver.pollForWorkflowState(jobResource, 4000L, TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(TaskState.COMPLETED, polledState);
+  }
+
+  @Test public void testJobPoll() throws InterruptedException {
+    String jobResource = TestHelper.getTestMethodName();
+    Workflow.Builder builder =
+        WorkflowGenerator.generateDefaultSingleJobWorkflowBuilder(jobResource);
+    _driver.start(builder.build());
+
+    TaskState polledState = _driver
+        .pollForJobState(jobResource, String.format("%s_%s", jobResource, jobResource), 4000L,
+            TaskState.COMPLETED, TaskState.FAILED);
+    Assert.assertEquals(TaskState.COMPLETED, polledState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/183a26ae/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
index 1ced40b..6cae0e5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
@@ -65,7 +65,7 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     _driver.start(workflow);
 
     // Wait until the workflow completes
-    TaskTestUtil.pollForWorkflowState(_driver, workflowName, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
     WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
     long startTime = workflowContext.getStartTime();
     long finishTime = workflowContext.getFinishTime();


Mime
View raw message