Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B9E6E200B4C for ; Fri, 17 Jun 2016 01:36:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B8910160A61; Thu, 16 Jun 2016 23:36:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 73850160A51 for ; Fri, 17 Jun 2016 01:36:11 +0200 (CEST) Received: (qmail 6693 invoked by uid 500); 16 Jun 2016 23:36:10 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 6680 invoked by uid 99); 16 Jun 2016 23:36:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jun 2016 23:36:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85689E009D; Thu, 16 Jun 2016 23:36:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lxia@apache.org To: commits@helix.apache.org Date: Thu, 16 Jun 2016 23:36:10 -0000 Message-Id: <7f988172fb3f491693524794835210b5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/4] helix git commit: [HELIX-618] Job hung if the target resource does not exist anymore at the time when it is scheduled. archived-at: Thu, 16 Jun 2016 23:36:12 -0000 Repository: helix Updated Branches: refs/heads/helix-0.6.x 6b6bb8f60 -> fe540ac9e [HELIX-618] Job hung if the target resource does not exist anymore at the time when it is scheduled. 
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fe540ac9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fe540ac9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fe540ac9 Branch: refs/heads/helix-0.6.x Commit: fe540ac9ec93fb3fb1caa71acaede9c3a63e9fd4 Parents: d381a3a Author: Lei Xia Authored: Wed Feb 10 16:34:31 2016 -0800 Committer: Lei Xia Committed: Wed Apr 13 10:43:23 2016 -0700 ---------------------------------------------------------------------- .../FixedTargetTaskAssignmentCalculator.java | 14 +- .../org/apache/helix/task/JobRebalancer.java | 51 +++-- .../org/apache/helix/task/TaskRebalancer.java | 41 ++-- .../apache/helix/task/WorkflowRebalancer.java | 31 +-- .../task/TestRunJobsWithMissingTarget.java | 214 +++++++++++++++++++ 5 files changed, 306 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java index 8760524..60cd92f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java @@ -37,6 +37,7 @@ import org.apache.helix.model.ResourceAssignment; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.log4j.Logger; /** * A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states on a target @@ -44,6 +45,7 @@ import com.google.common.collect.Sets; * (if desired) only where those partitions are in a given state. 
*/ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator { + private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class); @Override public Set getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx, @@ -58,6 +60,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato Set partitionSet, ClusterDataCache cache) { IdealState tgtIs = getTgtIdealState(jobCfg, cache); if (tgtIs == null) { + LOG.warn("Missing target resource for the scheduled job!"); return Collections.emptyMap(); } Set tgtStates = jobCfg.getTargetPartitionStates(); @@ -78,21 +81,22 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato /** * Returns the set of all partition ids for a job. - *

* If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we * use the list of all partition ids from the target resource. + * return empty set if target resource does not exist. */ private static Set getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg, JobContext taskCtx) { - if (tgtResourceIs == null) { - return null; - } Map> currentTargets = taskCtx.getPartitionsByTarget(); SortedSet targetPartitions = Sets.newTreeSet(); if (jobCfg.getTargetPartitions() != null) { targetPartitions.addAll(jobCfg.getTargetPartitions()); } else { - targetPartitions.addAll(tgtResourceIs.getPartitionSet()); + if (tgtResourceIs != null) { + targetPartitions.addAll(tgtResourceIs.getPartitionSet()); + } else { + LOG.warn("Missing target resource for the scheduled job!"); + } } Set taskPartitions = Sets.newTreeSet(); http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index 0e2ab15..93d4689 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -31,7 +31,6 @@ import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -206,9 +205,23 @@ public class JobRebalancer extends TaskRebalancer { TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg); Set allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache); + + if (allPartitions == null || allPartitions.isEmpty()) { + // Empty target partitions, mark the job as FAILED. 
+ LOG.warn( + "Missing task partition mapping for job " + jobResource + ", marked the job as FAILED!"); + markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); + markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false); + return new ResourceAssignment(jobResource); + } + Map> taskAssignments = getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions); long currentTime = System.currentTimeMillis(); + + LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments + + " excludedInstances: " + excludedInstances); + for (String instance : taskAssignments.keySet()) { if (excludedInstances.contains(instance)) { continue; @@ -322,13 +335,7 @@ public class JobRebalancer extends TaskRebalancer { } if (!successOptional) { - long finishTime = currentTime; - workflowCtx.setJobState(jobResource, TaskState.FAILED); - if (workflowConfig.isTerminable()) { - workflowCtx.setWorkflowState(TaskState.FAILED); - workflowCtx.setFinishTime(finishTime); - } - jobCtx.setFinishTime(finishTime); + markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); markAllPartitionsError(jobCtx, currState, false); addAllPartitions(allPartitions, partitionsToDropFromIs); @@ -367,13 +374,7 @@ public class JobRebalancer extends TaskRebalancer { scheduleForNextTask(jobResource, jobCtx, currentTime); if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { - workflowCtx.setJobState(jobResource, TaskState.COMPLETED); - jobCtx.setFinishTime(currentTime); - if (isWorkflowComplete(workflowCtx, workflowConfig)) { - workflowCtx.setWorkflowState(TaskState.COMPLETED); - workflowCtx.setFinishTime(currentTime); - } - + markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx); // remove IdealState of this job TaskUtil.cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); } @@ -428,6 +429,26 @@ public class JobRebalancer extends TaskRebalancer { return ra; } + private void markJobFailed(String jobName, JobContext 
jobContext, WorkflowConfig workflowConfig, + WorkflowContext workflowContext) { + long currentTime = System.currentTimeMillis(); + workflowContext.setJobState(jobName, TaskState.FAILED); + jobContext.setFinishTime(currentTime); + if (isWorkflowFinished(workflowContext, workflowConfig)) { + workflowContext.setFinishTime(currentTime); + } + } + + private void markJobComplete(String jobName, JobContext jobContext, + WorkflowConfig workflowConfig, WorkflowContext workflowContext) { + long currentTime = System.currentTimeMillis(); + workflowContext.setJobState(jobName, TaskState.COMPLETED); + jobContext.setFinishTime(currentTime); + if (isWorkflowFinished(workflowContext, workflowConfig)) { + workflowContext.setFinishTime(currentTime); + } + } + private void scheduleForNextTask(String job, JobContext jobCtx, long now) { // Clear current entries if they exist and are expired long currentTime = now; http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 1526883..f35ce69 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -62,22 +62,33 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { CurrentStateOutput currStateOutput); /** - * Checks if the workflow has completed. + * Checks if the workflow has finished (either completed or failed). + * Set the state in workflow context properly. * * @param ctx Workflow context containing job states * @param cfg Workflow config containing set of jobs - * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise. 
+ * @return returns true if the workflow either completed (all tasks are {@link TaskState#COMPLETED}) + * or failed (any task is {@link TaskState#FAILED}), false otherwise. */ - protected boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) { - if (!cfg.isTerminable()) { - return false; - } + protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg) { + boolean incomplete = false; for (String job : cfg.getJobDag().getAllNodes()) { - if (ctx.getJobState(job) != TaskState.COMPLETED) { - return false; + TaskState jobState = ctx.getJobState(job); + if (jobState == TaskState.FAILED) { + ctx.setWorkflowState(TaskState.FAILED); + return true; + } + if (jobState != TaskState.COMPLETED) { + incomplete = true; } } - return true; + + if (!incomplete && cfg.isTerminable()) { + ctx.setWorkflowState(TaskState.COMPLETED); + return true; + } + + return false; } /** @@ -124,6 +135,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { WorkflowContext workflowCtx) { int notStartedCount = 0; int inCompleteCount = 0; + int failedCount = 0; for (String ancestor : workflowCfg.getJobDag().getAncestors(job)) { TaskState jobState = workflowCtx.getJobState(ancestor); @@ -131,13 +143,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { ++notStartedCount; } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) { ++inCompleteCount; + } else if (jobState == TaskState.FAILED) { + ++failedCount; } } - if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) { - LOG.debug(String - .format("Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d.", - job, notStartedCount, inCompleteCount)); + if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs() + || failedCount > 0) { + LOG.debug(String.format( + "Job %s is not ready to start, notStartedParent(s)=%d, inCompleteParent(s)=%d, failedParent(s)=%d.", + job, 
notStartedCount, inCompleteCount, failedCount)); return false; } http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 912f501..db5426c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -77,17 +77,15 @@ public class WorkflowRebalancer extends TaskRebalancer { } long currentTime = System.currentTimeMillis(); - // Check if workflow is completed and mark it if it is completed. - if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { - if (isWorkflowComplete(workflowCtx, workflowCfg)) { - workflowCtx.setWorkflowState(TaskState.COMPLETED); - workflowCtx.setFinishTime(currentTime); - TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); - } + // Check if workflow has been finished and mark it if it is. + if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED + && isWorkflowFinished(workflowCtx, workflowCfg)) { + workflowCtx.setFinishTime(currentTime); + TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); } if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { - LOG.info("Workflow " + workflow + " is completed."); + LOG.info("Workflow " + workflow + " is finished."); long expiryTime = workflowCfg.getExpiry(); // Check if this workflow has been finished past its expiry. if (workflowCtx.getFinishTime() + expiryTime <= currentTime) { @@ -162,10 +160,19 @@ public class WorkflowRebalancer extends TaskRebalancer { // Set up job resource based on partitions from target resource int numIndependentTasks = jobConfig.getTaskConfigMap().size(); - int numPartitions = (numIndependentTasks > 0) ? 
- numIndependentTasks : - admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource()) - .getPartitionSet().size(); + + int numPartitions = numIndependentTasks; + if (numPartitions == 0) { + IdealState targetIs = + admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource()); + if (targetIs == null) { + LOG.warn("Target resource does not exist for job " + jobResource); + // do not need to fail here, the job will be marked as failure immediately when job starts running. + } else { + numPartitions = targetIs.getPartitionSet().size(); + } + } + admin.addResource(_manager.getClusterName(), jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME); http://git-wip-us.apache.org/repos/asf/helix/blob/fe540ac9/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java new file mode 100644 index 0000000..74a8610 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRunJobsWithMissingTarget.java @@ -0,0 +1,214 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Sets; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestRunJobsWithMissingTarget extends ZkIntegrationTestBase { + private static final Logger LOG = Logger.getLogger(TestRunJobsWithMissingTarget.class); + private static 
final int num_nodes = 5; + private static final int num_dbs = 5; + private static final int START_PORT = 12918; + private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; + private static final String TIMEOUT_CONFIG = "Timeout"; + private static final int NUM_PARTITIONS = 20; + private static final int NUM_REPLICAS = 3; + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); + private final MockParticipantManager[] _participants = new MockParticipantManager[num_nodes]; + private ClusterControllerManager _controller; + private ClusterSetup _setupTool; + + private List _test_dbs = new ArrayList(); + + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + _setupTool = new ClusterSetup(ZK_ADDR); + _setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < num_nodes; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // Set up target dbs + for (int i = 0; i < num_dbs; i++) { + String db = "TestDB" + i; + _setupTool + .addResourceToCluster(CLUSTER_NAME, db, NUM_PARTITIONS + 10 * i, MASTER_SLAVE_STATE_MODEL, + IdealState.RebalanceMode.FULL_AUTO.toString()); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, NUM_REPLICAS); + _test_dbs.add(db); + } + + Map taskFactoryReg = new HashMap(); + taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() { + @Override public Task createNewTask(TaskCallbackContext context) { + return new MockTask(context); + } + }); + + // start dummy participants + for (int i = 0; i < num_nodes; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. 
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[i], taskFactoryReg)); + + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + + _driver = new TaskDriver(_manager); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (int i = 0; i < num_nodes; i++) { + _participants[i].syncStop(); + } + _manager.disconnect(); + } + + private JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart) { + Map cfgMap = new HashMap(); + cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000)); + Calendar cal = Calendar.getInstance(); + cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60); + cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60); + cal.set(Calendar.MILLISECOND, 0); + cfgMap.put(WorkflowConfig.START_TIME, + WorkflowConfig.getDefaultDateFormat().format(cal.getTime())); + return new JobQueue.Builder(jobQueueName).fromMap(cfgMap); + } + + private JobQueue.Builder buildJobQueue(String jobQueueName) { + return buildJobQueue(jobQueueName, 0); + } + + @Test public void testJobFailsWithMissingTarget() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting 
job-queue: " + queueName); + JobQueue.Builder queueBuilder = buildJobQueue(queueName); + // Create and Enqueue jobs + List currentJobNames = new ArrayList(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(2)); + + String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(2)); + TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED); + } + + @Test public void testJobFailsWithMissingTargetInRunning() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuilder = buildJobQueue(queueName); + // Create and Enqueue jobs + List currentJobNames = new ArrayList(); + for (int i = 0; i < num_dbs; i++) { + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(_test_dbs.get(i)) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")); + String jobName = "job" + _test_dbs.get(i); + queueBuilder.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + + _driver.start(queueBuilder.build()); + _setupTool.dropResourceFromCluster(CLUSTER_NAME, _test_dbs.get(0)); + + String namedSpaceJob1 = String.format("%s_%s", queueName, currentJobNames.get(0)); + TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJob1, TaskState.FAILED); + TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.FAILED); + } +}