tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3816. Add ability to automatically speculate single-task vertices. Contributed by Muhammad Samir Khan.
Date Mon, 14 Aug 2017 21:21:40 GMT
Repository: tez
Updated Branches:
  refs/heads/master 1061cf5c3 -> 823b1bb3b


TEZ-3816. Add ability to automatically speculate single-task vertices. Contributed by Muhammad Samir Khan.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/823b1bb3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/823b1bb3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/823b1bb3

Branch: refs/heads/master
Commit: 823b1bb3b3ab034639bfb693ef83baa18dfde34b
Parents: 1061cf5
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Aug 14 14:21:11 2017 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Aug 14 14:21:11 2017 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  11 ++
 .../speculation/legacy/LegacySpeculator.java    | 100 ++++++++++++-------
 .../org/apache/tez/dag/app/TestSpeculation.java |  51 +++++++++-
 3 files changed, 124 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/823b1bb3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 39688d6..5df5259 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -527,6 +527,17 @@ public class TezConfiguration extends Configuration {
                                      TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold";
 
   /**
+   * Long value. Specifies the timeout after which tasks on a single task vertex must be speculated.
+   * A negative value means not to use timeout for speculation of single task vertices.
+   */
+  @Unstable
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="long")
+  public static final String TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT =
+                                     TEZ_AM_PREFIX + "legacy.speculative.single.task.vertex.timeout";
+  public static final long TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT = -1;
+
+  /**
    * Int value. Upper limit on the number of threads user to launch containers in the app
    * master. Expert level setting. 
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/823b1bb3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index dd54d86..9fbea19 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +68,7 @@ public class LegacySpeculator {
   private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
   private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
   private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+  private static final int VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION = 1;
 
   private static final Logger LOG = LoggerFactory.getLogger(LegacySpeculator.class);
 
@@ -88,7 +90,7 @@ public class LegacySpeculator {
 
   private Vertex vertex;
   private TaskRuntimeEstimator estimator;
-
+  private final long taskTimeout;
   private final Clock clock;
   private long nextSpeculateTime = Long.MIN_VALUE;
 
@@ -116,6 +118,9 @@ public class LegacySpeculator {
     this.vertex = vertex;
     this.estimator = estimator;
     this.clock = clock;
+    taskTimeout = conf.getLong(
+            TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT,
+            TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT);
   }
 
 /*   *************************************************************    */
@@ -209,7 +214,12 @@ public class LegacySpeculator {
   //
   // All of these values are negative.  Any value that should be allowed to
   //  speculate is 0 or positive.
-  private long speculationValue(Task task, long now) {
+  //
+  // If shouldUseTimeout is true, we will use timeout to decide on
+  // speculation instead of the task statistics. This can be useful, for
+  // example for single task vertices for which there are no tasks to compare
+  // with
+  private long speculationValue(Task task, long now, boolean shouldUseTimeout) {
     Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
     TezTaskID taskID = task.getTaskId();
     long acceptableRuntime = Long.MIN_VALUE;
@@ -220,7 +230,7 @@ public class LegacySpeculator {
       return NOT_RUNNING;
     }
     
-    if (!mayHaveSpeculated.contains(taskID)) {
+    if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) {
       acceptableRuntime = estimator.thresholdRuntime(taskID);
       if (acceptableRuntime == Long.MAX_VALUE) {
         return ON_SCHEDULE;
@@ -239,8 +249,6 @@ public class LegacySpeculator {
         }
         runningTaskAttemptID = taskAttempt.getID();
 
-        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
-
         long taskAttemptStartTime
             = estimator.attemptEnrolledTime(runningTaskAttemptID);
         if (taskAttemptStartTime > now) {
@@ -249,43 +257,57 @@ public class LegacySpeculator {
           return TOO_NEW;
         }
 
-        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+        if (shouldUseTimeout) {
+          if ((now - taskAttemptStartTime) > taskTimeout) {
+            // If the task has timed out, then we want to schedule a speculation
+            // immediately. However we cannot return immediately since we may
+            // already have a speculation running.
+            result = Long.MAX_VALUE;
+          } else {
+            // Task has not timed out so we are good
+            return ON_SCHEDULE;
+          }
+        } else {
+          long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
 
-        long estimatedReplacementEndTime
-            = now + estimator.newAttemptEstimatedRuntime();
+          long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
 
-        float progress = taskAttempt.getProgress();
-        TaskAttemptHistoryStatistics data =
-            runningTaskAttemptStatistics.get(runningTaskAttemptID);
-        if (data == null) {
-          runningTaskAttemptStatistics.put(runningTaskAttemptID,
-            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
-        } else {
-          if (estimatedRunTime == data.getEstimatedRunTime()
-              && progress == data.getProgress()) {
-            // Previous stats are same as same stats
-            if (data.notHeartbeatedInAWhile(now)) {
-              // Stats have stagnated for a while, simulate heart-beat.
-              // Now simulate the heart-beat
-              statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime());
-            }
+          long estimatedReplacementEndTime
+                  = now + estimator.newAttemptEstimatedRuntime();
+
+          float progress = taskAttempt.getProgress();
+          TaskAttemptHistoryStatistics data =
+                  runningTaskAttemptStatistics.get(runningTaskAttemptID);
+          if (data == null) {
+            runningTaskAttemptStatistics.put(runningTaskAttemptID,
+                    new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
           } else {
-            // Stats have changed - update our data structure
-            data.setEstimatedRunTime(estimatedRunTime);
-            data.setProgress(progress);
-            data.resetHeartBeatTime(now);
+            if (estimatedRunTime == data.getEstimatedRunTime()
+                    && progress == data.getProgress()) {
+              // Previous stats are same as same stats
+              if (data.notHeartbeatedInAWhile(now)) {
+                // Stats have stagnated for a while, simulate heart-beat.
+                // Now simulate the heart-beat
+                statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime());
+              }
+            } else {
+              // Stats have changed - update our data structure
+              data.setEstimatedRunTime(estimatedRunTime);
+              data.setProgress(progress);
+              data.resetHeartBeatTime(now);
+            }
           }
-        }
 
-        if (estimatedEndTime < now) {
-          return PROGRESS_IS_GOOD;
-        }
+          if (estimatedEndTime < now) {
+            return PROGRESS_IS_GOOD;
+          }
 
-        if (estimatedReplacementEndTime >= estimatedEndTime) {
-          return TOO_LATE_TO_SPECULATE;
-        }
+          if (estimatedReplacementEndTime >= estimatedEndTime) {
+            return TOO_LATE_TO_SPECULATE;
+          }
 
-        result = estimatedEndTime - estimatedReplacementEndTime;
+          result = estimatedEndTime - estimatedReplacementEndTime;
+        }
       }
     }
 
@@ -296,7 +318,7 @@ public class LegacySpeculator {
 
 
 
-    if (acceptableRuntime == Long.MIN_VALUE) {
+    if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) {
       acceptableRuntime = estimator.thresholdRuntime(taskID);
       if (acceptableRuntime == Long.MAX_VALUE) {
         return ON_SCHEDULE;
@@ -329,11 +351,15 @@ public class LegacySpeculator {
 
     TezTaskID bestTaskID = null;
     long bestSpeculationValue = -1L;
+    boolean shouldUseTimeout =
+            (tasks.size() <= VERTEX_SIZE_THRESHOLD_FOR_TIMEOUT_SPECULATION) &&
+            (taskTimeout >= 0);
 
     // this loop is potentially pricey.
     // TODO track the tasks that are potentially worth looking at
     for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
-      long mySpeculationValue = speculationValue(taskEntry.getValue(), now);
+      long mySpeculationValue = speculationValue(taskEntry.getValue(), now,
+              shouldUseTimeout);
 
       if (mySpeculationValue == ALREADY_SPECULATING) {
         ++numberSpeculationsAlready;

http://git-wip-us.apache.org/repos/asf/tez/blob/823b1bb3/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 9a39fac..1df5af4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -19,6 +19,8 @@
 package org.apache.tez.dag.app;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -97,7 +99,54 @@ public class TestSpeculation {
       mockAppLauncherGoFlag.notify();
     }     
   }
-  
+
+  @Test (timeout = 10000)
+  public void testSingleTaskSpeculation() throws Exception {
+    // Map<Timeout conf value, expected number of tasks>
+    Map<Long, Integer> confToExpected = new HashMap<Long, Integer>();
+    confToExpected.put(Long.MAX_VALUE >> 1, 1); // Really long time to speculate
+    confToExpected.put(100L, 2);
+    confToExpected.put(-1L, 1); // Don't speculate
+
+    for(Map.Entry<Long, Integer> entry : confToExpected.entrySet()) {
+      defaultConf.setLong(
+              TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT,
+              entry.getKey());
+      DAG dag = DAG.create("test");
+      Vertex vA = Vertex.create("A",
+              ProcessorDescriptor.create("Proc.class"),
+              1);
+      dag.addVertex(vA);
+
+      MockTezClient tezClient = createTezSession();
+
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+      TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+      // original attempt is killed and speculative one is successful
+      TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+      TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
+
+      Thread.sleep(200);
+      // cause speculation trigger
+      mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+
+      mockLauncher.startScheduling(true);
+      dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+      Task task = dagImpl.getTask(killedTaId.getTaskID());
+      Assert.assertEquals(entry.getValue().intValue(), task.getAttempts().size());
+      if (entry.getValue() > 1) {
+        Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
+        TaskAttempt killedAttempt = task.getAttempt(killedTaId);
+        Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
+        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION,
+                killedAttempt.getTerminationCause());
+      }
+      tezClient.stop();
+    }
+  }
+
   public void testBasicSpeculation(boolean withProgress) throws Exception {
     DAG dag = DAG.create("test");
     Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);


Mime
View raw message