Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 091F610B75 for ; Wed, 19 Nov 2014 19:44:34 +0000 (UTC) Received: (qmail 72488 invoked by uid 500); 19 Nov 2014 19:44:34 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 72462 invoked by uid 500); 19 Nov 2014 19:44:33 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 72395 invoked by uid 99); 19 Nov 2014 19:44:33 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Nov 2014 19:44:33 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9B2039ACC80; Wed, 19 Nov 2014 19:44:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Date: Wed, 19 Nov 2014 19:44:34 -0000 Message-Id: <8a3137ec034247e2908096f576ecbe34@git.apache.org> In-Reply-To: <2dd48619087b49d7bcceeb649984dc34@git.apache.org> References: <2dd48619087b49d7bcceeb649984dc34@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] tez git commit: TEZ-14. Support MR like speculation capabilities based on latency deviation from the mean (bikas) TEZ-14. 
Support MR like speculation capabilities based on latency deviation from the mean (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6be75661 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6be75661 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6be75661 Branch: refs/heads/master Commit: 6be7566142a657a3dbcfc262af9c55546da45728 Parents: c56bb01 Author: Bikas Saha Authored: Wed Nov 19 11:44:27 2014 -0800 Committer: Bikas Saha Committed: Wed Nov 19 11:44:27 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 16 +- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +- .../java/org/apache/tez/dag/app/dag/Vertex.java | 1 + .../dag/event/TaskAttemptEventKillRequest.java | 7 +- .../dag/event/TaskAttemptEventStatusUpdate.java | 53 --- .../VertexEventTaskAttemptStatusUpdate.java | 60 +++ .../tez/dag/app/dag/event/VertexEventType.java | 3 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 31 +- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 23 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 61 ++- .../dag/speculation/legacy/DataStatistics.java | 86 ++++ .../speculation/legacy/LegacySpeculator.java | 396 +++++++++++++++++++ .../legacy/LegacyTaskRuntimeEstimator.java | 136 +++++++ .../speculation/legacy/StartEndTimesBase.java | 138 +++++++ .../legacy/TaskRuntimeEstimator.java | 91 +++++ .../java/org/apache/tez/dag/app/MockClock.java | 36 ++ .../apache/tez/dag/app/MockDAGAppMaster.java | 36 +- .../org/apache/tez/dag/app/MockLocalClient.java | 6 +- .../org/apache/tez/dag/app/MockTezClient.java | 5 +- .../tez/dag/app/TestMockDAGAppMaster.java | 8 +- .../org/apache/tez/dag/app/TestPreemption.java | 4 +- .../org/apache/tez/dag/app/TestSpeculation.java | 161 ++++++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 195 ++++++++- 
.../speculation/legacy/TestDataStatistics.java | 73 ++++ .../org/apache/tez/test/TestDAGRecovery.java | 2 +- .../apache/tez/test/dag/MultiAttemptDAG.java | 1 + 28 files changed, 1557 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3c8c676..2eaf873 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Release 0.6.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-14. Support MR like speculation capabilities based on latency deviation + from the mean TEZ-1733. TezMerger should sort FileChunks on size when merging TEZ-1738. Tez tfile parser for log parsing TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 6873863..84ee906 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -272,7 +272,21 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "counters.group-name.max-length"; public static final int TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH_DEFAULT = 128; - + @Unstable + /** + * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency + * when some tasks are running slower due bad/slow machines + */ + public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled"; + public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false; + + /** + * Float value. 
Specifies how many standard deviations away from the mean task execution time + * should be considered as an outlier/slow task. + */ + @Unstable + public static final String TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD = + TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold"; /** * Int value. Upper limit on the number of threads user to launch containers in the app http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index f30fc5c..4aa220d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -40,10 +40,14 @@ import org.apache.tez.dag.records.TezVertexID; public interface TaskAttempt { public static class TaskAttemptStatus { + public TezTaskAttemptID id; public TaskAttemptState state; - public DAGCounter localityCounter; public float progress; public TezCounters counters; + + public TaskAttemptStatus(TezTaskAttemptID id) { + this.id = id; + } // insert these counters till they come natively from the task itself. 
// HDFS-5098 http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index cfedc41..7487fd9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -124,6 +124,7 @@ public interface Vertex extends Comparable { int getInputVerticesCount(); int getOutputVerticesCount(); void scheduleTasks(List tasks); + void scheduleSpeculativeTask(TezTaskID taskId); Resource getTaskResource(); ProcessorDescriptor getProcessorDescriptor(); http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java index 9bceb1d..0205fcf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java @@ -19,7 +19,7 @@ package org.apache.tez.dag.app.dag.event; import org.apache.tez.dag.records.TezTaskAttemptID; -public class TaskAttemptEventKillRequest extends TaskAttemptEvent { +public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements DiagnosableEvent { private final String message; @@ -28,8 +28,9 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent { this.message = message; } - public String getMessage() { - return this.message; + @Override + public String getDiagnosticInfo() { + return message; } } 
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java index 13577c5..c5a6ea7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java @@ -18,12 +18,6 @@ package org.apache.tez.dag.app.dag.event; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.tez.common.counters.DAGCounter; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; @@ -40,51 +34,4 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent { public TaskStatusUpdateEvent getStatusEvent() { return this.taskAttemptStatus; } - - private TaskAttemptStatusOld reportedTaskAttemptStatus; - - public TaskAttemptEventStatusUpdate(TezTaskAttemptID id, - TaskAttemptStatusOld taskAttemptStatus) { - super(id, TaskAttemptEventType.TA_STATUS_UPDATE); - this.reportedTaskAttemptStatus = taskAttemptStatus; - } - - public TaskAttemptStatusOld getReportedTaskAttemptStatus() { - return reportedTaskAttemptStatus; - } - - /** - * The internal TaskAttemptStatus object corresponding to remote Task status. 
- * - */ - public static class TaskAttemptStatusOld { - - private AtomicBoolean localitySet = new AtomicBoolean(false); - - public TezTaskAttemptID id; - public float progress; - public TezCounters counters; - public String stateString; - //public Phase phase; - public long outputSize; - public List fetchFailedMaps; - public long mapFinishTime; - public long shuffleFinishTime; - public long sortFinishTime; - public TaskAttemptState taskState; - - public void setLocalityCounter(DAGCounter localityCounter) { - if (!localitySet.get()) { - localitySet.set(true); - if (counters == null) { - counters = new TezCounters(); - } - if (localityCounter != null) { - counters.findCounter(localityCounter).increment(1); - // TODO Maybe validate that the correct value is being set. - } - } - } - - } } http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java new file mode 100644 index 0000000..696680d --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java @@ -0,0 +1,60 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.records.TezTaskAttemptID; + +public class VertexEventTaskAttemptStatusUpdate extends VertexEvent { + final TezTaskAttemptID id; + final TaskAttemptState state; + final long timestamp; + final boolean justStarted; + + public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state, + long timestamp) { + this(taId, state, timestamp, false); + } + + public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state, + long timestamp, boolean justStarted) { + super(taId.getTaskID().getVertexID(), VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE); + this.id = taId; + this.state = state; + this.timestamp = timestamp; + this.justStarted = justStarted; + } + + public long getTimestamp() { + return timestamp; + } + + public TezTaskAttemptID getAttemptId() { + return id; + } + + public boolean hasJustStarted() { + return justStarted; + } + + public TaskAttemptState getTaskAttemptState() { + return state; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java index b4f7e29..5565f93 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java +++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java @@ -40,6 +40,9 @@ public enum VertexEventType { V_TASK_RESCHEDULED, V_TASK_ATTEMPT_COMPLETED, + //Producer:TaskAttempt + V_TASK_ATTEMPT_STATUS_UPDATE, + //Producer:Any component V_INTERNAL_ERROR, V_MANAGER_USER_CODE_ERROR, http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index de62752..6e7805e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -111,6 +111,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; import org.apache.tez.dag.utils.RelocalizationUtils; @@ -768,6 +769,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, .getAttempt(taId); } + public TaskImpl getTask(TezTaskID tId) { + return (TaskImpl) getVertex(tId.getVertexID()).getTask(tId); + } + protected void initializeVerticesAndStart() { for (Vertex v : vertices.values()) { if (v.getInputVerticesCount() == 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java 
index deaba42..3056c1e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; @@ -83,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate; import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; import org.apache.tez.dag.history.DAGHistoryEvent; @@ -413,7 +415,7 @@ public class TaskAttemptImpl implements TaskAttempt, this.clock = clock; this.taskHeartbeatHandler = taskHeartbeatHandler; this.appContext = appContext; - this.reportedStatus = new TaskAttemptStatus(); + this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); RackResolver.init(conf); this.stateMachine = stateMachineFactory.make(this); @@ -1151,10 +1153,20 @@ public class TaskAttemptImpl implements TaskAttempt, // Inform the Task ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); + + if (ta.isSpeculationEnabled()) { + ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING, + ta.launchTime, true)); + } ta.taskHeartbeatHandler.register(ta.attemptId); } } + + private boolean isSpeculationEnabled() { + return 
conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, + TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT); + } protected static class TerminatedBeforeRunningTransition extends TerminateTransition { @@ -1235,6 +1247,10 @@ public class TaskAttemptImpl implements TaskAttempt, ta.updateProgressSplits(); + if (ta.isSpeculationEnabled()) { + ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(), + ta.clock.getTime())); + } } } @@ -1259,6 +1275,14 @@ public class TaskAttemptImpl implements TaskAttempt, // Unregister from the TaskHeartbeatHandler. ta.taskHeartbeatHandler.unregister(ta.attemptId); + + ta.reportedStatus.state = TaskAttemptState.SUCCEEDED; + ta.reportedStatus.progress = 1.0f; + + if (ta.isSpeculationEnabled()) { + ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED, + ta.clock.getTime())); + } // TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the // TA finishes independently. // Will likely be the Job's responsibility. 
@@ -1278,6 +1302,11 @@ public class TaskAttemptImpl implements TaskAttempt, public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { super.transition(ta, event); ta.taskHeartbeatHandler.unregister(ta.attemptId); + ta.reportedStatus.state = helper.getTaskAttemptState(); // FAILED or KILLED + if (ta.isSpeculationEnabled()) { + ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(), + ta.clock.getTime())); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 4ded9be..c3ba11d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; @@ -1016,6 +1015,7 @@ public class TaskImpl implements Task, EventHandler { if (task.historyTaskStartGenerated) { task.logJobHistoryTaskFinishedEvent(); } + TaskAttempt successfulAttempt = task.attempts.get(successTaId); // issue kill to all other attempts for (TaskAttempt attempt : task.attempts.values()) { @@ -1024,9 +1024,18 @@ public class TaskImpl implements Task, EventHandler { // TA_KILL message to an attempt that doesn't need one for // other reasons. 
!attempt.isFinished()) { - LOG.info("Issuing kill to other attempt " + attempt.getID()); + LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " + + task.successfulAttempt + " has succeeded"); + String diagnostics = null; + if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) { + diagnostics = "Killed this attempt as other speculative attempt : " + successTaId + + " succeeded"; + } else { + diagnostics = "Killed this speculative attempt as original attempt: " + successTaId + + " succeeded"; + } task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt - .getID(), "Alternate attempt succeeded")); + .getID(), diagnostics)); } } // send notification to DAG scheduler @@ -1336,12 +1345,6 @@ public class TaskImpl implements Task, EventHandler { @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - // verify that this occurs only for map task - // TODO: consider moving it to MapTaskImpl - if (task.leafVertex) { - LOG.error("Unexpected event for task of leaf vertex " + event.getType()); - task.internalError(event.getType()); - } TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event; TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID(); @@ -1365,6 +1368,8 @@ public class TaskImpl implements Task, EventHandler { return TaskStateInternal.SCHEDULED; } else { // nothing to do + LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " + + task.successfulAttempt + " is already successful"); return TaskStateInternal.SUCCEEDED; } } http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index d19c4cc..54cd6c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -68,6 +68,7 @@ import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; @@ -121,11 +122,13 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted; import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered; import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted; import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted; +import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate; import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.dag.event.VertexEventTermination; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; +import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; @@ -205,6 +208,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private Resource taskResource; private Configuration conf; + + private final boolean isSpeculationEnabled; //fields initialized in init @@ -235,6 +240,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new TaskAttemptCompletedEventTransition(); + private static final TaskAttempStatusUpdateEventTransition + 
TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION = new TaskAttempStatusUpdateEventTransition(); private static final SourceTaskAttemptCompletedEventTransition SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = new SourceTaskAttemptCompletedEventTransition(); @@ -248,6 +255,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @VisibleForTesting final List pendingInitializerEvents = new LinkedList(); + + LegacySpeculator speculator; protected static final StateMachineFactory @@ -460,6 +469,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING), VertexEventType.V_ROUTE_EVENT, ROUTE_EVENT_TRANSITION) + .addTransition( + VertexState.RUNNING, + VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, + TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION) // Transitions from TERMINATING state. .addTransition @@ -477,6 +490,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_MANAGER_USER_CODE_ERROR, VertexEventType.V_ROOT_INPUT_FAILED, VertexEventType.V_SOURCE_VERTEX_STARTED, + VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, VertexEventType.V_ROOT_INPUT_INITIALIZED, VertexEventType.V_NULL_EDGE_INITIALIZED, VertexEventType.V_ROUTE_EVENT, @@ -494,7 +508,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_TASK_RESCHEDULED, new TaskRescheduledAfterVertexSuccessTransition()) - // Ignore-able events .addTransition( VertexState.SUCCEEDED, EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED), @@ -506,10 +519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EnumSet.of(VertexState.FAILED, VertexState.ERROR), VertexEventType.V_TASK_COMPLETED, new TaskCompletedAfterVertexSuccessTransition()) + // Ignore-able events .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED, EnumSet.of(VertexEventType.V_TERMINATE, VertexEventType.V_ROOT_INPUT_FAILED, 
VertexEventType.V_TASK_ATTEMPT_COMPLETED, + VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, // after we are done reruns of source tasks should not affect // us. These reruns may be triggered by other consumer vertices. // We should have been in RUNNING state if we had triggered the @@ -519,6 +534,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_TASK_ATTEMPT_COMPLETED, new TaskAttemptCompletedEventTransition()) + // Transitions from FAILED state .addTransition( VertexState.FAILED, @@ -534,6 +550,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_START, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_ATTEMPT_COMPLETED, + VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_ROOT_INPUT_INITIALIZED, @@ -558,6 +575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TASK_RESCHEDULED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, + VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, VertexEventType.V_TASK_COMPLETED, @@ -577,6 +595,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_ROUTE_EVENT, VertexEventType.V_TERMINATE, VertexEventType.V_MANAGER_USER_CODE_ERROR, + VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE, VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT, @@ -773,7 +792,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Not sending the notifier a parallelism update since this is the initial parallelism this.dagVertexGroups = dagVertexGroups; - + + isSpeculationEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, + TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT); + + if 
(isSpeculationEnabled()) { + speculator = new LegacySpeculator(conf, getAppContext(), this); + } + logIdentifier = this.getVertexId() + " [" + this.getName() + "]"; // This "this leak" is okay because the retained pointer is in an // instance variable. @@ -782,6 +808,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, stateMachineFactory.make(this), this); augmentStateMachine(); } + + private boolean isSpeculationEnabled() { + return isSpeculationEnabled; + } protected StateMachine getStateMachine() { return stateMachine; @@ -1194,6 +1224,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } @Override + public void scheduleSpeculativeTask(TezTaskID taskId) { + Preconditions.checkState(taskId.getId() < numTasks); + eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT)); + } + + @Override public void scheduleTasks(List tasksToSchedule) { writeLock.lock(); try { @@ -3282,6 +3318,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, eventHandler.handle(new VertexEvent( this.vertexId, VertexEventType.V_COMPLETED)); } + return VertexState.RUNNING; } @@ -3483,7 +3520,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier(); LOG.error(msg, e); vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); - vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE); + vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, + TaskTerminationCause.AM_USERCODE_FAILURE); return VertexState.TERMINATING; } } else { @@ -3515,6 +3553,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } + private static class TaskAttempStatusUpdateEventTransition implements + SingleArcTransition { + @Override + public void transition(VertexImpl vertex, VertexEvent event) { + VertexEventTaskAttemptStatusUpdate 
updateEvent = + ((VertexEventTaskAttemptStatusUpdate) event); + if (vertex.isSpeculationEnabled()) { + if (updateEvent.hasJustStarted()) { + vertex.speculator.notifyAttemptStarted(updateEvent.getAttemptId(), + updateEvent.getTimestamp()); + } else { + vertex.speculator.notifyAttemptStatusUpdate(updateEvent.getAttemptId(), + updateEvent.getTaskAttemptState(), updateEvent.getTimestamp()); + } + } + } + } private static class TaskCompletedTransition implements MultipleArcTransition { http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java new file mode 100644 index 0000000..7e6f1c2 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java @@ -0,0 +1,86 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
/**
 * Accumulates a running count, sum and sum of squares over a stream of
 * samples and derives the mean, variance, standard deviation and an outlier
 * threshold from them.
 *
 * <p>All accessors and mutators synchronize on the instance, so a single
 * instance may be shared between threads.
 */
public class DataStatistics {
  private int count = 0;
  private double sum = 0;
  private double sumSquares = 0;

  /** Creates an empty accumulator. */
  public DataStatistics() {
  }

  /**
   * Creates an accumulator that already contains one sample.
   *
   * @param initNum the first sample
   */
  public DataStatistics(double initNum) {
    this.count = 1;
    this.sum = initNum;
    this.sumSquares = initNum * initNum;
  }

  /**
   * Adds one sample.
   *
   * @param newNum the sample to add
   */
  public synchronized void add(double newNum) {
    this.count++;
    this.sum += newNum;
    this.sumSquares += newNum * newNum;
  }

  /**
   * Replaces a previously added sample with a new value; the count is left
   * untouched. Package-private: visible for testing only.
   *
   * @param old the value that was added earlier
   * @param update the value that takes its place
   */
  synchronized void updateStatistics(double old, double update) {
    this.sum += update - old;
    this.sumSquares += (update * update) - (old * old);
  }

  /**
   * @return the arithmetic mean of the samples, or {@code Long.MAX_VALUE}
   *         when there are no samples
   */
  public synchronized double mean() {
    // With no data the mean estimate is deliberately huge rather than zero.
    return count == 0 ? Long.MAX_VALUE : sum / count;
  }

  /**
   * @return {@code E(X^2) - E(X)^2} clamped at zero, or {@code 0.0} when
   *         there are fewer than two samples
   */
  public synchronized double var() {
    if (count <= 1) {
      return 0.0;
    }
    double mean = mean();
    // Clamp: rounding can push the difference slightly below zero.
    return Math.max((sumSquares / count) - mean * mean, 0.0d);
  }

  /** @return the square root of {@link #var()} */
  public synchronized double std() {
    return Math.sqrt(this.var());
  }

  /**
   * @param sigma number of standard deviations above the mean
   * @return {@code mean() + std() * sigma}, or {@code Long.MAX_VALUE} when
   *         there are no samples
   */
  public synchronized double outlier(float sigma) {
    if (count != 0) {
      return mean() + std() * sigma;
    }

    // With no data the outlier estimate is deliberately huge rather than zero.
    return Long.MAX_VALUE;
  }

  /** @return the number of samples, widened to {@code double} */
  public synchronized double count() {
    return count;
  }

  @Override
  public synchronized String toString() {
    // Synchronized so that count, sum and sumSquares form one snapshot.
    return "DataStatistics: count is " + count + ", sum is " + sum +
        ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
  }
}
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java new file mode 100644 index 0000000..8f76e05 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java @@ -0,0 +1,396 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.speculation.legacy; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; + +import com.google.common.base.Preconditions; + +/** + * Maintains runtime estimation statistics. 
Makes periodic updates + * estimates based on progress and decides on when to trigger a + * speculative attempt. Speculation attempts are triggered when the + * estimated runtime is more than a threshold beyond the mean runtime + * and the original task still has enough estimated runtime left that + * the speculative version is expected to finish sooner than that. If + * the original is close to completion then we dont start a speculation + * because it may be likely a wasted attempt. There is a delay between + * successive speculations. + */ +public class LegacySpeculator { + + private static final long ON_SCHEDULE = Long.MIN_VALUE; + private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1; + private static final long TOO_NEW = Long.MIN_VALUE + 2; + private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3; + private static final long NOT_RUNNING = Long.MIN_VALUE + 4; + private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5; + + private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L; + private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L; + + private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1; + private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01; + private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10; + + private static final Log LOG = LogFactory.getLog(LegacySpeculator.class); + + private final ConcurrentMap runningTasks + = new ConcurrentHashMap(); + + // Used to track any TaskAttempts that aren't heart-beating for a while, so + // that we can aggressively speculate instead of waiting for task-timeout. + private final ConcurrentMap + runningTaskAttemptStatistics = new ConcurrentHashMap(); + // Regular heartbeat from tasks is every 3 secs. So if we don't get a + // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change + // in progress. 
+ private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000; + + + private final Set mayHaveSpeculated = new HashSet(); + + private Vertex vertex; + private TaskRuntimeEstimator estimator; + + private final Clock clock; + private long nextSpeculateTime = Long.MIN_VALUE; + + public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) { + this(conf, context.getClock(), vertex); + } + + public LegacySpeculator(Configuration conf, Clock clock, Vertex vertex) { + this(conf, getEstimator(conf, vertex), clock, vertex); + } + + static private TaskRuntimeEstimator getEstimator + (Configuration conf, Vertex vertex) { + TaskRuntimeEstimator estimator = new LegacyTaskRuntimeEstimator(); + estimator.contextualize(conf, vertex); + + return estimator; + } + + // This constructor is designed to be called by other constructors. + // However, it's public because we do use it in the test cases. + // Normally we figure out our own estimator. + public LegacySpeculator + (Configuration conf, TaskRuntimeEstimator estimator, Clock clock, Vertex vertex) { + this.vertex = vertex; + this.estimator = estimator; + this.clock = clock; + } + +/* ************************************************************* */ + + void maybeSpeculate() { + long now = clock.getTime(); + + if (now < nextSpeculateTime) { + return; + } + + int speculations = maybeScheduleASpeculation(); + long mininumRecomp + = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE + : SOONEST_RETRY_AFTER_NO_SPECULATE; + + long wait = Math.max(mininumRecomp, + clock.getTime() - now); + nextSpeculateTime = now + wait; + + if (speculations > 0) { + LOG.info("We launched " + speculations + + " speculations. 
Waiting " + wait + " milliseconds."); + } + } + +/* ************************************************************* */ + + public void notifyAttemptStarted(TezTaskAttemptID taId, long timestamp) { + estimator.enrollAttempt(taId, timestamp); + } + + public void notifyAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState reportedState, + long timestamp) { + statusUpdate(taId, reportedState, timestamp); + maybeSpeculate(); + } + + /** + * Absorbs one TaskAttemptStatus + * + * @param reportedStatus the status report that we got from a task attempt + * that we want to fold into the speculation data for this job + * @param timestamp the time this status corresponds to. This matters + * because statuses contain progress. + */ + private void statusUpdate(TezTaskAttemptID attemptID, TaskAttemptState reportedState, long timestamp) { + + TezTaskID taskID = attemptID.getTaskID(); + Task task = vertex.getTask(taskID); + + Preconditions.checkState(task != null, "Null task for attempt: " + attemptID); + + estimator.updateAttempt(attemptID, reportedState, timestamp); + + //if (stateString.equals(TaskAttemptState.RUNNING.name())) { + if (reportedState == TaskAttemptState.RUNNING) { + runningTasks.putIfAbsent(taskID, Boolean.TRUE); + } else { + runningTasks.remove(taskID, Boolean.TRUE); + //if (!stateString.equals(TaskAttemptState.STARTING.name())) { + if (reportedState == TaskAttemptState.STARTING) { + runningTaskAttemptStatistics.remove(attemptID); + } + } + } + +/* ************************************************************* */ + +// This is the code section that runs periodically and adds speculations for +// those jobs that need them. + + + // This can return a few magic values for tasks that shouldn't speculate: + // returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not + // considering speculating this task + // returns ALREADY_SPECULATING if that is true. This has priority. 
+ // returns TOO_NEW if our companion task hasn't gotten any information + // returns PROGRESS_IS_GOOD if the task is sailing through + // returns NOT_RUNNING if the task is not running + // + // All of these values are negative. Any value that should be allowed to + // speculate is 0 or positive. + private long speculationValue(Task task, long now) { + Map attempts = task.getAttempts(); + TezTaskID taskID = task.getTaskId(); + long acceptableRuntime = Long.MIN_VALUE; + long result = Long.MIN_VALUE; + + // short circuit completed tasks. no need to spend time on them + if (task.getState() == TaskState.SUCCEEDED) { + return NOT_RUNNING; + } + + if (!mayHaveSpeculated.contains(taskID)) { + acceptableRuntime = estimator.thresholdRuntime(taskID); + if (acceptableRuntime == Long.MAX_VALUE) { + return ON_SCHEDULE; + } + } + + TezTaskAttemptID runningTaskAttemptID = null; + + int numberRunningAttempts = 0; + + for (TaskAttempt taskAttempt : attempts.values()) { + if (taskAttempt.getState() == TaskAttemptState.RUNNING + || taskAttempt.getState() == TaskAttemptState.STARTING) { + if (++numberRunningAttempts > 1) { + return ALREADY_SPECULATING; + } + runningTaskAttemptID = taskAttempt.getID(); + + long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID); + + long taskAttemptStartTime + = estimator.attemptEnrolledTime(runningTaskAttemptID); + if (taskAttemptStartTime > now) { + // This background process ran before we could process the task + // attempt status change that chronicles the attempt start + return TOO_NEW; + } + + long estimatedEndTime = estimatedRunTime + taskAttemptStartTime; + + long estimatedReplacementEndTime + = now + estimator.newAttemptEstimatedRuntime(); + + float progress = taskAttempt.getProgress(); + TaskAttemptHistoryStatistics data = + runningTaskAttemptStatistics.get(runningTaskAttemptID); + if (data == null) { + runningTaskAttemptStatistics.put(runningTaskAttemptID, + new TaskAttemptHistoryStatistics(estimatedRunTime, progress, 
now)); + } else { + if (estimatedRunTime == data.getEstimatedRunTime() + && progress == data.getProgress()) { + // Previous stats are same as same stats + if (data.notHeartbeatedInAWhile(now)) { + // Stats have stagnated for a while, simulate heart-beat. + // Now simulate the heart-beat + statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime()); + } + } else { + // Stats have changed - update our data structure + data.setEstimatedRunTime(estimatedRunTime); + data.setProgress(progress); + data.resetHeartBeatTime(now); + } + } + + if (estimatedEndTime < now) { + return PROGRESS_IS_GOOD; + } + + if (estimatedReplacementEndTime >= estimatedEndTime) { + return TOO_LATE_TO_SPECULATE; + } + + result = estimatedEndTime - estimatedReplacementEndTime; + } + } + + // If we are here, there's at most one task attempt. + if (numberRunningAttempts == 0) { + return NOT_RUNNING; + } + + + + if (acceptableRuntime == Long.MIN_VALUE) { + acceptableRuntime = estimator.thresholdRuntime(taskID); + if (acceptableRuntime == Long.MAX_VALUE) { + return ON_SCHEDULE; + } + } + + return result; + } + + //Add attempt to a given Task. + protected void addSpeculativeAttempt(TezTaskID taskID) { + LOG.info + ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID); + vertex.scheduleSpeculativeTask(taskID); + mayHaveSpeculated.add(taskID); + } + + private int maybeScheduleASpeculation() { + int successes = 0; + + long now = clock.getTime(); + + int numberSpeculationsAlready = 0; + int numberRunningTasks = 0; + + Map tasks = vertex.getTasks(); + + int numberAllowedSpeculativeTasks + = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS, + PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size()); + + TezTaskID bestTaskID = null; + long bestSpeculationValue = -1L; + + // this loop is potentially pricey. 
+ // TODO track the tasks that are potentially worth looking at + for (Map.Entry taskEntry : tasks.entrySet()) { + long mySpeculationValue = speculationValue(taskEntry.getValue(), now); + + if (mySpeculationValue == ALREADY_SPECULATING) { + ++numberSpeculationsAlready; + } + + if (mySpeculationValue != NOT_RUNNING) { + ++numberRunningTasks; + } + + if (mySpeculationValue > bestSpeculationValue) { + bestTaskID = taskEntry.getKey(); + bestSpeculationValue = mySpeculationValue; + } + } + numberAllowedSpeculativeTasks + = (int) Math.max(numberAllowedSpeculativeTasks, + PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks); + + // If we found a speculation target, fire it off + if (bestTaskID != null + && numberAllowedSpeculativeTasks > numberSpeculationsAlready) { + addSpeculativeAttempt(bestTaskID); + ++successes; + } + + return successes; + } + + static class TaskAttemptHistoryStatistics { + + private long estimatedRunTime; + private float progress; + private long lastHeartBeatTime; + + public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, + long nonProgressStartTime) { + this.estimatedRunTime = estimatedRunTime; + this.progress = progress; + resetHeartBeatTime(nonProgressStartTime); + } + + public long getEstimatedRunTime() { + return this.estimatedRunTime; + } + + public float getProgress() { + return this.progress; + } + + public void setEstimatedRunTime(long estimatedRunTime) { + this.estimatedRunTime = estimatedRunTime; + } + + public void setProgress(float progress) { + this.progress = progress; + } + + public boolean notHeartbeatedInAWhile(long now) { + if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) { + return false; + } else { + resetHeartBeatTime(now); + return true; + } + } + + public void resetHeartBeatTime(long lastHeartBeatTime) { + this.lastHeartBeatTime = lastHeartBeatTime; + } + } +} 
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java new file mode 100644 index 0000000..14d269c --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java @@ -0,0 +1,136 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.speculation.legacy; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.records.TezTaskAttemptID; + +/** + * Runtime estimator that uses a simple scheme of estimating task attempt + * runtime based on current elapsed runtime and reported progress. 
+ */ +public class LegacyTaskRuntimeEstimator extends StartEndTimesBase { + + private final Map attemptRuntimeEstimates + = new ConcurrentHashMap(); + private final ConcurrentHashMap attemptRuntimeEstimateVariances + = new ConcurrentHashMap(); + + @Override + public void updateAttempt(TezTaskAttemptID attemptID, TaskAttemptState state, long timestamp) { + super.updateAttempt(attemptID, state, timestamp); + + + Task task = vertex.getTask(attemptID.getTaskID()); + + if (task == null) { + return; + } + + TaskAttempt taskAttempt = task.getAttempt(attemptID); + + if (taskAttempt == null) { + return; + } + + float progress = taskAttempt.getProgress(); + + Long boxedStart = startTimes.get(attemptID); + long start = boxedStart == null ? Long.MIN_VALUE : boxedStart; + + // We need to do two things. + // 1: If this is a completion, we accumulate statistics in the superclass + // 2: If this is not a completion, we learn more about it. + + // This is not a completion, but we're cooking. + // + if (taskAttempt.getState() == TaskAttemptState.RUNNING) { + // See if this task is already in the registry + AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt); + AtomicLong estimateVarianceContainer + = attemptRuntimeEstimateVariances.get(taskAttempt); + + if (estimateContainer == null) { + if (attemptRuntimeEstimates.get(taskAttempt) == null) { + attemptRuntimeEstimates.put(taskAttempt, new AtomicLong()); + + estimateContainer = attemptRuntimeEstimates.get(taskAttempt); + } + } + + if (estimateVarianceContainer == null) { + attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong()); + estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt); + } + + + long estimate = -1; + long varianceEstimate = -1; + + // This code assumes that we'll never consider starting a third + // speculative task attempt if two are already running for this task + if (start > 0 && timestamp > start) { + estimate = (long) ((timestamp - start) / 
Math.max(0.0001, progress)); + varianceEstimate = (long) (estimate * progress / 10); + } + if (estimateContainer != null) { + estimateContainer.set(estimate); + } + if (estimateVarianceContainer != null) { + estimateVarianceContainer.set(varianceEstimate); + } + } + } + + private long storedPerAttemptValue + (Map data, TezTaskAttemptID attemptID) { + Task task = vertex.getTask(attemptID.getTaskID()); + + if (task == null) { + return -1L; + } + + TaskAttempt taskAttempt = task.getAttempt(attemptID); + + if (taskAttempt == null) { + return -1L; + } + + AtomicLong estimate = data.get(taskAttempt); + + return estimate == null ? -1L : estimate.get(); + + } + + @Override + public long estimatedRuntime(TezTaskAttemptID attemptID) { + return storedPerAttemptValue(attemptRuntimeEstimates, attemptID); + } + + @Override + public long runtimeEstimateVariance(TezTaskAttemptID attemptID) { + return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java new file mode 100644 index 0000000..d4d1a7f --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java @@ -0,0 +1,138 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.speculation.legacy; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; + +/** + * Base class that uses the attempt runtime estimations from a derived class + * and uses it to determine outliers based on deviating beyond the mean + * estimated runtime by some threshold + */ +abstract class StartEndTimesBase implements TaskRuntimeEstimator { + static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE + = 0.05F; + static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE + = 1; + + protected Vertex vertex; + + protected final Map startTimes + = new ConcurrentHashMap(); + + protected final DataStatistics taskStatistics = new DataStatistics(); + + private float slowTaskRelativeTresholds; + + protected final Set doneTasks = new HashSet(); + + @Override + public void enrollAttempt(TezTaskAttemptID id, long timestamp) { + startTimes.put(id, timestamp); + } + + @Override + public long attemptEnrolledTime(TezTaskAttemptID attemptID) { + Long result = startTimes.get(attemptID); + + return result == null ? 
Long.MAX_VALUE : result; + } + + @Override + public void contextualize(Configuration conf, Vertex vertex) { + slowTaskRelativeTresholds = conf.getFloat( + TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f); + this.vertex = vertex; + } + + protected DataStatistics dataStatisticsForTask(TezTaskID taskID) { + return taskStatistics; + } + + @Override + public long thresholdRuntime(TezTaskID taskID) { + int completedTasks = vertex.getCompletedTasks(); + + int totalTasks = vertex.getTotalTasks(); + + if (completedTasks < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE + || (((float)completedTasks) / totalTasks) + < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) { + return Long.MAX_VALUE; + } + + long result = (long)taskStatistics.outlier(slowTaskRelativeTresholds); + return result; + } + + @Override + public long newAttemptEstimatedRuntime() { + return (long)taskStatistics.mean(); + } + + @Override + public void updateAttempt(TezTaskAttemptID attemptID, TaskAttemptState state, long timestamp) { + + Task task = vertex.getTask(attemptID.getTaskID()); + + if (task == null) { + return; + } + + Long boxedStart = startTimes.get(attemptID); + long start = boxedStart == null ? Long.MIN_VALUE : boxedStart; + + TaskAttempt taskAttempt = task.getAttempt(attemptID); + + if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) { + boolean isNew = false; + // is this a new success? + synchronized (doneTasks) { + if (!doneTasks.contains(task)) { + doneTasks.add(task); + isNew = true; + } + } + + // It's a new completion + // Note that if a task completes twice [because of a previous speculation + // and a race, or a success followed by loss of the machine with the + // local data] we only count the first one. 
+ if (isNew) { + long finish = timestamp; + if (start > 1L && finish > 1L && start <= finish) { + long duration = finish - start; + taskStatistics.add(duration); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java new file mode 100644 index 0000000..c8edd1e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java @@ -0,0 +1,91 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app.dag.speculation.legacy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; + +/** + * Estimate the runtime for tasks of a given vertex. 
/**
 * Estimate the runtime for tasks of a given vertex.
 */
public interface TaskRuntimeEstimator {
  /**
   * Registers an attempt together with its start timestamp. The speculator
   * calls this when it is notified that an attempt has started.
   *
   * @param id the {@link TezTaskAttemptID} of the attempt
   * @param timestamp the start time reported for the attempt
   */
  public void enrollAttempt(TezTaskAttemptID id, long timestamp);

  /**
   * @param attemptID the {@link TezTaskAttemptID} of the attempt we are
   *          asking about
   * @return the timestamp the attempt was enrolled with
   */
  public long attemptEnrolledTime(TezTaskAttemptID attemptID);

  /**
   * Folds one status update of an attempt into the estimator's data.
   *
   * @param taId the {@link TezTaskAttemptID} the update is about
   * @param reportedState the attempt state reported with the update
   * @param timestamp the time the update corresponds to
   */
  public void updateAttempt(TezTaskAttemptID taId, TaskAttemptState reportedState, long timestamp);

  /**
   * Binds the estimator to its configuration and to the vertex whose tasks
   * it estimates.
   *
   * @param conf the configuration to read settings from
   * @param vertex the vertex the estimator works for
   */
  public void contextualize(Configuration conf, Vertex vertex);

  /**
   *
   * Find a maximum reasonable execution wallclock time. Includes the time
   * already elapsed. If the projected total execution time for this task
   * ever exceeds its reasonable execution time, we may speculate it.
   *
   * @param id the {@link TezTaskID} of the task we are asking about
   * @return the task's maximum reasonable runtime, or MAX_VALUE if
   *         we don't have enough information to rule out any runtime,
   *         however long.
   *
   */
  public long thresholdRuntime(TezTaskID id);

  /**
   *
   * Estimate a task attempt's total runtime. Includes the time already
   * elapsed.
   *
   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
   * @return our best estimate of the attempt's runtime, or {@code -1} if
   *         we don't have enough information yet to produce an estimate.
   *
   */
  public long estimatedRuntime(TezTaskAttemptID id);

  /**
   *
   * Estimates how long a new attempt on this task will take if we start
   * one now
   *
   * @return our best estimate of a new attempt's runtime, or {@code -1} if
   *         we don't have enough information yet to produce an estimate.
   *
   */
  public long newAttemptEstimatedRuntime();

  /**
   *
   * Computes the width of the error band of our estimate of the task
   * runtime as returned by {@link #estimatedRuntime(TezTaskAttemptID)}
   *
   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
   * @return the width of the error band around the attempt's runtime
   *         estimate, or {@code -1} if we don't have enough information yet
   *         to produce an estimate.
   *
   */
  public long runtimeEstimateVariance(TezTaskAttemptID id);
}
+*/ + +package org.apache.tez.dag.app; + +import org.apache.hadoop.yarn.util.Clock; + +public class MockClock implements Clock { + + long time = 1000; + + @Override + public long getTime() { + return time; + } + + public void incrementTime(long inc) { + time += inc; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index b4109e7..a548e3c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -50,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; @@ -75,9 +76,12 @@ public class MockDAGAppMaster extends DAGAppMaster { AtomicBoolean startScheduling = new AtomicBoolean(true); AtomicBoolean goFlag; + boolean updateProgress = true; Map preemptedTasks = Maps.newConcurrentMap(); + Map tasksWithStatusUpdates = Maps.newConcurrentMap(); + public MockContainerLauncher(AtomicBoolean goFlag) { super("MockContainerLauncher"); this.goFlag = goFlag; @@ -88,6 +92,7 @@ public class MockDAGAppMaster extends DAGAppMaster { TezTaskAttemptID taId; String vName; ContainerLaunchContext launchContext; + int numUpdates = 0; boolean completed; public ContainerData(ContainerId cId, ContainerLaunchContext context) { @@ -149,6 +154,10 @@ public class MockDAGAppMaster extends DAGAppMaster 
{ public void startScheduling(boolean value) { startScheduling.set(value); } + + public void updateProgress(boolean value) { + this.updateProgress = value; + } public Map getContainers() { return containers; @@ -164,6 +173,10 @@ public class MockDAGAppMaster extends DAGAppMaster { cData.clear(); } + public void setStatusUpdatesForTask(TezTaskAttemptID tId, int numUpdates) { + tasksWithStatusUpdates.put(tId, numUpdates); + } + void stop(NMCommunicatorStopRequestEvent event) { // remove from simulated container list containers.remove(event.getContainerId()); @@ -183,6 +196,13 @@ public class MockDAGAppMaster extends DAGAppMaster { Thread.sleep(50); } } + + void incrementTime(long inc) { + Clock clock = getContext().getClock(); + if (clock instanceof MockClock) { + ((MockClock) clock).incrementTime(inc); + } + } @Override public void run() { @@ -192,6 +212,7 @@ public class MockDAGAppMaster extends DAGAppMaster { if (!startScheduling.get()) { // schedule when asked to do so by the test code continue; } + incrementTime(1000); for (Map.Entry entry : containers.entrySet()) { ContainerData cData = entry.getValue(); ContainerId cId = entry.getKey(); @@ -214,8 +235,19 @@ public class MockDAGAppMaster extends DAGAppMaster { } else if (!cData.completed) { // container is assigned a task and task is not completed // complete the task or preempt the task - Integer version = preemptedTasks.get(cData.taId.getTaskID()); - if (version != null && cData.taId.getId() <= version.intValue()) { + Integer version = preemptedTasks.get(cData.taId.getTaskID()); + Integer updatesToMake = tasksWithStatusUpdates.get(cData.taId); + if (cData.numUpdates == 0 || // do at least one update + updatesToMake != null && cData.numUpdates < updatesToMake) { + cData.numUpdates++; + float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1; + float progress = updateProgress ? 
cData.numUpdates/maxUpdates : 0f; + TezVertexID vertexId = cData.taId.getTaskID().getVertexID(); + getContext().getEventHandler().handle( + new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent( + new TaskStatusUpdateEvent(null, progress), new EventMetaData( + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))))); + } else if (version != null && cData.taId.getId() <= version.intValue()) { preemptContainer(cData); } else { // send a done notification http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java index 7e408e1..2631e3c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java @@ -29,16 +29,18 @@ import org.apache.tez.client.LocalClient; public class MockLocalClient extends LocalClient { MockDAGAppMaster mockApp; AtomicBoolean mockAppLauncherGoFlag; + Clock mockClock; - public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag) { + public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) { this.mockAppLauncherGoFlag = mockAppLauncherGoFlag; + this.mockClock = clock; } protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId cId, String currentHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, String userDir) { mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, - new SystemClock(), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag); + (mockClock!=null ? 
mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag); return mockApp; } http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java index 617415e..0ff3340 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.client.FrameworkClient; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezConfiguration; @@ -32,9 +33,9 @@ public class MockTezClient extends TezClient { MockTezClient(String name, TezConfiguration tezConf, boolean isSession, Map localResources, Credentials credentials, - AtomicBoolean mockAppLauncherGoFlag) { + Clock clock, AtomicBoolean mockAppLauncherGoFlag) { super(name, tezConf, isSession, localResources, credentials); - this.client = new MockLocalClient(mockAppLauncherGoFlag); + this.client = new MockLocalClient(mockAppLauncherGoFlag, clock); } protected FrameworkClient createFrameworkClient() { http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 8650aea..682e6ed 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ 
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -67,7 +67,7 @@ public class TestMockDAGAppMaster { public void testLocalResourceSetup() throws Exception { TezConfiguration tezconf = new TezConfiguration(defaultConf); - MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null); tezClient.start(); MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); @@ -119,7 +119,7 @@ public class TestMockDAGAppMaster { TezConfiguration tezconf = new TezConfiguration(defaultConf); - MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null); tezClient.start(); DAGClient dagClient = tezClient.submitDAG(dag); dagClient.waitForCompletion(); @@ -127,7 +127,7 @@ public class TestMockDAGAppMaster { tezClient.stop(); // submit the same DAG again to verify it can be done. 
- tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null); + tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null); tezClient.start(); dagClient = tezClient.submitDAG(dag); dagClient.waitForCompletion(); @@ -139,7 +139,7 @@ public class TestMockDAGAppMaster { public void testSchedulerErrorHandling() throws Exception { TezConfiguration tezconf = new TezConfiguration(defaultConf); - MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null); tezClient.start(); MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java index 0958c48..bc15954 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java @@ -92,7 +92,7 @@ public class TestPreemption { tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0); AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null, - mockAppLauncherGoFlag); + null, mockAppLauncherGoFlag); tezClient.start(); DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER)); @@ -148,7 +148,7 @@ public class TestPreemption { tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0); AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null, - mockAppLauncherGoFlag); + null, mockAppLauncherGoFlag); 
tezClient.start(); syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient); return tezClient; http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java new file mode 100644 index 0000000..114c44b --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java @@ -0,0 +1,161 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.dag.app; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Joiner; + + +@SuppressWarnings("deprecation") +public class TestSpeculation { + static Configuration defaultConf; + static FileSystem localFs; + static Path workDir; + + MockDAGAppMaster mockApp; + MockContainerLauncher mockLauncher; + + static { + try { + defaultConf = new Configuration(false); + defaultConf.set("fs.defaultFS", "file:///"); + defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); + defaultConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, 2); + localFs = FileSystem.getLocal(defaultConf); + workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestSpeculation").makeQualified(localFs); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + + MockTezClient createTezSession() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); + MockTezClient tezClient = new 
MockTezClient("testspeculation", tezconf, true, null, null, + new MockClock(), mockAppLauncherGoFlag); + tezClient.start(); + syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient); + return tezClient; + } + + void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, + MockTezClient tezClient) throws Exception { + synchronized (mockAppLauncherGoFlag) { + while (!mockAppLauncherGoFlag.get()) { + mockAppLauncherGoFlag.wait(); + } + mockApp = tezClient.getLocalClient().getMockApp(); + mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(allowScheduling); + mockAppLauncherGoFlag.notify(); + } + } + + public void testBasicSpeculation(boolean withProgress) throws Exception { + DAG dag = DAG.create("test"); + Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); + dag.addVertex(vA); + + MockTezClient tezClient = createTezSession(); + + DAGClient dagClient = tezClient.submitDAG(dag); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0); + // original attempt is killed and speculative one is successful + TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0); + TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1); + + mockLauncher.updateProgress(withProgress); + mockLauncher.setStatusUpdatesForTask(killedTaId, 100); + + mockLauncher.startScheduling(true); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + Task task = dagImpl.getTask(killedTaId.getTaskID()); + Assert.assertEquals(2, task.getAttempts().size()); + Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID()); + TaskAttempt killedAttempt = task.getAttempt(killedTaId); + Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt"); + 
tezClient.stop(); + } + + @Test (timeout=10000) + public void testBasicSpeculationWithProgress() throws Exception { + testBasicSpeculation(true); + } + + @Test (timeout=10000) + public void testBasicSpeculationWithoutProgress() throws Exception { + testBasicSpeculation(false); + } + + @Test (timeout=10000) + public void testBasicSpeculationNotUseful() throws Exception { + DAG dag = DAG.create("test"); + Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); + dag.addVertex(vA); + + MockTezClient tezClient = createTezSession(); + + DAGClient dagClient = tezClient.submitDAG(dag); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0); + // original attempt is successful and speculative one is killed + TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0); + TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1); + + mockLauncher.setStatusUpdatesForTask(successTaId, 100); + mockLauncher.setStatusUpdatesForTask(killedTaId, 100); + + mockLauncher.startScheduling(true); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + Task task = dagImpl.getTask(killedTaId.getTaskID()); + Assert.assertEquals(2, task.getAttempts().size()); + Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID()); + TaskAttempt killedAttempt = task.getAttempt(killedTaId); + Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as"); + tezClient.stop(); + } + + +}