Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5F2FF20049B for ; Mon, 14 Aug 2017 23:21:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5D7FD165DFF; Mon, 14 Aug 2017 21:21:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 56A2E165DFD for ; Mon, 14 Aug 2017 23:21:43 +0200 (CEST) Received: (qmail 75116 invoked by uid 500); 14 Aug 2017 21:21:42 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 75106 invoked by uid 99); 14 Aug 2017 21:21:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Aug 2017 21:21:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C03BDFC61; Mon, 14 Aug 2017 21:21:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <96d22852241547f89e4d627ed31efba9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3816. Add ability to automatically speculate single-task vertices. Contributed by Muhammad Samir Khan. Date: Mon, 14 Aug 2017 21:21:40 +0000 (UTC) archived-at: Mon, 14 Aug 2017 21:21:44 -0000 Repository: tez Updated Branches: refs/heads/master 1061cf5c3 -> 823b1bb3b TEZ-3816. Add ability to automatically speculate single-task vertices. Contributed by Muhammad Samir Khan. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/823b1bb3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/823b1bb3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/823b1bb3 Branch: refs/heads/master Commit: 823b1bb3b3ab034639bfb693ef83baa18dfde34b Parents: 1061cf5 Author: Siddharth Seth Authored: Mon Aug 14 14:21:11 2017 -0700 Committer: Siddharth Seth Committed: Mon Aug 14 14:21:11 2017 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 11 ++ .../speculation/legacy/LegacySpeculator.java | 100 ++++++++++++------- .../org/apache/tez/dag/app/TestSpeculation.java | 51 +++++++++- 3 files changed, 124 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/823b1bb3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 39688d6..5df5259 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -527,6 +527,17 @@ public class TezConfiguration extends Configuration { TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold"; /** + * Long value. Specifies the timeout after which tasks on a single task vertex must be speculated. + * A negative value means not to use timeout for speculation of single task vertices. + */ + @Unstable + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="long") + public static final String TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT = + TEZ_AM_PREFIX + "legacy.speculative.single.task.vertex.timeout"; + public static final long TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT = -1; + + /** * Int value. Upper limit on the number of threads user to launch containers in the app * master. Expert level setting. */ http://git-wip-us.apache.org/repos/asf/tez/blob/823b1bb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java index dd54d86..9fbea19 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -67,6 +68,7 @@ public class LegacySpeculator { private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1; private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01; private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10; + private static final int VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION = 1; private static final Logger LOG = LoggerFactory.getLogger(LegacySpeculator.class); @@ -88,7 +90,7 @@ public class LegacySpeculator { private Vertex vertex; private TaskRuntimeEstimator estimator; - + private final long taskTimeout; private final Clock clock; private long nextSpeculateTime = Long.MIN_VALUE; @@ -116,6 +118,9 @@ public class LegacySpeculator { this.vertex = vertex; this.estimator = estimator; this.clock = clock; + taskTimeout = conf.getLong( + TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT, + TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT); } /* ************************************************************* */ @@ -209,7 +214,12 @@ public class LegacySpeculator { // // All of these values are negative. Any value that should be allowed to // speculate is 0 or positive. - private long speculationValue(Task task, long now) { + // + // If shouldUseTimeout is true, we will use timeout to decide on + // speculation instead of the task statistics. This can be useful, for + // example for single task vertices for which there are no tasks to compare + // with + private long speculationValue(Task task, long now, boolean shouldUseTimeout) { Map attempts = task.getAttempts(); TezTaskID taskID = task.getTaskId(); long acceptableRuntime = Long.MIN_VALUE; @@ -220,7 +230,7 @@ public class LegacySpeculator { return NOT_RUNNING; } - if (!mayHaveSpeculated.contains(taskID)) { + if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) { acceptableRuntime = estimator.thresholdRuntime(taskID); if (acceptableRuntime == Long.MAX_VALUE) { return ON_SCHEDULE; @@ -239,8 +249,6 @@ public class LegacySpeculator { } runningTaskAttemptID = taskAttempt.getID(); - long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID); - long taskAttemptStartTime = estimator.attemptEnrolledTime(runningTaskAttemptID); if (taskAttemptStartTime > now) { @@ -249,43 +257,57 @@ public class LegacySpeculator { return TOO_NEW; } - long estimatedEndTime = estimatedRunTime + taskAttemptStartTime; + if (shouldUseTimeout) { + if ((now - taskAttemptStartTime) > taskTimeout) { + // If the task has timed out, then we want to schedule a speculation + // immediately. However we cannot return immediately since we may + // already have a speculation running. + result = Long.MAX_VALUE; + } else { + // Task has not timed out so we are good + return ON_SCHEDULE; + } + } else { + long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID); - long estimatedReplacementEndTime - = now + estimator.newAttemptEstimatedRuntime(); + long estimatedEndTime = estimatedRunTime + taskAttemptStartTime; - float progress = taskAttempt.getProgress(); - TaskAttemptHistoryStatistics data = - runningTaskAttemptStatistics.get(runningTaskAttemptID); - if (data == null) { - runningTaskAttemptStatistics.put(runningTaskAttemptID, - new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now)); - } else { - if (estimatedRunTime == data.getEstimatedRunTime() - && progress == data.getProgress()) { - // Previous stats are same as same stats - if (data.notHeartbeatedInAWhile(now)) { - // Stats have stagnated for a while, simulate heart-beat. - // Now simulate the heart-beat - statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime()); - } + long estimatedReplacementEndTime + = now + estimator.newAttemptEstimatedRuntime(); + + float progress = taskAttempt.getProgress(); + TaskAttemptHistoryStatistics data = + runningTaskAttemptStatistics.get(runningTaskAttemptID); + if (data == null) { + runningTaskAttemptStatistics.put(runningTaskAttemptID, + new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now)); } else { - // Stats have changed - update our data structure - data.setEstimatedRunTime(estimatedRunTime); - data.setProgress(progress); - data.resetHeartBeatTime(now); + if (estimatedRunTime == data.getEstimatedRunTime() + && progress == data.getProgress()) { + // Previous stats are same as same stats + if (data.notHeartbeatedInAWhile(now)) { + // Stats have stagnated for a while, simulate heart-beat. + // Now simulate the heart-beat + statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime()); + } + } else { + // Stats have changed - update our data structure + data.setEstimatedRunTime(estimatedRunTime); + data.setProgress(progress); + data.resetHeartBeatTime(now); + } } - } - if (estimatedEndTime < now) { - return PROGRESS_IS_GOOD; - } + if (estimatedEndTime < now) { + return PROGRESS_IS_GOOD; + } - if (estimatedReplacementEndTime >= estimatedEndTime) { - return TOO_LATE_TO_SPECULATE; - } + if (estimatedReplacementEndTime >= estimatedEndTime) { + return TOO_LATE_TO_SPECULATE; + } - result = estimatedEndTime - estimatedReplacementEndTime; + result = estimatedEndTime - estimatedReplacementEndTime; + } } } @@ -296,7 +318,7 @@ public class LegacySpeculator { - if (acceptableRuntime == Long.MIN_VALUE) { + if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) { acceptableRuntime = estimator.thresholdRuntime(taskID); if (acceptableRuntime == Long.MAX_VALUE) { return ON_SCHEDULE; @@ -329,11 +351,15 @@ public class LegacySpeculator { TezTaskID bestTaskID = null; long bestSpeculationValue = -1L; + boolean shouldUseTimeout = + (tasks.size() <= VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION) && + (taskTimeout >= 0); // this loop is potentially pricey. // TODO track the tasks that are potentially worth looking at for (Map.Entry taskEntry : tasks.entrySet()) { - long mySpeculationValue = speculationValue(taskEntry.getValue(), now); + long mySpeculationValue = speculationValue(taskEntry.getValue(), now, + shouldUseTimeout); if (mySpeculationValue == ALREADY_SPECULATING) { ++numberSpeculationsAlready; http://git-wip-us.apache.org/repos/asf/tez/blob/823b1bb3/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java index 9a39fac..1df5af4 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java @@ -19,6 +19,8 @@ package org.apache.tez.dag.app; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -97,7 +99,54 @@ public class TestSpeculation { mockAppLauncherGoFlag.notify(); } } - + + @Test (timeout = 10000) + public void testSingleTaskSpeculation() throws Exception { + // Map + Map confToExpected = new HashMap(); + confToExpected.put(Long.MAX_VALUE >> 1, 1); // Really long time to speculate + confToExpected.put(100L, 2); + confToExpected.put(-1L, 1); // Don't speculate + + for(Map.Entry entry : confToExpected.entrySet()) { + defaultConf.setLong( + TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT, + entry.getKey()); + DAG dag = DAG.create("test"); + Vertex vA = Vertex.create("A", + ProcessorDescriptor.create("Proc.class"), + 1); + dag.addVertex(vA); + + MockTezClient tezClient = createTezSession(); + + DAGClient dagClient = tezClient.submitDAG(dag); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0); + // original attempt is killed and speculative one is successful + TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0); + TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1); + + Thread.sleep(200); + // cause speculation trigger + mockLauncher.setStatusUpdatesForTask(killedTaId, 100); + + mockLauncher.startScheduling(true); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + Task task = dagImpl.getTask(killedTaId.getTaskID()); + Assert.assertEquals(entry.getValue().intValue(), task.getAttempts().size()); + if (entry.getValue() > 1) { + Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID()); + TaskAttempt killedAttempt = task.getAttempt(killedTaId); + Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt"); + Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, + killedAttempt.getTerminationCause()); + } + tezClient.stop(); + } + } + public void testBasicSpeculation(boolean withProgress) throws Exception { DAG dag = DAG.create("test"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);