helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [1/4] helix git commit: [HELIX-618] Job hung if the target resource does not exist anymore at the time when it is scheduled.
Date Thu, 16 Jun 2016 23:36:10 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 6b6bb8f60 -> fe540ac9e


[HELIX-618]  Job hung if the target resource does not exist anymore at the time when it is
scheduled.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fe540ac9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fe540ac9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fe540ac9

Branch: refs/heads/helix-0.6.x
Commit: fe540ac9ec93fb3fb1caa71acaede9c3a63e9fd4
Parents: d381a3a
Author: Lei Xia <lxia@linkedin.com>
Authored: Wed Feb 10 16:34:31 2016 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../FixedTargetTaskAssignmentCalculator.java    |  14 +-
 .../org/apache/helix/task/JobRebalancer.java    |  51 +++--
 .../org/apache/helix/task/TaskRebalancer.java   |  41 ++--
 .../apache/helix/task/WorkflowRebalancer.java   |  31 +--
 .../task/TestRunJobsWithMissingTarget.java      | 214 +++++++++++++++++++
 5 files changed, 306 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 8760524..60cd92f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -37,6 +37,7 @@ import org.apache.helix.model.ResourceAssignment;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.log4j.Logger;
 
 /**
  * A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target
@@ -44,6 +45,7 @@ import com.google.common.collect.Sets;
  * (if desired) only where those partitions are in a given state.
  */
 public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
+  private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
 
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
@@ -58,6 +60,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
       Set<Integer> partitionSet, ClusterDataCache cache) {
     IdealState tgtIs = getTgtIdealState(jobCfg, cache);
     if (tgtIs == null) {
+      LOG.warn("Missing target resource for the scheduled job!");
       return Collections.emptyMap();
     }
     Set<String> tgtStates = jobCfg.getTargetPartitionStates();
@@ -78,21 +81,22 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
 
   /**
    * Returns the set of all partition ids for a job.
-   * <p/>
    * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
    * use the list of all partition ids from the target resource.
+   * return empty set if target resource does not exist.
    */
   private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
       JobContext taskCtx) {
-    if (tgtResourceIs == null) {
-      return null;
-    }
     Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
     SortedSet<String> targetPartitions = Sets.newTreeSet();
     if (jobCfg.getTargetPartitions() != null) {
       targetPartitions.addAll(jobCfg.getTargetPartitions());
     } else {
-      targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+      if (tgtResourceIs != null) {
+        targetPartitions.addAll(tgtResourceIs.getPartitionSet());
+      } else {
+        LOG.warn("Missing target resource for the scheduled job!");
+      }
     }
 
     Set<Integer> taskPartitions = Sets.newTreeSet();

http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0e2ab15..93d4689 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -31,7 +31,6 @@ import org.apache.log4j.Logger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -206,9 +205,23 @@ public class JobRebalancer extends TaskRebalancer {
     TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
     Set<Integer> allPartitions =
         taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+
+    if (allPartitions == null || allPartitions.isEmpty()) {
+      // Empty target partitions, mark the job as FAILED.
+      LOG.warn(
+          "Missing task partition mapping for job " + jobResource + ", marked the job as FAILED!");
+      markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
+      markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
+      return new ResourceAssignment(jobResource);
+    }
+
     Map<String, SortedSet<Integer>> taskAssignments =
         getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
     long currentTime = System.currentTimeMillis();
+
+    LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments
+        + " excludedInstances: " + excludedInstances);
+
     for (String instance : taskAssignments.keySet()) {
       if (excludedInstances.contains(instance)) {
         continue;
@@ -322,13 +335,7 @@ public class JobRebalancer extends TaskRebalancer {
             }
 
             if (!successOptional) {
-              long finishTime = currentTime;
-              workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              if (workflowConfig.isTerminable()) {
-                workflowCtx.setWorkflowState(TaskState.FAILED);
-                workflowCtx.setFinishTime(finishTime);
-              }
-              jobCtx.setFinishTime(finishTime);
+              markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
               markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);
 
@@ -367,13 +374,7 @@ public class JobRebalancer extends TaskRebalancer {
     scheduleForNextTask(jobResource, jobCtx, currentTime);
 
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
-      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
-      jobCtx.setFinishTime(currentTime);
-      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-      }
-
+      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
       // remove IdealState of this job
       TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }
@@ -428,6 +429,26 @@ public class JobRebalancer extends TaskRebalancer {
     return ra;
   }
 
+  private void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
+      WorkflowContext workflowContext) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.FAILED);
+    jobContext.setFinishTime(currentTime);
+    if (isWorkflowFinished(workflowContext, workflowConfig)) {
+      workflowContext.setFinishTime(currentTime);
+    }
+  }
+
+  private void markJobComplete(String jobName, JobContext jobContext,
+      WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
+    long currentTime = System.currentTimeMillis();
+    workflowContext.setJobState(jobName, TaskState.COMPLETED);
+    jobContext.setFinishTime(currentTime);
+    if (isWorkflowFinished(workflowContext, workflowConfig)) {
+      workflowContext.setFinishTime(currentTime);
+    }
+  }
+
   private void scheduleForNextTask(String job, JobContext jobCtx, long now) {
     // Clear current entries if they exist and are expired
     long currentTime = now;

http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 1526883..f35ce69 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -62,22 +62,33 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       CurrentStateOutput currStateOutput);
 
   /**
-   * Checks if the workflow has completed.
+   * Checks if the workflow has finished (either completed or failed).
+   * Set the state in workflow context properly.
    *
    * @param ctx Workflow context containing job states
    * @param cfg Workflow config containing set of jobs
-   * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
+   * @return returns true if the workflow either completed (all tasks are {@link TaskState#COMPLETED})
+   * or failed (any task is {@link TaskState#FAILED}, false otherwise.
    */
-  protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
-    if (!cfg.isTerminable()) {
-      return false;
-    }
+  protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) {
+    boolean incomplete = false;
     for (String job : cfg.getJobDag().getAllNodes()) {
-      if (ctx.getJobState(job) != TaskState.COMPLETED) {
-        return false;
+      TaskState jobState = ctx.getJobState(job);
+      if (jobState == TaskState.FAILED) {
+        ctx.setWorkflowState(TaskState.FAILED);
+        return true;
+      }
+      if (jobState != TaskState.COMPLETED) {
+        incomplete = true;
       }
     }
-    return true;
+
+    if (!incomplete && cfg.isTerminable()) {
+      ctx.setWorkflowState(TaskState.COMPLETED);
+      return true;
+    }
+
+    return false;
   }
 
   /**
@@ -124,6 +135,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       WorkflowContext workflowCtx) {
     int notStartedCount = 0;
     int inCompleteCount = 0;
+    int failedCount = 0;
 
     for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) {
       TaskState jobState = workflowCtx.getJobState(ancestor);
@@ -131,13 +143,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         ++notStartedCount;
       } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
         ++inCompleteCount;
+      } else if (jobState == TaskState.FAILED) {
+        ++failedCount;
       }
     }
 
-    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
-      LOG.debug(String
-          .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.",
-              job, notStartedCount, inCompleteCount));
+    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()
+        || failedCount > 0) {
+      LOG.debug(String.format(
+          "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.",
+          job, notStartedCount, inCompleteCount, failedCount));
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 912f501..db5426c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -77,17 +77,15 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     long currentTime = System.currentTimeMillis();
-    // Check if workflow is completed and mark it if it is completed.
-    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
-      if (isWorkflowComplete(workflowCtx, workflowCfg)) {
-        workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(currentTime);
-        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
-      }
+    // Check if workflow has been finished and mark it if it is.
+    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
+        && isWorkflowFinished(workflowCtx, workflowCfg)) {
+      workflowCtx.setFinishTime(currentTime);
+      TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     }
 
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-      LOG.info("Workflow " + workflow + " is completed.");
+      LOG.info("Workflow " + workflow + " is finished.");
       long expiryTime = workflowCfg.getExpiry();
       // Check if this workflow has been finished past its expiry.
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
@@ -162,10 +160,19 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     // Set up job resource based on partitions from target resource
     int numIndependentTasks = jobConfig.getTaskConfigMap().size();
-    int numPartitions = (numIndependentTasks > 0) ?
-        numIndependentTasks :
-        admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource())
-            .getPartitionSet().size();
+
+    int numPartitions = numIndependentTasks;
+    if (numPartitions == 0) {
+      IdealState targetIs =
+          admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
+      if (targetIs == null) {
+        LOG.warn("Target resource does not exist for job " + jobResource);
+        // do not need to fail here, the job will be marked as failure immediately when job starts running.
+      } else {
+        numPartitions = targetIs.getPartitionSet().size();
+      }
+    }
+
     admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
         TaskConstants.STATE_MODEL_NAME);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
new file mode 100644
index 0000000..74a8610
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java
@@ -0,0 +1,214 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestRunJobsWithMissingTarget.class);
+  private static final int num_nodes = 5;
+  private static final int num_dbs = 5;
+  private static final int START_PORT = 12918;
+  private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private static final int NUM_PARTITIONS = 20;
+  private static final int NUM_REPLICAS = 3;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes];
+  private ClusterControllerManager _controller;
+  private ClusterSetup _setupTool;
+
+  private List<String> _test_dbs = new ArrayList<String>();
+
+  private HelixManager _manager;
+  private TaskDriver _driver;
+
+  @BeforeClass public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < num_nodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // Set up target dbs
+    for (int i = 0; i < num_dbs; i++) {
+      String db = "TestDB" + i;
+      _setupTool
+          .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL,
+              IdealState.RebalanceMode.FULL_AUTO.toString());
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS);
+      _test_dbs.add(db);
+    }
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < num_nodes; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    _driver = new TaskDriver(_manager);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (int i = 0; i < num_nodes; i++) {
+      _participants[i].syncStop();
+    }
+    _manager.disconnect();
+  }
+
+  private JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
+    cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
+    cal.set(Calendar.MILLISECOND, 0);
+    cfgMap.put(WorkflowConfig.START_TIME,
+        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+  }
+
+  private JobQueue.Builder buildJobQueue(String jobQueueName) {
+    return buildJobQueue(jobQueueName, 0);
+  }
+
+  @Test public void testJobFailsWithMissingTarget() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = buildJobQueue(queueName);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2));
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+  }
+
+  @Test public void testJobFailsWithMissingTargetInRunning() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue.Builder queueBuilder = buildJobQueue(queueName);
+    // Create and Enqueue jobs
+    List<String> currentJobNames = new ArrayList<String>();
+    for (int i = 0; i < num_dbs; i++) {
+      JobConfig.Builder jobConfig =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i))
+              .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
+      String jobName = "job" + _test_dbs.get(i);
+      queueBuilder.enqueueJob(jobName, jobConfig);
+      currentJobNames.add(jobName);
+    }
+
+    _driver.start(queueBuilder.build());
+    _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0));
+
+    String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0));
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED);
+  }
+}


Mime
View raw message