tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3716. Allow attempt retries to be treated the same as the first attempt. (sseth)
Date Sun, 14 May 2017 17:27:12 GMT
Repository: tez
Updated Branches:
  refs/heads/master dd9c517e3 -> 63177255c


TEZ-3716. Allow attempt retries to be treated the same as the first
attempt. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63177255
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63177255
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63177255

Branch: refs/heads/master
Commit: 63177255c7c9f181b8fdbde63b896ddbffae6e07
Parents: dd9c517
Author: Siddharth Seth <sseth@HW10890.local>
Authored: Sun May 14 10:27:01 2017 -0700
Committer: Siddharth Seth <sseth@HW10890.local>
Committed: Sun May 14 10:27:01 2017 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    | 10 +++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  7 +++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  7 +--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  5 +--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 33 +++++++++++++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 47 +++++++++++++++++++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  9 +++-
 7 files changed, 107 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index c0179f8..cc57b4e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -571,6 +571,16 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT = 4;
 
   /**
+   * Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous
+   * failures gets special treatment - higher priority, dropped location hints.
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY =
+      TEZ_AM_PREFIX + "task.reschedule.higher.priority";
+  public static final boolean TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT=true;
+
+  /**
    * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes
    * will not be used to execute tasks.
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 51847d4..0a6e9c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -200,4 +200,11 @@ public interface Vertex extends Comparable<Vertex> {
   void reportTaskStartTime(long taskStartTime);
   public long getFirstTaskStartTime();
   public long getLastTaskFinishTime();
+
+  VertexConfig getVertexConfig();
+
+  interface VertexConfig {
+    int getMaxFailedTaskAttempts();
+    boolean getTaskRescheduleHigherPriority();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 07aed5e..a4c4652 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -91,7 +91,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
@@ -165,6 +164,7 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
 
+  // Should not be used to access configuration. User vertex.VertexConfig instead
   protected final Configuration conf;
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
@@ -542,6 +542,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
       TezTaskAttemptID schedulingCausalTA) {
 
+    // TODO: Move these configs over to Vertex.VertexConfig
     MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
@@ -1307,7 +1308,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.taskRacks = racks;
 
       // Ask for hosts / racks only if not a re-scheduled task.
-      if (ta.isRescheduled) {
+      if (ta.isRescheduled && ta.getVertex().getVertexConfig().getTaskRescheduleHigherPriority()) {
         locationHint = null;
       }
 
@@ -1315,7 +1316,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       
       // Send out a launch request to the scheduler.
       int priority;
-      if (ta.isRescheduled) {
+      if (ta.isRescheduled  && ta.getVertex().getVertexConfig().getTaskRescheduleHigherPriority()) {
         // higher priority for rescheduled attempts
         priority = scheduleEvent.getPriorityHighLimit();
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 04074af..f25e583 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
@@ -358,9 +357,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
-    // TODO Avoid reading this from configuration for each task.
-    maxFailedAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
-                              TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+    maxFailedAttempts = vertex.getVertexConfig().getMaxFailedTaskAttempts();
     taskId = TezTaskID.getInstance(vertexId, taskIndex);
     this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
     this.taskHeartbeatHandler = thh;

http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b6d66df..30d65c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -258,6 +258,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private final float maxFailuresPercent;
   private boolean logSuccessDiagnostics = false;
 
+  private final VertexConfigImpl vertexContextConfig;
   //fields initialized in init
 
   @VisibleForTesting
@@ -878,7 +879,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
       }
     }
-
+    this.vertexContextConfig = new VertexConfigImpl(vertexConf);
 
     this.clock = clock;
     this.appContext = appContext;
@@ -1304,6 +1305,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
 
+  @Override
+  public VertexConfig getVertexConfig() {
+    return vertexContextConfig;
+  }
+
   boolean inTerminalState() {
     VertexState state = getInternalState();
     if (state == VertexState.ERROR || state == VertexState.FAILED
@@ -4635,4 +4641,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       writeLock.unlock();
     }
   }
+
+  @VisibleForTesting
+  static class VertexConfigImpl implements VertexConfig {
+
+    private final int maxFailedTaskAttempts;
+    private final boolean taskRescheduleHigherPriority;
+
+    public VertexConfigImpl(Configuration conf) {
+      this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+          TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+      this.taskRescheduleHigherPriority =
+          conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY,
+              TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT);
+    }
+
+    @Override
+    public int getMaxFailedTaskAttempts() {
+      return maxFailedTaskAttempts;
+    }
+
+    @Override
+    public boolean getTaskRescheduleHigherPriority() {
+      return taskRescheduleHigherPriority;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index acf8f23..d5464c8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -139,6 +139,7 @@ public class TestTaskAttempt {
   }
   
   AppContext appCtx;
+  TezConfiguration vertexConf = new TezConfiguration();
   TaskLocationHint locationHint;
   Vertex mockVertex;
   ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
@@ -157,6 +158,7 @@ public class TestTaskAttempt {
 
     mockVertex = mock(Vertex.class);
     when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+    when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf));
 
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
@@ -202,7 +204,50 @@ public class TestTaskAttempt {
       assertEquals(host, true, taImpl.taskHosts.contains(host));
     }
   }
-  
+
+  @Test(timeout = 5000)
+  public void testRetriesAtSamePriorityConfig() {
+
+    // Override the test defaults to setup the config change
+    TezConfiguration vertexConf = new TezConfiguration();
+    vertexConf.setBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, false);
+    when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf));
+
+    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+        new TaskAttemptImpl.ScheduleTaskattemptTransition();
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    TezTaskID taskID = TezTaskID.getInstance(
+        TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx,
+        false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+
+    TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx,
+        true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
+    when(sEvent.getPriorityLowLimit()).thenReturn(3);
+    when(sEvent.getPriorityHighLimit()).thenReturn(1);
+
+    // Verify priority for a non-retried attempt
+    sta.transition(taImpl, sEvent);
+    verify(eventHandler, times(1)).handle(arg.capture());
+    AMSchedulerEventTALaunchRequest launchEvent = (AMSchedulerEventTALaunchRequest) arg.getValue();
+    Assert.assertEquals(2, launchEvent.getPriority());
+
+    // Verify priority for a retried attempt is the same
+    sta.transition(taImplReScheduled, sEvent);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    launchEvent = (AMSchedulerEventTALaunchRequest) arg.getValue();
+    Assert.assertEquals(2, launchEvent.getPriority());
+  }
+
   @Test(timeout = 5000)
   public void testPriority() {
     TaskAttemptImpl.ScheduleTaskattemptTransition sta =

http://git-wip-us.apache.org/repos/asf/tez/blob/63177255/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 3375047..da25927 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -170,6 +170,7 @@ public class TestTaskImpl {
     containerContext = new ContainerContext(localResources, credentials,
         environment, javaOpts);
     Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
     eventHandler = new TestEventHandler();
     
     mockTask = new MockTaskImpl(vertexId, partition,
@@ -784,10 +785,12 @@ public class TestTaskImpl {
   @Test(timeout = 20000)
   public void testFailedThenSpeculativeFailed() {
     conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
     mockTask = new MockTaskImpl(vertexId, partition,
         eventHandler, conf, taskCommunicatorManagerInterface, clock,
         taskHeartbeatHandler, appContext, leafVertex,
-        taskResource, containerContext, mock(Vertex.class));
+        taskResource, containerContext, vertex);
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
@@ -817,10 +820,12 @@ public class TestTaskImpl {
   @Test(timeout = 20000)
   public void testFailedThenSpeculativeSucceeded() {
     conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
     mockTask = new MockTaskImpl(vertexId, partition,
         eventHandler, conf, taskCommunicatorManagerInterface, clock,
         taskHeartbeatHandler, appContext, leafVertex,
-        taskResource, containerContext, mock(Vertex.class));
+        taskResource, containerContext, vertex);
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();


Mime
View raw message