helix-commits mailing list archives

From l...@apache.org
Subject helix git commit: [HELIX-636] Add Java API and REST API for clean up JobQueue
Date Sun, 30 Oct 2016 20:40:40 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x bed7f5c4f -> 33dae525a


[HELIX-636] Add Java API and REST API for clean up JobQueue

Provide a Java API and a REST API in Helix to clean up jobs in a job queue that have reached a final state.
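A minimal usage sketch of the new Java API. The HelixManager "manager" and the queue name "myQueue" are placeholders; the manager is assumed to be already connected to the cluster.

    import org.apache.helix.HelixManager;
    import org.apache.helix.task.TaskDriver;

    // Build a TaskDriver from an already-connected HelixManager (assumed available).
    TaskDriver driver = new TaskDriver(manager);

    // Drop every job that has reached a final state (ABORTED, FAILED, COMPLETED)
    // from the queue; running and not-yet-started jobs are left in place.
    driver.cleanupJobQueue("myQueue");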


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/33dae525
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/33dae525
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/33dae525

Branch: refs/heads/helix-0.6.x
Commit: 33dae525a8b31cae4d6a80a89f1d08190db0beda
Parents: bed7f5c
Author: Junkai Xue <jxue@linkedin.com>
Authored: Tue Oct 25 13:27:55 2016 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Fri Oct 28 20:33:21 2016 -0700

----------------------------------------------------------------------
 .../webapp/resources/JobQueueResource.java      |  6 +-
 .../java/org/apache/helix/task/TaskDriver.java  | 45 +++++++++--
 .../org/apache/helix/task/TaskRebalancer.java   |  2 +-
 .../java/org/apache/helix/task/TaskUtil.java    | 54 +++++++++++++
 .../apache/helix/task/WorkflowRebalancer.java   | 16 ++--
 .../apache/helix/integration/task/MockTask.java | 23 ++++--
 .../task/TestJobFailureDependence.java          |  2 -
 .../integration/task/TestJobQueueCleanUp.java   | 77 +++++++++++++++++++
 .../task/TestRunJobsWithMissingTarget.java      | 79 +++++++++++---------
 9 files changed, 244 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
index 954ae73..32b782d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -91,7 +91,7 @@ public class JobQueueResource extends ServerResource {
     // Get job queue config
     // TODO: fix this to use workflowConfig.
     ResourceConfig jobQueueConfig = accessor.getProperty(keyBuilder.resourceConfig(jobQueueName));
-    
+
     // Get job queue context
     WorkflowContext ctx = taskDriver.getWorkflowContext(jobQueueName);
 
@@ -179,6 +179,10 @@ public class JobQueueResource extends ServerResource {
         driver.delete(jobQueueName);
         break;
       }
+      case clean: {
+        driver.cleanupJobQueue(jobQueueName);
+        break;
+      }
       default:
         throw new HelixException("Unsupported job queue command: " + cmd);
       }
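
For the REST side, the sketch below shows one way the new "clean" command could be invoked against the Helix admin webapp, following the same jsonParameters/command convention as the existing job queue commands (stop, resume, flush, delete). The host, port, cluster name, and queue name are placeholders, and the exact URL layout is an assumption that should be checked against your deployment.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    public class CleanJobQueueViaRest {
      public static void main(String[] args) throws Exception {
        // Assumed endpoint layout: http://<admin-host>:<port>/clusters/<cluster>/jobQueues/<queue>
        URL url = new URL("http://localhost:8100/clusters/MyCluster/jobQueues/myQueue");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");

        // The command name matches the new "clean" enum value added in this commit;
        // the jsonParameters form field mirrors how the other job queue commands are submitted.
        String body = "jsonParameters="
            + URLEncoder.encode("{\"command\":\"clean\"}", "UTF-8");
        try (OutputStream os = conn.getOutputStream()) {
          os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }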

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index bc582e1..da861f5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -102,7 +102,8 @@ public class TaskDriver {
     delete,
     resume,
     list,
-    flush
+    flush,
+    clean
   }
 
   public TaskDriver(HelixManager manager) {
@@ -177,6 +178,9 @@ public class TaskDriver {
       case flush:
         driver.flushQueue(resource);
         break;
+      case clean:
+        driver.cleanupJobQueue(resource);
+        break;
       default:
         throw new IllegalArgumentException("Unknown command " + args[0]);
       }
@@ -405,10 +409,14 @@ public class TaskDriver {
     String workflowState =
         (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
 
-    if (workflowState.equals(TaskState.IN_PROGRESS)) {
+    if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
       throw new IllegalStateException("Queue " + queueName + " is still in progress!");
     }
 
+    removeJob(queueName, jobName);
+  }
+
+  private void removeJob(String queueName, String jobName) {
     // Remove the job from the queue in the DAG
     removeJobFromDag(queueName, jobName);
 
@@ -420,10 +428,7 @@ public class TaskDriver {
     removeJobStateFromQueue(queueName, jobName);
 
     // Delete the job from property store
-    String jobPropertyPath =
-        Joiner.on("/")
-            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName);
-    _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT);
+    TaskUtil.removeJobContext(_propertyStore, jobName);
   }
 
   /** Remove the job name from the DAG from the queue configuration */
@@ -590,6 +595,34 @@ public class TaskDriver {
     TaskUtil.invokeRebalance(_accessor, queueName);
   }
 
+  /**
+   * Clean up jobs in the job queue that are in a final state
+   * (ABORTED, FAILED, COMPLETED); such jobs otherwise keep
+   * consuming the queue's capacity.
+   *
+   * @param queueName The name of the job queue
+   */
+  public void cleanupJobQueue(String queueName) {
+    WorkflowConfig workflowCfg =
+        TaskUtil.getWorkflowCfg(_accessor, queueName);
+
+    if (workflowCfg == null) {
+      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    }
+
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queueName);
+    if (wCtx != null && wCtx.getWorkflowState() == null) {
+      throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!");
+    }
+
+    for (String jobNode : workflowCfg.getJobDag().getAllNodes()) {
+      TaskState curState = wCtx.getJobState(jobNode);
+      if (curState != null && (curState == TaskState.ABORTED || curState == TaskState.COMPLETED
+          || curState == TaskState.FAILED)) {
+        removeJob(queueName, TaskUtil.getDenamespacedJobName(queueName, jobNode));
+      }
+    }
+  }
+
   /** Posts new workflow resource to cluster */
   private void addWorkflowResource(String workflow) {
     // Add workflow resource
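
A sketch of how the cleanup might be exercised end to end, modeled on the new TestJobQueueCleanUp integration test. "driver" is the TaskDriver from the earlier sketch, "myQueue" and "JOB4" are placeholder names, and the fragment assumes the org.apache.helix.task imports (TaskUtil, TaskState) and a surrounding method that may throw InterruptedException.

    // Wait until a job in the queue has reached a terminal state (here: FAILED).
    driver.pollForJobState("myQueue",
        TaskUtil.getNamespacedJobName("myQueue", "JOB4"), TaskState.FAILED);

    // Remove all terminal jobs from the queue and verify that the DAG shrank.
    int before = driver.getWorkflowConfig("myQueue").getJobDag().size();
    driver.cleanupJobQueue("myQueue");
    int after = driver.getWorkflowConfig("myQueue").getJobDag().size();
    // Jobs in ABORTED/FAILED/COMPLETED states are gone; the rest are untouched.
    assert after <= before;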

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 01e07b1..f7f8d05 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -79,7 +79,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     int failedJobs = 0;
     for (String job : cfg.getJobDag().getAllNodes()) {
       TaskState jobState = ctx.getJobState(job);
-      if (jobState == TaskState.FAILED) {
+      if (!cfg.isJobQueue() && jobState == TaskState.FAILED) {
         failedJobs ++;
         if (failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 1d89656..0d30f54 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -180,6 +180,33 @@ public class TaskUtil {
   }
 
   /**
+   * Remove the runtime context of a single job.
+   * This method is an internal API.
+   *
+   * @param manager     A connection to Helix
+   * @param jobResource The name of the job
+   * @return            True if the removal succeeded, otherwise false
+   */
+  protected static boolean removeJobContext(HelixManager manager, String jobResource) {
+    return removeJobContext(manager.getHelixPropertyStore(), jobResource);
+  }
+
+  /**
+   * Remove the runtime context of a single job.
+   * This method is an internal API.
+   *
+   * @param propertyStore Property store for the cluster
+   * @param jobResource   The name of the job
+   * @return              True if the removal succeeded, otherwise false
+   */
+  protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+      String jobResource) {
+    return propertyStore.remove(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource),
+        AccessOption.PERSISTENT);
+  }
+
+  /**
    * Get the runtime context of a single workflow.
    * This method is internal API, please use the corresponding one in TaskDriver.getWorkflowContext();
    *
@@ -222,6 +249,33 @@ public class TaskUtil {
   }
 
   /**
+   * Remove the runtime context of a single workflow.
+   * This method is an internal API.
+   *
+   * @param manager     A connection to Helix
+   * @param workflowResource The name of the workflow
+   * @return            True if the removal succeeded, otherwise false
+   */
+  protected static boolean removeWorkflowContext(HelixManager manager, String workflowResource) {
+    return removeWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+  }
+
+  /**
+   * Remove the runtime context of a single workflow.
+   * This method is an internal API.
+   *
+   * @param propertyStore      Property store for the cluster
+   * @param workflowResource   The name of the workflow
+   * @return                   True if the removal succeeded, otherwise false
+   */
+  protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
+      String workflowResource) {
+    return propertyStore.remove(
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource),
+        AccessOption.PERSISTENT);
+  }
+
+  /**
    * Intialize the user content store znode setup
    * @param propertyStore       zookeeper property store
    * @param workflowJobResource the name of workflow or job

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index dc7c90b..84b50cf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -428,12 +428,11 @@ public class WorkflowRebalancer extends TaskRebalancer {
         }
       }
       // Delete workflow context
-      String workflowPropStoreKey = TaskUtil.getWorkflowContextKey(workflow);
-      LOG.info("Removing workflow context: " + workflowPropStoreKey);
-      if (!_manager.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+      LOG.info("Removing workflow context: " + workflow);
+      if (!TaskUtil.removeWorkflowContext(_manager, workflow)) {
         LOG.error(String.format(
-            "Error occurred while trying to clean up workflow %s. Failed to remove node %s
from Helix. Aborting further clean up steps.",
-            workflow, workflowPropStoreKey));
+            "Error occurred while trying to clean up workflow %s. Aborting further clean
up steps.",
+            workflow));
       }
 
       // Remove pending timer task for this workflow if exists
@@ -496,11 +495,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     // Delete job context
     // For recurring workflow, it's OK if the node doesn't exist.
-    String propStoreKey = TaskUtil.getWorkflowContextKey(job);
-    if (!_manager.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
-      LOG.warn(String.format(
-          "Error occurred while trying to clean up job %s. Failed to remove node %s from
Helix.",
-          job, propStoreKey));
+    if (!TaskUtil.removeJobContext(_manager, job)) {
+      LOG.warn(String.format("Error occurred while trying to clean up job %s.", job));
     }
 
     LOG.info(String.format("Successfully cleaned up job context %s.", job));

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index db0c8f4..948e8f3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -26,19 +26,22 @@ import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.UserContentStore;
 
-public class MockTask implements Task {
+public class MockTask extends UserContentStore implements Task {
   public static final String TASK_COMMAND = "Reindex";
   public static final String TIMEOUT_CONFIG = "Timeout";
   public static final String TASK_RESULT_STATUS = "TaskResultStatus";
   public static final String THROW_EXCEPTION = "ThrowException";
   public static final String ERROR_MESSAGE = "ErrorMessage";
   public static final String FAILURE_COUNT_BEFORE_SUCCESS = "FailureCountBeforeSuccess";
+  public static final String SUCCESS_COUNT_BEFORE_FAIL = "SuccessCountBeforeFail";
   private final long _delay;
   private volatile boolean _canceled;
   private TaskResult.Status _taskResultStatus;
   private boolean _throwException;
-  private int _expectedToSuccess;
+  private int _numOfFailBeforeSuccess;
+  private int _numOfSuccessBeforeFail;
   private String _errorMsg;
 
   public MockTask(TaskCallbackContext context) {
@@ -60,9 +63,11 @@ public class MockTask implements Task {
     _throwException = cfg.containsKey(THROW_EXCEPTION) ?
         Boolean.valueOf(cfg.containsKey(THROW_EXCEPTION)) :
         false;
-    _expectedToSuccess =
+    _numOfFailBeforeSuccess =
         cfg.containsKey(FAILURE_COUNT_BEFORE_SUCCESS) ? Integer.parseInt(
             cfg.get(FAILURE_COUNT_BEFORE_SUCCESS)) : 0;
+    _numOfSuccessBeforeFail = cfg.containsKey(SUCCESS_COUNT_BEFORE_FAIL) ? Integer
+        .parseInt(cfg.get(SUCCESS_COUNT_BEFORE_FAIL)) : Integer.MAX_VALUE;
 
     _errorMsg = cfg.containsKey(ERROR_MESSAGE) ? cfg.get(ERROR_MESSAGE) : null;
   }
@@ -82,15 +87,21 @@ public class MockTask implements Task {
     timeLeft = expiry - System.currentTimeMillis();
 
     if (_throwException) {
-      _expectedToSuccess--;
+      _numOfFailBeforeSuccess--;
       if (_errorMsg == null) {
         _errorMsg = "Test failed";
       }
       throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
     }
 
-    if (_expectedToSuccess > 0){
-      _expectedToSuccess--;
+    if (getUserContent(SUCCESS_COUNT_BEFORE_FAIL, Scope.WORKFLOW) != null) {
+      _numOfSuccessBeforeFail =
+          Integer.parseInt(getUserContent(SUCCESS_COUNT_BEFORE_FAIL, Scope.WORKFLOW));
+    }
+    putUserContent(SUCCESS_COUNT_BEFORE_FAIL, "" + --_numOfSuccessBeforeFail, Scope.WORKFLOW);
+
+    if (_numOfFailBeforeSuccess > 0 || _numOfSuccessBeforeFail < 0){
+      _numOfFailBeforeSuccess--;
       throw new RuntimeException(_errorMsg != null ? _errorMsg : "Test failed");
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
index 254a35d..4ebb9f6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureDependence.java
@@ -93,7 +93,6 @@ public class TestJobFailureDependence extends TaskTestBase {
 
     String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
     _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
-    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
   }
 
   @Test
@@ -175,7 +174,6 @@ public class TestJobFailureDependence extends TaskTestBase {
 
     namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
     _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
-    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
new file mode 100644
index 0000000..71fed49
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -0,0 +1,77 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestJobQueueCleanUp extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // TODO: Reenable this after Test Refactoring code checkin
+    // setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testJobQueueCleanUp() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "2"));
+    for (int i = 0; i < 5; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+    _driver.start(builder.build());
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 4),
+        TaskState.FAILED);
+    _driver.cleanupJobQueue(queueName);
+    Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 0);
+  }
+
+  @Test public void testJobQueueNotCleanupRunningJobs() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2);
+    for (int i = 0; i < 3; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+    builder.enqueueJob("JOB" + 3,
+        jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000L")));
+    builder.enqueueJob("JOB" + 4, jobBuilder);
+    _driver.start(builder.build());
+    _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3),
+        TaskState.IN_PROGRESS);
+    _driver.cleanupJobQueue(queueName);
+    Assert.assertEquals(_driver.getWorkflowConfig(queueName).getJobDag().size(), 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/33dae525/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
index 879bfea..3cca57e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -26,6 +26,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.log4j.Logger;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -43,11 +45,11 @@ public class TestRunJobsWithMissingTarget extends TaskTestBase {
 
   @Test
   public void testJobFailsWithMissingTarget() throws Exception {
-    String queueName = TestHelper.getTestMethodName();
+    String workflowName = TestHelper.getTestMethodName();
 
-    // Create a queue
-    LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+    // Create a workflow
+    LOG.info("Starting job-queue: " + workflowName);
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i < _numDbs; i++) {
@@ -56,74 +58,83 @@ public class TestRunJobsWithMissingTarget extends TaskTestBase {
               _testDbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
       String jobName = "job" + _testDbs.get(i);
-      queueBuilder.enqueueJob(jobName, jobConfig);
+      builder.addJob(jobName, jobConfig);
+      if (i > 0) {
+        builder.addParentChildDependency("job" + _testDbs.get(i - 1), "job" + _testDbs.get(i));
+      }
       currentJobNames.add(jobName);
     }
 
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(1));
-    _driver.start(queueBuilder.build());
+    _driver.start(builder.build());
 
-    String namedSpaceJob = String.format("%s_%s", queueName, currentJobNames.get(1));
-    _driver.pollForJobState(queueName, namedSpaceJob, TaskState.FAILED);
-    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
+    String namedSpaceJob = String.format("%s_%s", workflowName, currentJobNames.get(1));
+    _driver.pollForJobState(workflowName, namedSpaceJob, TaskState.FAILED);
+    _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
 
-    _driver.delete(queueName);
+    _driver.delete(workflowName);
   }
 
   @Test(dependsOnMethods = "testJobFailsWithMissingTarget")
   public void testJobContinueUponParentJobFailure() throws Exception {
-    String queueName = TestHelper.getTestMethodName();
+    String workflowName = TestHelper.getTestMethodName();
 
-    // Create a queue
-    LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 3);
-    // Create and Enqueue jobs
+    // Create a workflow
+    LOG.info("Starting job-queue: " + workflowName);
+    Workflow.Builder builder = new Workflow.Builder(workflowName)
+        .setWorkflowConfig(new WorkflowConfig.Builder().setFailureThreshold(10).build());
+    // Create and add jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i < _numDbs; i++) {
       JobConfig.Builder jobConfig =
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_testDbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setIgnoreDependentJobFailure(true);
       String jobName = "job" + _testDbs.get(i);
-      queueBuilder.enqueueJob(jobName, jobConfig);
+      builder.addJob(jobName, jobConfig);
+      if (i > 0) {
+        builder.addParentChildDependency("job" + _testDbs.get(i - 1), "job" + _testDbs.get(i));
+      }
       currentJobNames.add(jobName);
     }
 
-    _driver.start(queueBuilder.build());
+    _driver.start(builder.build());
 
-    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(1));
-    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
+    String namedSpaceJob1 = String.format("%s_%s", workflowName, currentJobNames.get(1));
+    _driver.pollForJobState(workflowName, namedSpaceJob1, TaskState.FAILED);
     String lastJob =
-        String.format("%s_%s", queueName, currentJobNames.get(currentJobNames.size() - 1));
-    _driver.pollForJobState(queueName, lastJob, TaskState.COMPLETED);
+        String.format("%s_%s", workflowName, currentJobNames.get(currentJobNames.size() -
1));
+    _driver.pollForJobState(workflowName, lastJob, TaskState.COMPLETED);
 
-    _driver.delete(queueName);
+    _driver.delete(workflowName);
   }
 
   @Test(dependsOnMethods = "testJobContinueUponParentJobFailure")
   public void testJobFailsWithMissingTargetInRunning() throws Exception {
-    String queueName = TestHelper.getTestMethodName();
+    String workflowName = TestHelper.getTestMethodName();
 
-    // Create a queue
-    LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
-    // Create and Enqueue jobs
+    // Create a workflow
+    LOG.info("Starting job-queue: " + workflowName);
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    // Create and add jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i < _numDbs; i++) {
       JobConfig.Builder jobConfig =
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_testDbs.get(i))
               .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
       String jobName = "job" + _testDbs.get(i);
-      queueBuilder.enqueueJob(jobName, jobConfig);
-      currentJobNames.add(jobName);
+      builder.addJob(jobName, jobConfig);
+      if (i > 0) {
+        builder.addParentChildDependency("job" + _testDbs.get(i - 1), "job" + _testDbs.get(i));
+      }      currentJobNames.add(jobName);
     }
 
-    _driver.start(queueBuilder.build());
+    _driver.start(builder.build());
     _setupTool.dropResourceFromCluster(CLUSTER_NAME, _testDbs.get(0));
 
-    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
-    _driver.pollForJobState(queueName, namedSpaceJob1, TaskState.FAILED);
-    _driver.pollForWorkflowState(queueName, TaskState.FAILED);
+    String namedSpaceJob1 = String.format("%s_%s", workflowName, currentJobNames.get(0));
+    _driver.pollForJobState(workflowName, namedSpaceJob1, TaskState.FAILED);
+    _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
 
-    _driver.delete(queueName);
+    _driver.delete(workflowName);
   }
 }

