Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2C0DB11721 for ; Mon, 25 Aug 2014 23:07:13 +0000 (UTC) Received: (qmail 72577 invoked by uid 500); 25 Aug 2014 23:07:12 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 72547 invoked by uid 500); 25 Aug 2014 23:07:12 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 72537 invoked by uid 99); 25 Aug 2014 23:07:12 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Aug 2014 23:07:12 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 49530A009B8; Mon, 25 Aug 2014 23:07:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Message-Id: <3945c9da9d554bbf9756cec54fe19769@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: [HELIX-482] Support delayed task reassignment Date: Mon, 25 Aug 2014 23:07:12 +0000 (UTC) Repository: helix Updated Branches: refs/heads/helix-0.6.x 19de46fe5 -> c2e411328 [HELIX-482] Support delayed task reassignment Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c2e41132 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c2e41132 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c2e41132 Branch: refs/heads/helix-0.6.x Commit: c2e411328d4b6fbfd94eddc2c48c69284d2130c3 Parents: 19de46f Author: Kanak Biscuitwala Authored: Fri Aug 22 13:46:07 2014 -0700 Committer: Kanak Biscuitwala Committed: Mon Aug 25 16:06:51 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobConfig.java | 27 ++++- .../java/org/apache/helix/task/JobContext.java | 102 ++++++++----------- .../org/apache/helix/task/TaskRebalancer.java | 86 +++++++++++++--- .../org/apache/helix/task/beans/JobBean.java | 1 + .../task/TestIndependentTaskRebalancer.java | 51 ++++++++++ 5 files changed, 190 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/JobConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 1dad5e4..780db55 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -67,6 +67,8 @@ public class JobConfig { public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance"; /** The number of tasks within the job that are allowed to fail. */ public static final String FAILURE_THRESHOLD = "FailureThreshold"; + /** The amount of time in ms to wait before retrying a task */ + public static final String TASK_RETRY_DELAY = "TaskRetryDelay"; /** The individual task configurations, if any **/ public static final String TASK_CONFIGS = "TaskConfigs"; @@ -74,6 +76,7 @@ public class JobConfig { // // Default property values //// public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr. + public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1; public static final int DEFAULT_FAILURE_THRESHOLD = 0; @@ -90,12 +93,14 @@ public class JobConfig { private final int _maxAttemptsPerTask; private final int _maxForcedReassignmentsPerTask; private final int _failureThreshold; + private final long _retryDelay; private final Map _taskConfigMap; private JobConfig(String workflow, String targetResource, List targetPartitions, Set targetPartitionStates, String command, Map jobCommandConfigMap, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, - int maxForcedReassignmentsPerTask, int failureThreshold, Map taskConfigMap) { + int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay, + Map taskConfigMap) { _workflow = workflow; _targetResource = targetResource; _targetPartitions = targetPartitions; @@ -107,6 +112,7 @@ public class JobConfig { _maxAttemptsPerTask = maxAttemptsPerTask; _maxForcedReassignmentsPerTask = maxForcedReassignmentsPerTask; _failureThreshold = failureThreshold; + _retryDelay = retryDelay; if (taskConfigMap != null) { _taskConfigMap = taskConfigMap; } else { @@ -158,6 +164,10 @@ public class JobConfig { return _failureThreshold; } + public long getTaskRetryDelay() { + return _retryDelay; + } + public Map getTaskConfigMap() { return _taskConfigMap; } @@ -187,6 +197,9 @@ public class JobConfig { if (_targetPartitions != null) { cfgMap.put(JobConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions)); } + if (_retryDelay > 0) { + cfgMap.put(JobConfig.TASK_RETRY_DELAY, "" + _retryDelay); + } cfgMap.put(JobConfig.TIMEOUT_PER_TASK, "" + _timeoutPerTask); cfgMap.put(JobConfig.MAX_ATTEMPTS_PER_TASK, "" + _maxAttemptsPerTask); cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask); @@ -210,13 +223,15 @@ public class JobConfig { private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK; private int _maxForcedReassignmentsPerTask = DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; private int _failureThreshold = DEFAULT_FAILURE_THRESHOLD; + private long _retryDelay = DEFAULT_TASK_RETRY_DELAY; public JobConfig build() { validate(); return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, - _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _taskConfigMap); + _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, + _taskConfigMap); } /** @@ -264,6 +279,9 @@ public class JobConfig { if (cfg.containsKey(FAILURE_THRESHOLD)) { b.setFailureThreshold(Integer.parseInt(cfg.get(FAILURE_THRESHOLD))); } + if (cfg.containsKey(TASK_RETRY_DELAY)) { + b.setTaskRetryDelay(Long.parseLong(cfg.get(TASK_RETRY_DELAY))); + } return b; } @@ -322,6 +340,11 @@ public class JobConfig { return this; } + public Builder setTaskRetryDelay(long v) { + _retryDelay = v; + return this; + } + public Builder addTaskConfigs(List taskConfigs) { if (taskConfigs != null) { for (TaskConfig taskConfig : taskConfigs) { http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/JobContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java index f843834..77885cd 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java @@ -44,7 +44,8 @@ public class JobContext extends HelixProperty { FINISH_TIME, TARGET, TASK_ID, - ASSIGNED_PARTICIPANT + ASSIGNED_PARTICIPANT, + NEXT_RETRY_TIME } public JobContext(ZNRecord record) { @@ -76,21 +77,15 @@ public class JobContext extends HelixProperty { } public void setPartitionState(int p, TaskPartitionState s) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - map = new TreeMap(); - _record.setMapField(pStr, map); - } + Map map = getMapField(p); map.put(ContextProperties.STATE.toString(), s.name()); } public TaskPartitionState getPartitionState(int p) { - Map map = _record.getMapField(String.valueOf(p)); + Map map = getMapField(p); if (map == null) { return null; } - String str = map.get(ContextProperties.STATE.toString()); if (str != null) { return TaskPartitionState.valueOf(str); @@ -100,12 +95,7 @@ public class JobContext extends HelixProperty { } public void setPartitionNumAttempts(int p, int n) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - map = new TreeMap(); - _record.setMapField(pStr, map); - } + Map map = getMapField(p); map.put(ContextProperties.NUM_ATTEMPTS.toString(), String.valueOf(n)); } @@ -120,61 +110,42 @@ public class JobContext extends HelixProperty { } public int getPartitionNumAttempts(int p) { - Map map = _record.getMapField(String.valueOf(p)); + Map map = getMapField(p); if (map == null) { return -1; } - String nStr = map.get(ContextProperties.NUM_ATTEMPTS.toString()); if (nStr == null) { return -1; } - return Integer.parseInt(nStr); } public void setPartitionFinishTime(int p, long t) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - map = new TreeMap(); - _record.setMapField(pStr, map); - } + Map map = getMapField(p); map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t)); } public long getPartitionFinishTime(int p) { - Map map = _record.getMapField(String.valueOf(p)); + Map map = getMapField(p); if (map == null) { return -1; } - String tStr = map.get(ContextProperties.FINISH_TIME.toString()); if (tStr == null) { return -1; } - return Long.parseLong(tStr); } public void setPartitionTarget(int p, String targetPName) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - map = new TreeMap(); - _record.setMapField(pStr, map); - } + Map map = getMapField(p); map.put(ContextProperties.TARGET.toString(), targetPName); } public String getTargetForPartition(int p) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - return null; - } else { - return map.get(ContextProperties.TARGET.toString()); - } + Map map = getMapField(p); + return (map != null) ? map.get(ContextProperties.TARGET.toString()) : null; } public Map> getPartitionsByTarget() { @@ -206,23 +177,13 @@ public class JobContext extends HelixProperty { } public void setTaskIdForPartition(int p, String taskId) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - map = new TreeMap(); - _record.setMapField(pStr, map); - } + Map map = getMapField(p); map.put(ContextProperties.TASK_ID.toString(), taskId); } public String getTaskIdForPartition(int p) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - return null; - } else { - return map.get(ContextProperties.TASK_ID.toString()); - } + Map map = getMapField(p); + return (map != null) ? map.get(ContextProperties.TASK_ID.toString()) : null; } public Map getTaskIdPartitionMap() { @@ -238,18 +199,39 @@ public class JobContext extends HelixProperty { } public void setAssignedParticipant(int p, String participantName) { - String pStr = String.valueOf(p); - Map map = _record.getMapField(pStr); - if (map == null) { - map = new TreeMap(); - _record.setMapField(pStr, map); - } + Map map = getMapField(p); map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName); } public String getAssignedParticipant(int p) { + Map map = getMapField(p); + return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null; + } + + public void setNextRetryTime(int p, long t) { + Map map = getMapField(p); + map.put(ContextProperties.NEXT_RETRY_TIME.toString(), String.valueOf(t)); + } + + public long getNextRetryTime(int p) { + Map map = getMapField(p); + if (map == null) { + return -1; + } + String tStr = map.get(ContextProperties.NEXT_RETRY_TIME.toString()); + if (tStr == null) { + return -1; + } + return Long.parseLong(tStr); + } + + public Map getMapField(int p) { String pStr = String.valueOf(p); Map map = _record.getMapField(pStr); - return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null; + if (map == null) { + map = new TreeMap(); + _record.setMapField(pStr, map); + } + return map; } } http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index f8c6415..131236e 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -64,8 +64,8 @@ import com.google.common.collect.Sets; public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { private static final Logger LOG = Logger.getLogger(TaskRebalancer.class); - // Management of already-scheduled workflows across jobs - private static final BiMap SCHEDULED_WORKFLOWS = HashBiMap.create(); + // Management of already-scheduled rebalances across jobs + private static final BiMap SCHEDULED_TIMES = HashBiMap.create(); private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors .newSingleThreadScheduledExecutor(); @@ -252,6 +252,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache); Map> taskAssignments = getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions); + long currentTime = System.currentTimeMillis(); for (String instance : taskAssignments.keySet()) { Set pSet = taskAssignments.get(instance); // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, @@ -360,7 +361,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } if (!successOptional) { - long finishTime = System.currentTimeMillis(); + long finishTime = currentTime; workflowCtx.setJobState(jobResource, TaskState.FAILED); if (workflowConfig.isTerminable()) { workflowCtx.setWorkflowState(TaskState.FAILED); @@ -374,6 +375,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { skippedPartitions.add(pId); partitionsToDropFromIs.add(pId); } + } else { + // Mark the task to be started at some later time (if enabled) + markPartitionDelayed(jobCfg, jobCtx, pId); } } break; @@ -395,8 +399,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { pSet.removeAll(donePartitions); } + // For delayed tasks, trigger a rebalance event for the closest upcoming ready time + scheduleForNextTask(jobResource, jobCtx, currentTime); + if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) { - long currentTime = System.currentTimeMillis(); workflowCtx.setJobState(jobResource, TaskState.COMPLETED); jobCtx.setFinishTime(currentTime); if (isWorkflowComplete(workflowCtx, workflowConfig)) { @@ -409,10 +415,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (jobTgtState == TargetState.START) { // Contains the set of task partitions that must be excluded from consideration when making // any new assignments. - // This includes all completed, failed, already assigned partitions. + // This includes all completed, failed, delayed, and already assigned partitions. Set excludeSet = Sets.newTreeSet(assignedPartitions); addCompletedPartitions(excludeSet, jobCtx, allPartitions); excludeSet.addAll(skippedPartitions); + excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime)); // Get instance->[partition, ...] mappings for the target resource. Map> tgtPartitionAssignments = getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, @@ -476,9 +483,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (delayFromStart <= 0) { // Remove any timers that are past-time for this workflow - Date scheduledTime = SCHEDULED_WORKFLOWS.get(workflowResource); + Date scheduledTime = SCHEDULED_TIMES.get(workflowResource); if (scheduledTime != null && currentTime > scheduledTime.getTime()) { - SCHEDULED_WORKFLOWS.remove(workflowResource); + SCHEDULED_TIMES.remove(workflowResource); } // Recurring workflows are just templates that spawn new workflows @@ -534,8 +541,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } // No need to schedule the same runnable at the same time - if (SCHEDULED_WORKFLOWS.containsKey(workflowResource) - || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) { + if (SCHEDULED_TIMES.containsKey(workflowResource) + || SCHEDULED_TIMES.inverse().containsKey(startTime)) { return false; } @@ -543,20 +550,50 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return false; } - private void scheduleRebalance(String workflowResource, String jobResource, Date startTime, - long delayFromStart) { + private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) { // No need to schedule the same runnable at the same time - if (SCHEDULED_WORKFLOWS.containsKey(workflowResource) - || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) { + if (SCHEDULED_TIMES.containsKey(id) || SCHEDULED_TIMES.inverse().containsKey(startTime)) { return; } // For workflows not yet scheduled, schedule them and record it RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource); - SCHEDULED_WORKFLOWS.put(workflowResource, startTime); + SCHEDULED_TIMES.put(id, startTime); SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS); } + private void scheduleForNextTask(String jobResource, JobContext ctx, long now) { + // Clear current entries if they exist and are expired + long currentTime = now; + Date scheduledTime = SCHEDULED_TIMES.get(jobResource); + if (scheduledTime != null && currentTime > scheduledTime.getTime()) { + SCHEDULED_TIMES.remove(jobResource); + } + + // Figure out the earliest schedulable time in the future of a non-complete job + boolean shouldSchedule = false; + long earliestTime = Long.MAX_VALUE; + for (int p : ctx.getPartitionSet()) { + long retryTime = ctx.getNextRetryTime(p); + TaskPartitionState state = ctx.getPartitionState(p); + state = (state != null) ? state : TaskPartitionState.INIT; + Set errorStates = + Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR, + TaskPartitionState.TIMED_OUT); + if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) { + earliestTime = retryTime; + shouldSchedule = true; + } + } + + // If any was found, then schedule it + if (shouldSchedule) { + long delay = earliestTime - currentTime; + Date startTime = new Date(earliestTime); + scheduleRebalance(jobResource, jobResource, startTime, delay); + } + } + /** * Checks if the job has completed. * @param ctx The rebalancer context. @@ -770,6 +807,15 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return result; } + private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) { + long delayInterval = cfg.getTaskRetryDelay(); + if (delayInterval <= 0) { + return; + } + long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval; + ctx.setNextRetryTime(p, nextStartTime); + } + private static void markPartitionCompleted(JobContext ctx, int pId) { ctx.setPartitionState(pId, TaskPartitionState.COMPLETED); ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); @@ -814,10 +860,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } } } - return result; } + private static Set getNonReadyPartitions(JobContext ctx, long now) { + Set nonReadyPartitions = Sets.newHashSet(); + for (int p : ctx.getPartitionSet()) { + long toStart = ctx.getNextRetryTime(p); + if (now < toStart) { + nonReadyPartitions.add(p); + } + } + return nonReadyPartitions; + } + /** * Computes the partition name given the resource name and partition id. */ http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index bc5350a..32fd5ac 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -41,4 +41,5 @@ public class JobBean { public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; public int maxForcedReassignmentsPerTask = JobConfig.DEFAULT_MAX_FORCED_REASSIGNMENTS_PER_TASK; public int failureThreshold = JobConfig.DEFAULT_FAILURE_THRESHOLD; + public long taskRetryDelay = JobConfig.DEFAULT_TASK_RETRY_DELAY; } http://git-wip-us.apache.org/repos/asf/helix/blob/c2e41132/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index b7f20d1..1f17e92 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -35,6 +35,7 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; import org.apache.helix.task.ScheduleConfig; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; @@ -104,6 +105,12 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { return new TaskTwo(context, instanceName); } }); + taskFactoryReg.put("SingleFailTask", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new SingleFailTask(); + } + }); _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); @@ -279,6 +286,33 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue((startTime + 1000) >= inFiveSeconds); } + @Test + public void testDelayedRetry() throws Exception { + // Create a single job with single task, set retry delay + int delay = 3000; + String jobName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); + List taskConfigs = Lists.newArrayListWithCapacity(1); + Map taskConfigMap = Maps.newHashMap(); + TaskConfig taskConfig1 = new TaskConfig("SingleFailTask", taskConfigMap, false); + taskConfigs.add(taskConfig1); + workflowBuilder.addTaskConfigs(jobName, taskConfigs); + workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); + workflowBuilder.addConfig(jobName, JobConfig.TASK_RETRY_DELAY, String.valueOf(delay)); + Map jobConfigMap = Maps.newHashMap(); + workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap); + SingleFailTask.hasFailed = false; + _driver.start(workflowBuilder.build()); + + // Ensure completion + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + + // Ensure a single retry happened + JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName); + Assert.assertEquals(jobCtx.getPartitionNumAttempts(0), 2); + Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay); + } + private class TaskOne extends ReindexTask { private final boolean _shouldFail; private final String _instanceName; @@ -327,4 +361,21 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { super(context, instanceName); } } + + private static class SingleFailTask implements Task { + public static boolean hasFailed = false; + + @Override + public TaskResult run() { + if (!hasFailed) { + hasFailed = true; + return new TaskResult(Status.ERROR, null); + } + return new TaskResult(Status.COMPLETED, null); + } + + @Override + public void cancel() { + } + } }