tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: Manual merge of TEZ-2317 from master
Date Thu, 16 Apr 2015 18:50:24 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 51903629d -> 3288225a8


Manual merge of TEZ-2317 from master


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3288225a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3288225a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3288225a

Branch: refs/heads/branch-0.6
Commit: 3288225a8ef0807000a98dd087203852efbd4699
Parents: 5190362
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Apr 16 11:50:17 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Apr 16 11:50:17 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 17 +++++++++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 22 ++++++++++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3288225a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3eb5b2e..3b238fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -177,6 +177,8 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2317. Event processing backlog can result in task failures for short
+  tasks
   TEZ-2289. ATSHistoryLoggingService can generate ArrayOutOfBoundsException.
   TEZ-2257. Fix potential NPEs in TaskReporter.
   TEZ-2192. Relocalization does not check for source.

http://git-wip-us.apache.org/repos/asf/tez/blob/3288225a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 77e724d..ebd27ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -699,7 +699,20 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
     writeLock.lock();
     try {
-      if (getState() != TaskState.RUNNING) {
+      TaskState state = getState();
+      if (state == TaskState.SCHEDULED) {
+        // the actual running task ran and is done and asking for commit. we are still stuck
+        // in the scheduled state which indicates a backlog in event processing. lets wait for the
+        // backlog to clear. returning false will make the attempt come back to us.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Event processing delay. "
+            + "Attempt committing before state machine transitioned to running : Task:" + taskId);
+          return false;
+        }
+      }
+      // at this point the attempt is no longer in scheduled state or else we would still
+      // have been in scheduled state in task impl.
+      if (state != TaskState.RUNNING) {
         LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
         eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
             , "Task not running. Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED));
@@ -742,7 +755,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       writeLock.unlock();
     }
   }
-
+  
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,

http://git-wip-us.apache.org/repos/asf/tez/blob/3288225a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index fb1db0b..ec13607 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -467,6 +467,28 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
+  
+
+  @Test(timeout = 5000)
+  public void testEventBacklogDuringTaskAttemptCommit() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+    // Event backlog: AM still sees SCHEDULED but the finished attempt already asks to commit.
+    // NOTE(review): canCommit returns early only when debug logging is on; verify with it off.
+    assertFalse("Commit should return false to make running task wait",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    assertTrue("Task state in AM is running now. Can commit.",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+    // With the attempt RUNNING, let it succeed and verify the task reaches SUCCEEDED.
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    assertTaskSucceededState();
+  }
 
 
   @Test(timeout = 5000)


Mime
View raw message