tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2317. Event processing backlog can result in task failures for short tasks (bikas)
Date Thu, 16 Apr 2015 18:43:02 GMT
Repository: tez
Updated Branches:
  refs/heads/master bfb34afba -> e1968681c


TEZ-2317. Event processing backlog can result in task failures for short tasks (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1968681
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1968681
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1968681

Branch: refs/heads/master
Commit: e1968681cee821103e0105e4948c4fc6dc949776
Parents: bfb34af
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Apr 16 11:42:09 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Apr 16 11:42:09 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 15 +++++++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 22 ++++++++++++++++++++
 .../api/events/TaskStatusUpdateEvent.java       |  2 ++
 4 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a41a724..ce9bfe8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -288,6 +288,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2317. Event processing backlog can result in task failures for short
+  tasks
   TEZ-2289. ATSHistoryLoggingService can generate ArrayOutOfBoundsException.
   TEZ-2257. Fix potential NPEs in TaskReporter.
   TEZ-2192. Relocalization does not check for source.

http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 10a688f..a1eed07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -715,7 +715,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
     writeLock.lock();
     try {
-      if (getState() != TaskState.RUNNING) {
+      TaskState state = getState();
+      if (state == TaskState.SCHEDULED) {
+        // the actual running task ran and is done and asking for commit. we are still stuck
+        // in the scheduled state which indicates a backlog in event processing. lets wait for the
+        // backlog to clear. returning false will make the attempt come back to us.
+        LOG.debug("Event processing delay. "
+            + "Attempt committing before state machine transitioned to running : Task {}", taskId);
+        return false;
+      }
+      // at this point the attempt is no longer in scheduled state or else we would still
+      // have been in scheduled state in task impl.
+      if (state != TaskState.RUNNING) {
         LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
         eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
             , "Task not running. Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED));
@@ -758,7 +769,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       writeLock.unlock();
     }
   }
-
+  
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,

http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1c4b319..9509df4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -467,6 +467,28 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
+  
+
+  @Test(timeout = 5000)
+  public void testEventBacklogDuringTaskAttemptCommit() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+    // simulate
+    // task in scheduled state due to event backlog - real task done and calling canCommit
+    assertFalse("Commit should return false to make running task wait",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    assertTrue("Task state in AM is running now. Can commit.",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    assertTaskSucceededState();
+  }
 
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/e1968681/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
index 875a345..6465bed 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/events/TaskStatusUpdateEvent.java
@@ -66,6 +66,8 @@ public class TaskStatusUpdateEvent extends Event implements Writable {
     if (statistics != null) {
       out.writeBoolean(true);
       statistics.write(out);
+    } else {
+      out.writeBoolean(false);
     }
   }
 


Mime
View raw message