tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-439. Fix task attempt commit flow (bikas)
Date Fri, 20 Sep 2013 17:33:21 GMT
Updated Branches:
  refs/heads/TEZ-398 bd76ffcf2 -> 6ca59ac72


TEZ-439. Fix task attempt commit flow (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6ca59ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6ca59ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6ca59ac7

Branch: refs/heads/TEZ-398
Commit: 6ca59ac727a0db2d1b033cca269325f49ece6ccd
Parents: bd76ffc
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Sep 20 10:31:28 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Sep 20 10:31:28 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |   6 -
 .../dag/api/oldrecords/TaskAttemptState.java    |   1 -
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  30 -----
 .../org/apache/tez/dag/app/dag/TaskAttempt.java |   1 +
 .../dag/app/dag/TaskAttemptStateInternal.java   |   1 -
 .../tez/dag/app/dag/event/TaskEventType.java    |   1 -
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  50 ++------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 114 +++++++++++--------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  62 ----------
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  63 +++++++---
 .../tez/engine/newapi/TezProcessorContext.java  |   7 --
 .../tez/common/TezTaskUmbilicalProtocol.java    |   3 -
 .../newapi/impl/TezProcessorContextImpl.java    |   5 -
 .../tez/engine/newapi/impl/TezUmbilical.java    |   3 -
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |   7 --
 .../tez/mapreduce/newprocessor/MRTask.java      |  32 ++----
 .../apache/tez/mapreduce/processor/MRTask.java  |  18 ---
 .../tez/mapreduce/TestUmbilicalProtocol.java    |   7 --
 18 files changed, 141 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 6e4e418..5034262 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -333,12 +333,6 @@ public class YarnTezDagChild {
           throws IOException {
         return umbilical.canCommit(taskAttemptID);
       }
-
-      @Override
-      public void commitPending(TezTaskAttemptID taskAttemptID)
-          throws IOException, InterruptedException {
-        umbilical.commitPending(taskAttemptID);
-      }
     };
 
     // report non-pid to application master

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java
index 068913b..926835a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java
@@ -22,7 +22,6 @@ public enum TaskAttemptState {
   NEW, 
   STARTING, 
   RUNNING, 
-  COMMIT_PENDING,  
   SUCCEEDED,
   FAILED,
   KILLED

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 8c29fd9..2be9c5f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -44,9 +44,7 @@ import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerImpl;
@@ -346,34 +344,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   */
 
   /**
-   * TaskAttempt is reporting that it is in commit_pending and it is waiting for
-   * the commit Response
-   *
-   * <br/>
-   * Commit it a two-phased protocol. First the attempt informs the
-   * ApplicationMaster that it is
-   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
-   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is
-   * a legacy from the centralized commit protocol handling by the JobTracker.
-   */
-  @Override
-  public void commitPending(TezTaskAttemptID taskAttemptId)
-      throws IOException, InterruptedException {
-    LOG.info("Commit-pending state update from " + taskAttemptId.toString());
-    // An attempt is asking if it can commit its output. This can be decided
-    // only by the task which is managing the multiple attempts. So redirect the
-    // request there.
-    taskHeartbeatHandler.progressing(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-    //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
-    context.getEventHandler().handle(
-        new TaskAttemptEvent(
-            taskAttemptId,
-            TaskAttemptEventType.TA_COMMIT_PENDING)
-        );
-  }
-
-  /**
    * Child checking whether it can commit.
    *
    * <br/>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index ae70022..0cc9163 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -70,6 +70,7 @@ public interface TaskAttempt {
   TezCounters getCounters();
   float getProgress();
   TaskAttemptState getState();
+  TaskAttemptState getStateNoLock();
 
   /** 
    * Has attempt reached the final state or not.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
index 9ad5460..a49c2a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
@@ -30,7 +30,6 @@ public enum TaskAttemptStateInternal {
   START_WAIT,
   RUNNING,
   OUTPUT_CONSUMABLE, 
-  COMMIT_PENDING,
   KILL_IN_PROGRESS, 
   FAIL_IN_PROGRESS,
   KILLED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index d0ad8a0..a0b99a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -38,7 +38,6 @@ public enum TaskEventType {
   //Producer:TaskAttempt
   T_ATTEMPT_LAUNCHED,
   T_ATTEMPT_OUTPUT_CONSUMABLE,
-  T_ATTEMPT_COMMIT_PENDING,
   T_ATTEMPT_FAILED,
   T_ATTEMPT_SUCCEEDED,
   T_ATTEMPT_KILLED

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index f171afe..1ae9dcd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -176,7 +176,6 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may
not come in for all tasks.
-        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DONE, new SucceededTransition())
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
@@ -185,11 +184,11 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
 
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop.
-        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingAtOutputConsumableTransition())
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DONE, new SucceededTransition())
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_FAILED,  new TerminatedWhileRunningTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TIMED_OUT,  new TerminatedWhileRunningTransition(FAILED_HELPER))
@@ -201,19 +200,6 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
 
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING)
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DONE, new SucceededTransition())
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
-        .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER))
-
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST,
TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES))
@@ -369,13 +355,18 @@ public class TaskAttemptImpl implements TaskAttempt,
   public TaskAttemptState getState() {
     readLock.lock();
     try {
-      return getExternalState(stateMachine.getCurrentState());
+      return getStateNoLock();
     } finally {
       readLock.unlock();
     }
   }
 
   @Override
+  public TaskAttemptState getStateNoLock() {
+    return getExternalState(stateMachine.getCurrentState());
+  }
+
+  @Override
   public boolean isFinished() {
     readLock.lock();
     try {
@@ -530,10 +521,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     case START_WAIT:
       return TaskAttemptState.STARTING;
     case RUNNING:
-      return TaskAttemptState.RUNNING;
-    case COMMIT_PENDING:
     case OUTPUT_CONSUMABLE:
-      return TaskAttemptState.COMMIT_PENDING;
+      return TaskAttemptState.RUNNING;
     case FAILED:
     case FAIL_IN_PROGRESS:
       return TaskAttemptState.FAILED;
@@ -1076,15 +1065,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
-  protected static class CommitPendingTransition implements
-      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @Override
-    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
-          TaskEventType.T_ATTEMPT_COMMIT_PENDING));
-    }
-  }
-
   protected static class SucceededTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
@@ -1139,18 +1119,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       super.transition(ta, event);
       ta.sendTaskAttemptCleanupEvent();
-    }
-  }
-
-  protected static class CommitPendingAtOutputConsumableTransition extends
-      CommitPendingTransition {
-
-    @Override
-    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      // TODO Figure out the interaction between OUTPUT_CONSUMABLE AND
-      // COMMIT_PENDING, Ideally both should not exist for the same task.
-      super.transition(ta, event);
-      LOG.info("Received a commit pending while in the OutputConsumable state");
+      TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated)
event;
+      ta.addDiagnosticInfo(tEvent.getDiagnosticInfo());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ff9ded7..92a1859 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -167,9 +167,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
         new AttemptProcessingCompleteTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
-        TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-        new AttemptCommitPendingTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
         TaskEventType.T_ATTEMPT_SUCCEEDED,
@@ -203,7 +200,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
-            TaskEventType.T_ATTEMPT_COMMIT_PENDING,
             TaskEventType.T_ATTEMPT_FAILED,
             TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
@@ -584,17 +580,50 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   @Override
   public boolean canCommit(TezTaskAttemptID taskAttemptID) {
-    readLock.lock();
-    boolean canCommit = false;
+    writeLock.lock();
     try {
-      if (commitAttempt != null) {
-        canCommit = taskAttemptID.equals(commitAttempt);
-        LOG.info("Result of canCommit for " + taskAttemptID + ":" + canCommit);
+      if (getState() != TaskState.RUNNING) {
+        LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
+        eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
+            , "Task not running. Bad attempt."));
+        return false;
       }
+      if (commitAttempt == null) {
+        TaskAttempt ta = getAttempt(taskAttemptID);
+        if (ta == null) {
+          throw new TezUncheckedException("Unknown task for commit: " + taskAttemptID);
+        }
+        // Its ok to get a non-locked state snapshot since we handle changes of 
+        // state in the task attempt. Dont want to deadlock here.
+        TaskAttemptState taState = ta.getStateNoLock();
+        if (taState == TaskAttemptState.RUNNING) {
+          commitAttempt = taskAttemptID;
+          LOG.info(taskAttemptID + " given a go for committing the task output.");
+          return true;
+        } else {
+          LOG.info(taskAttemptID + " with state: " + taState + 
+              " given a no-go for commit because its not running.");
+          return false;
+        }
+      } else {
+        if (commitAttempt.equals(taskAttemptID)) {
+          LOG.info(taskAttemptID + " given a go for committing the task output.");
+          return true;
+        }
+        // Don't think this can be a pluggable decision, so simply raise an
+        // event for the TaskAttempt to delete its output.
+        // Wait for commit attempt to succeed. Dont kill this. If commit
+        // attempt fails then choose a different committer. When commit attempt 
+        // succeeds then this and others will be killed
+        LOG.info(commitAttempt
+            + " is current committer. Commit waiting for:  "
+            + taskAttemptID);
+        return false;
+      }
+    
     } finally {
-      readLock.unlock();
+      writeLock.unlock();
     }
-    return canCommit;
   }
 
   // TODO remove hacky name lookup
@@ -887,50 +916,34 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static class AttemptCommitPendingTransition implements
-      SingleArcTransition<TaskImpl, TaskEvent> {
-    @Override
-    public void transition(TaskImpl task, TaskEvent event) {
-      TaskEventTAUpdate ev = (TaskEventTAUpdate) event;
-      // The nextAttemptNumber is commit pending, decide on set the
-      // commitAttempt
-      TezTaskAttemptID attemptID = ev.getTaskAttemptID();
-      if (task.commitAttempt == null) {
-        // TODO: validate attemptID
-        task.commitAttempt = attemptID;
-        LOG.info(attemptID + " given a go for committing the task output.");
-      } else {
-        // Don't think this can be a pluggable decision, so simply raise an
-        // event for the TaskAttempt to delete its output.
-        // TODO . Wait for commit attempt to succeed. Dont kill this. If commit
-        // attempt fails then choose a different committer.
-        LOG.info(task.commitAttempt
-            + " already given a go for committing the task output, so killing "
-            + attemptID);
-        task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptID,
-            "Output being committed by alternate attemptId."));
-      }
-    }
-  }
-
   private static class AttemptSucceededTransition
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
-      task.handleTaskAttemptCompletion(
-          ((TaskEventTAUpdate) event).getTaskAttemptID(),
+      TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID();
+      
+      if (task.commitAttempt != null && 
+          !task.commitAttempt.equals(successTaId)) {
+        // The succeeded attempt is not the one that was selected to commit
+        // This is impossible and has to be a bug
+        throw new TezUncheckedException("TA: " + successTaId 
+            + " succeeded but TA: " + task.commitAttempt 
+            + " was expected to commit and succeed");
+      }
+      
+      task.handleTaskAttemptCompletion(successTaId, 
           TezDependentTaskCompletionEvent.Status.SUCCEEDED);
       task.finishedAttempts++;
       --task.numberUncompletedAttempts;
-      task.successfulAttempt = ((TaskEventTAUpdate) event).getTaskAttemptID();
+      task.successfulAttempt = successTaId;
       task.eventHandler.handle(new VertexEventTaskCompleted(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
-      // issue kill to all other attempts
       if (task.historyTaskStartGenerated) {
         task.logJobHistoryTaskFinishedEvent();
       }
 
+      // issue kill to all other attempts
       for (TaskAttempt attempt : task.attempts.values()) {
         if (attempt.getID() != task.successfulAttempt &&
             // This is okay because it can only talk us out of sending a
@@ -954,12 +967,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+      if (task.commitAttempt !=null && 
+          castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+        task.commitAttempt = null;
+      }
       task.handleTaskAttemptCompletion(
-          ((TaskEventTAUpdate) event).getTaskAttemptID(),
+          castEvent.getTaskAttemptID(),
           TezDependentTaskCompletionEvent.Status.KILLED);
       task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
-      if (task.successfulAttempt == null) {
+      // we don't need a new event if we already have a spare
+      if (--task.numberUncompletedAttempts == 0
+          && task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
       }
     }
@@ -1001,7 +1020,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
-      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+      if (task.commitAttempt != null && 
+          castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
       if (castEvent.getTaskAttemptID().equals(task.outputConsumableAttempt)) {
@@ -1143,6 +1163,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+    if (commitAttempt != null && commitAttempt.equals(attempt)) {
+      LOG.info("Removing commit attempt: " + commitAttempt);
+      commitAttempt = null;
+    }
     if (attempt != null && !attempt.isFinished()) {
       eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
           logMsg));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index daac3c7..b5e283b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -493,68 +493,6 @@ public class TestTaskAttempt {
   }
 
   @Test
-  // Ensure ContainerTerminated is handled correctly by the TaskAttempt
-  public void testContainerTerminatedWhileCommitting() throws Exception {
-    ApplicationId appId = ApplicationId.newInstance(1, 2);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
-        appId, 0);
-    TezDAGID dagID = new TezDAGID(appId, 1);
-    TezVertexID vertexID = new TezVertexID(dagID, 1);
-    TezTaskID taskID = new TezTaskID(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
-
-    MockEventHandler eventHandler = new MockEventHandler();
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(
-        new InetSocketAddress("localhost", 0));
-
-    Configuration taskConf = new Configuration();
-    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    taskConf.setBoolean("fs.file.impl.disable.cache", true);
-
-    TaskLocationHint locationHint = new TaskLocationHint(
-        new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
-    Resource resource = Resource.newInstance(1024, 1);
-
-    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
-    Container container = mock(Container.class);
-    when(container.getId()).thenReturn(contId);
-    when(container.getNodeId()).thenReturn(nid);
-    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
-
-    AppContext appCtx = mock(AppContext.class);
-    AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
-        appCtx);
-    containers.addContainerIfNew(container);
-
-    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
-    doReturn(containers).when(appCtx).getAllContainers();
-
-    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, taskConf, new SystemClock(),
-        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
-        resource, createFakeContainerContext());
-
-    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
-    assertEquals("Task attempt is not in running state", taImpl.getState(),
-        TaskAttemptState.RUNNING);
-    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
-        TaskAttemptEventType.TA_COMMIT_PENDING));
-    assertEquals("Task attempt is not in commit pending state",
-        taImpl.getState(), TaskAttemptState.COMMIT_PENDING);
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null));
-    assertFalse(
-        "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
-        eventHandler.internalError);
-    // TODO Verify diagnostics
-  }
-
-  @Test
   // Ensure ContainerTerminating and ContainerTerminated is handled correctly by
   // the TaskAttempt
   public void testContainerTerminatedAfterSuccess() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index ad36b7b..be3915d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -146,12 +146,6 @@ public class TestTaskImpl {
     assertTaskRunningState();
   }
 
-  private void commitTaskAttempt(TezTaskAttemptID attemptId) {
-    mockTask.handle(new TaskEventTAUpdate(attemptId,
-        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
-    assertTaskRunningState();
-  }
-
   private void updateAttemptProgress(MockTaskAttemptImpl attempt, float p) {
     attempt.setProgress(p);
   }
@@ -315,9 +309,9 @@ public class TestTaskImpl {
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
-    updateAttemptState(mockTask.getLastAttempt(),
-        TaskAttemptState.COMMIT_PENDING);
-    commitTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    assertTrue("First attempt should commit",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
 
     // During the task attempt commit there is an exception which causes
     // the attempt to fail
@@ -325,15 +319,54 @@ public class TestTaskImpl {
     failRunningTaskAttempt(mockTask.getLastAttempt().getID());
 
     assertEquals(2, mockTask.getAttemptList().size());
+    
+    assertFalse("First attempt should not commit",
+        mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+
     updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
-    commitTaskAttempt(mockTask.getLastAttempt().getID());
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
         TaskEventType.T_ATTEMPT_SUCCEEDED));
 
+    assertTaskSucceededState();
+  }
+
+
+  @Test
+  public void testChangeCommitTaskAttempt() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(mockTask.getAttemptList().get(1).getID()));
     assertFalse("First attempt should not commit",
         mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
-    assertTrue("Second attempt should commit",
-        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+
+    // During the task attempt commit there is an exception which causes
+    // the second attempt to fail
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
+    failRunningTaskAttempt(mockTask.getLastAttempt().getID());
+
+    assertEquals(2, mockTask.getAttemptList().size());
+    
+    assertFalse("Second attempt should not commit",
+        mockTask.canCommit(mockTask.getAttemptList().get(1).getID()));
+    assertTrue("First attempt should commit",
+        mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
+
+    updateAttemptState(mockTask.getAttemptList().get(0), TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0).getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
 
     assertTaskSucceededState();
   }
@@ -349,7 +382,6 @@ public class TestTaskImpl {
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
         TaskEventType.T_ADD_SPEC_ATTEMPT));
     launchTaskAttempt(mockTask.getLastAttempt().getID());
-    commitTaskAttempt(mockTask.getLastAttempt().getID());
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
         TaskEventType.T_ATTEMPT_SUCCEEDED));
 
@@ -456,6 +488,11 @@ public class TestTaskImpl {
     public TaskAttemptState getState() {
       return state;
     }
+    
+    @Override
+    public TaskAttemptState getStateNoLock() {
+      return state;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
index cbe6e34..5b44f23 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
@@ -38,11 +38,4 @@ public interface TezProcessorContext extends TezTaskContext {
    */
   public boolean canCommit() throws IOException;
 
-  /**
-   * Tell the AM that this processor has a pending commit
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void commitPending() throws IOException, InterruptedException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index 0ff424c..28991a8 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -42,9 +42,6 @@ public interface TezTaskUmbilicalProtocol extends Master {
 
   ContainerTask getTask(ContainerContext containerContext) throws IOException;
 
-  void commitPending(TezTaskAttemptID taskId)
-      throws IOException, InterruptedException;
-
   boolean canCommit(TezTaskAttemptID taskid) throws IOException;
 
   // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index fd3cdf0..d710f7a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -83,9 +83,4 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
     return tezUmbilical.canCommit(this.taskAttemptID);
   }
 
-  @Override
-  public void commitPending() throws IOException, InterruptedException {
-    tezUmbilical.commitPending(this.taskAttemptID);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
index 43f5edc..5889622 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -33,7 +33,4 @@ public interface TezUmbilical {
 
   public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
 
-  public void commitPending(TezTaskAttemptID taskAttemptID)
-      throws IOException, InterruptedException;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 4ad1026..1362396 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -606,13 +606,6 @@ public class LocalJobRunnerTez implements ClientProtocol {
       return null;
     }
 
-    @Override
-    public void commitPending(TezTaskAttemptID taskId)
-        throws IOException, InterruptedException {
-      // TODO Auto-generated method stub
-      // TODO TODONEWTEZ
-    }
-
   }
 
   public LocalJobRunnerTez(Configuration conf) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
index 2db823d..d71dba0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
@@ -464,26 +464,6 @@ public abstract class MRTask {
     if (output instanceof SimpleOutput) {
       SimpleOutput sOut = (SimpleOutput)output;
       if (sOut.isCommitRequired()) {
-        processorContext.commitPending();
-        // TODO NEWTEZ TEZ-439
-  //      int retries = MAX_RETRIES;
-  //      setState(TezTaskStatus.State.COMMIT_PENDING);
-  //      //say the task tracker that task is commit pending
-  //      // TODO TEZAM2 - Why is the commitRequired check missing ?
-  //      while (true) {
-  //        try {
-  //          umbilical.commitPending(taskAttemptId, status);
-  //          break;
-  //        } catch (InterruptedException ie) {
-  //          // ignore
-  //        } catch (IOException ie) {
-  //          LOG.warn("Failure sending commit pending: " +
-  //              StringUtils.stringifyException(ie));
-  //          if (--retries == 0) {
-  //            System.exit(67);
-  //          }
-  //        }
-  //      }
         //wait for commit approval and commit
         // TODO EVENTUALLY - Commit is not required for map tasks.
         // skip a couple of RPCs before exiting.
@@ -517,14 +497,24 @@ public abstract class MRTask {
   }
 
   private void commit(SimpleOutput output) throws IOException {
-    while (!processorContext.canCommit()) {
+    int retries = 3;
+    while (true) {
       // This will loop till the AM asks for the task to be killed. As
       // against, the AM sending a signal to the task to kill itself
       // gracefully.
       try {
+        if (processorContext.canCommit()) {
+          break;
+        }
         Thread.sleep(1000);
       } catch(InterruptedException ie) {
         //ignore
+      } catch (IOException ie) {
+        LOG.warn("Failure sending canCommit: "
+            + StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index a6b5470..0653cc8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -400,24 +400,6 @@ public abstract class MRTask extends RunningTaskContext {
     TezTaskUmbilicalProtocol umbilical = getUmbilical();
     // TODO TEZ Interaciton between Commit and OutputReady. Merge ?
     if (isCommitRequired()) {
-      int retries = MAX_RETRIES;
-      setState(TezTaskStatus.State.COMMIT_PENDING);
-      // say the task tracker that task is commit pending
-      // TODO TEZAM2 - Why is the commitRequired check missing ?
-      while (true) {
-        try {
-          umbilical.commitPending(taskAttemptId);
-          break;
-        } catch (InterruptedException ie) {
-          // ignore
-        } catch (IOException ie) {
-          LOG.warn("Failure sending commit pending: " +
-              StringUtils.stringifyException(ie));
-          if (--retries == 0) {
-            System.exit(67);
-          }
-        }
-      }
       //wait for commit approval and commit
       // TODO EVENTUALLY - Commit is not required for map tasks. skip a couple of RPCs before exiting.
       commit(umbilical, reporter, committer);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index 07f3a2c..e5cc902 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -79,13 +79,6 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
     return null;
   }
 
-
-  @Override
-  public void commitPending(TezTaskAttemptID taskId)
-      throws IOException, InterruptedException {
-    LOG.info("Got 'commit-pending' from " + taskId);
-  }
-
   @Override
   public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
     LOG.info("Got 'can-commit' from " + taskid);


Mime
View raw message