Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 237D310F95 for ; Fri, 20 Sep 2013 17:33:48 +0000 (UTC) Received: (qmail 69288 invoked by uid 500); 20 Sep 2013 17:33:47 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 69258 invoked by uid 500); 20 Sep 2013 17:33:46 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 69250 invoked by uid 99); 20 Sep 2013 17:33:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 17:33:44 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 20 Sep 2013 17:33:41 +0000 Received: (qmail 69086 invoked by uid 99); 20 Sep 2013 17:33:21 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Sep 2013 17:33:21 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 16D098AF289; Fri, 20 Sep 2013 17:33:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.incubator.apache.org Message-Id: <6df00d190e314cd99f63c64f6dc29cb9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-439. Fix task attempt commit flow (bikas) Date: Fri, 20 Sep 2013 17:33:21 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/TEZ-398 bd76ffcf2 -> 6ca59ac72 TEZ-439. Fix task attempt commit flow (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6ca59ac7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6ca59ac7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6ca59ac7 Branch: refs/heads/TEZ-398 Commit: 6ca59ac727a0db2d1b033cca269325f49ece6ccd Parents: bd76ffc Author: Bikas Saha Authored: Fri Sep 20 10:31:28 2013 -0700 Committer: Bikas Saha Committed: Fri Sep 20 10:31:28 2013 -0700 ---------------------------------------------------------------------- .../apache/hadoop/mapred/YarnTezDagChild.java | 6 - .../dag/api/oldrecords/TaskAttemptState.java | 1 - .../dag/app/TaskAttemptListenerImpTezDag.java | 30 ----- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 1 + .../dag/app/dag/TaskAttemptStateInternal.java | 1 - .../tez/dag/app/dag/event/TaskEventType.java | 1 - .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 50 ++------ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 114 +++++++++++-------- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 62 ---------- .../tez/dag/app/dag/impl/TestTaskImpl.java | 63 +++++++--- .../tez/engine/newapi/TezProcessorContext.java | 7 -- .../tez/common/TezTaskUmbilicalProtocol.java | 3 - .../newapi/impl/TezProcessorContextImpl.java | 5 - .../tez/engine/newapi/impl/TezUmbilical.java | 3 - .../apache/hadoop/mapred/LocalJobRunnerTez.java | 7 -- .../tez/mapreduce/newprocessor/MRTask.java | 32 ++---- .../apache/tez/mapreduce/processor/MRTask.java | 18 --- .../tez/mapreduce/TestUmbilicalProtocol.java | 7 -- 18 files changed, 141 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java index 6e4e418..5034262 100644 --- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java +++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java @@ -333,12 +333,6 @@ public class YarnTezDagChild { throws IOException { return umbilical.canCommit(taskAttemptID); } - - @Override - public void commitPending(TezTaskAttemptID taskAttemptID) - throws IOException, InterruptedException { - umbilical.commitPending(taskAttemptID); - } }; // report non-pid to application master http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java index 068913b..926835a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/oldrecords/TaskAttemptState.java @@ -22,7 +22,6 @@ public enum TaskAttemptState { NEW, STARTING, RUNNING, - COMMIT_PENDING, SUCCEEDED, FAILED, KILLED http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 8c29fd9..2be9c5f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -44,9 +44,7 @@ import org.apache.tez.common.records.ProceedToCompletionResponse; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; -import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable; -import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.rm.container.AMContainerImpl; @@ -346,34 +344,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements */ /** - * TaskAttempt is reporting that it is in commit_pending and it is waiting for - * the commit Response - * - *
- * Commit it a two-phased protocol. First the attempt informs the - * ApplicationMaster that it is - * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls - * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)} This is - * a legacy from the centralized commit protocol handling by the JobTracker. - */ - @Override - public void commitPending(TezTaskAttemptID taskAttemptId) - throws IOException, InterruptedException { - LOG.info("Commit-pending state update from " + taskAttemptId.toString()); - // An attempt is asking if it can commit its output. This can be decided - // only by the task which is managing the multiple attempts. So redirect the - // request there. - taskHeartbeatHandler.progressing(taskAttemptId); - pingContainerHeartbeatHandler(taskAttemptId); - //Ignorable TaskStatus? - since a task will send a LastStatusUpdate - context.getEventHandler().handle( - new TaskAttemptEvent( - taskAttemptId, - TaskAttemptEventType.TA_COMMIT_PENDING) - ); - } - - /** * Child checking whether it can commit. * *
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index ae70022..0cc9163 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -70,6 +70,7 @@ public interface TaskAttempt { TezCounters getCounters(); float getProgress(); TaskAttemptState getState(); + TaskAttemptState getStateNoLock(); /** * Has attempt reached the final state or not. http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java index 9ad5460..a49c2a3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java @@ -30,7 +30,6 @@ public enum TaskAttemptStateInternal { START_WAIT, RUNNING, OUTPUT_CONSUMABLE, - COMMIT_PENDING, KILL_IN_PROGRESS, FAIL_IN_PROGRESS, KILLED, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java index d0ad8a0..a0b99a9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java @@ -38,7 +38,6 @@ public enum TaskEventType { //Producer:TaskAttempt T_ATTEMPT_LAUNCHED, T_ATTEMPT_OUTPUT_CONSUMABLE, - T_ATTEMPT_COMMIT_PENDING, T_ATTEMPT_FAILED, T_ATTEMPT_SUCCEEDED, T_ATTEMPT_KILLED http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index f171afe..1ae9dcd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -176,7 +176,6 @@ public class TaskAttemptImpl implements TaskAttempt, .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, new OutputConsumableTransition()) //Optional, may not come in for all tasks. - .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition()) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER)) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER)) @@ -185,11 +184,11 @@ public class TaskAttemptImpl implements TaskAttempt, .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER)) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER)) .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition()) + .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER)) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE) // Stuck RPC. The client retries in a loop. - .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingAtOutputConsumableTransition()) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition()) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER)) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER)) @@ -201,19 +200,6 @@ public class TaskAttemptImpl implements TaskAttempt, .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition()) .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_COMMIT_PENDING) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE, new SucceededTransition()) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAILED, new TerminatedWhileRunningTransition(FAILED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TIMED_OUT, new TerminatedWhileRunningTransition(FAILED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_FAIL_REQUEST, new TerminatedWhileRunningTransition(FAILED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedWhileRunningTransition(KILLED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER)) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition()) - .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES, new TerminatedWhileRunningTransition(FAILED_HELPER)) - .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating()) .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES)) @@ -369,13 +355,18 @@ public class TaskAttemptImpl implements TaskAttempt, public TaskAttemptState getState() { readLock.lock(); try { - return getExternalState(stateMachine.getCurrentState()); + return getStateNoLock(); } finally { readLock.unlock(); } } @Override + public TaskAttemptState getStateNoLock() { + return getExternalState(stateMachine.getCurrentState()); + } + + @Override public boolean isFinished() { readLock.lock(); try { @@ -530,10 +521,8 @@ public class TaskAttemptImpl implements TaskAttempt, case START_WAIT: return TaskAttemptState.STARTING; case RUNNING: - return TaskAttemptState.RUNNING; - case COMMIT_PENDING: case OUTPUT_CONSUMABLE: - return TaskAttemptState.COMMIT_PENDING; + return TaskAttemptState.RUNNING; case FAILED: case FAIL_IN_PROGRESS: return TaskAttemptState.FAILED; @@ -1076,15 +1065,6 @@ public class TaskAttemptImpl implements TaskAttempt, } } - protected static class CommitPendingTransition implements - SingleArcTransition { - @Override - public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { - ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, - TaskEventType.T_ATTEMPT_COMMIT_PENDING)); - } - } - protected static class SucceededTransition implements SingleArcTransition { @Override @@ -1139,18 +1119,8 @@ public class TaskAttemptImpl implements TaskAttempt, public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { super.transition(ta, event); ta.sendTaskAttemptCleanupEvent(); - } - } - - protected static class CommitPendingAtOutputConsumableTransition extends - CommitPendingTransition { - - @Override - public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { - // TODO Figure out the interaction between OUTPUT_CONSUMABLE AND - // COMMIT_PENDING, Ideally both should not exist for the same task. - super.transition(ta, event); - LOG.info("Received a commit pending while in the OutputConsumable state"); + TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event; + ta.addDiagnosticInfo(tEvent.getDiagnosticInfo()); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index ff9ded7..92a1859 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -167,9 +167,6 @@ public class TaskImpl implements Task, EventHandler { TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE, new AttemptProcessingCompleteTransition()) .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, - TaskEventType.T_ATTEMPT_COMMIT_PENDING, - new AttemptCommitPendingTransition()) - .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, TaskEventType.T_ATTEMPT_SUCCEEDED, @@ -203,7 +200,6 @@ public class TaskImpl implements Task, EventHandler { TaskEventType.T_TERMINATE, TaskEventType.T_ATTEMPT_LAUNCHED, TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE, - TaskEventType.T_ATTEMPT_COMMIT_PENDING, TaskEventType.T_ATTEMPT_FAILED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ADD_SPEC_ATTEMPT)) @@ -584,17 +580,50 @@ public class TaskImpl implements Task, EventHandler { @Override public boolean canCommit(TezTaskAttemptID taskAttemptID) { - readLock.lock(); - boolean canCommit = false; + writeLock.lock(); try { - if (commitAttempt != null) { - canCommit = taskAttemptID.equals(commitAttempt); - LOG.info("Result of canCommit for " + taskAttemptID + ":" + canCommit); + if (getState() != TaskState.RUNNING) { + LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID); + eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID + , "Task not running. Bad attempt.")); + return false; } + if (commitAttempt == null) { + TaskAttempt ta = getAttempt(taskAttemptID); + if (ta == null) { + throw new TezUncheckedException("Unknown task for commit: " + taskAttemptID); + } + // Its ok to get a non-locked state snapshot since we handle changes of + // state in the task attempt. Dont want to deadlock here. + TaskAttemptState taState = ta.getStateNoLock(); + if (taState == TaskAttemptState.RUNNING) { + commitAttempt = taskAttemptID; + LOG.info(taskAttemptID + " given a go for committing the task output."); + return true; + } else { + LOG.info(taskAttemptID + " with state: " + taState + + " given a no-go for commit because its not running."); + return false; + } + } else { + if (commitAttempt.equals(taskAttemptID)) { + LOG.info(taskAttemptID + " given a go for committing the task output."); + return true; + } + // Don't think this can be a pluggable decision, so simply raise an + // event for the TaskAttempt to delete its output. + // Wait for commit attempt to succeed. Dont kill this. If commit + // attempt fails then choose a different committer. When commit attempt + // succeeds then this and others will be killed + LOG.info(commitAttempt + + " is current committer. Commit waiting for: " + + taskAttemptID); + return false; + } + } finally { - readLock.unlock(); + writeLock.unlock(); } - return canCommit; } // TODO remove hacky name lookup @@ -887,50 +916,34 @@ public class TaskImpl implements Task, EventHandler { } } - private static class AttemptCommitPendingTransition implements - SingleArcTransition { - @Override - public void transition(TaskImpl task, TaskEvent event) { - TaskEventTAUpdate ev = (TaskEventTAUpdate) event; - // The nextAttemptNumber is commit pending, decide on set the - // commitAttempt - TezTaskAttemptID attemptID = ev.getTaskAttemptID(); - if (task.commitAttempt == null) { - // TODO: validate attemptID - task.commitAttempt = attemptID; - LOG.info(attemptID + " given a go for committing the task output."); - } else { - // Don't think this can be a pluggable decision, so simply raise an - // event for the TaskAttempt to delete its output. - // TODO . Wait for commit attempt to succeed. Dont kill this. If commit - // attempt fails then choose a different committer. - LOG.info(task.commitAttempt - + " already given a go for committing the task output, so killing " - + attemptID); - task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptID, - "Output being committed by alternate attemptId.")); - } - } - } - private static class AttemptSucceededTransition implements SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { - task.handleTaskAttemptCompletion( - ((TaskEventTAUpdate) event).getTaskAttemptID(), + TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID(); + + if (task.commitAttempt != null && + !task.commitAttempt.equals(successTaId)) { + // The succeeded attempt is not the one that was selected to commit + // This is impossible and has to be a bug + throw new TezUncheckedException("TA: " + successTaId + + " succeeded but TA: " + task.commitAttempt + + " was expected to commit and succeed"); + } + + task.handleTaskAttemptCompletion(successTaId, TezDependentTaskCompletionEvent.Status.SUCCEEDED); task.finishedAttempts++; --task.numberUncompletedAttempts; - task.successfulAttempt = ((TaskEventTAUpdate) event).getTaskAttemptID(); + task.successfulAttempt = successTaId; task.eventHandler.handle(new VertexEventTaskCompleted( task.taskId, TaskState.SUCCEEDED)); LOG.info("Task succeeded with attempt " + task.successfulAttempt); - // issue kill to all other attempts if (task.historyTaskStartGenerated) { task.logJobHistoryTaskFinishedEvent(); } + // issue kill to all other attempts for (TaskAttempt attempt : task.attempts.values()) { if (attempt.getID() != task.successfulAttempt && // This is okay because it can only talk us out of sending a @@ -954,12 +967,18 @@ public class TaskImpl implements Task, EventHandler { SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { + TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; + if (task.commitAttempt !=null && + castEvent.getTaskAttemptID().equals(task.commitAttempt)) { + task.commitAttempt = null; + } task.handleTaskAttemptCompletion( - ((TaskEventTAUpdate) event).getTaskAttemptID(), + castEvent.getTaskAttemptID(), TezDependentTaskCompletionEvent.Status.KILLED); task.finishedAttempts++; - --task.numberUncompletedAttempts; - if (task.successfulAttempt == null) { + // we don't need a new event if we already have a spare + if (--task.numberUncompletedAttempts == 0 + && task.successfulAttempt == null) { task.addAndScheduleAttempt(); } } @@ -1001,7 +1020,8 @@ public class TaskImpl implements Task, EventHandler { public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; - if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) { + if (task.commitAttempt != null && + castEvent.getTaskAttemptID().equals(task.commitAttempt)) { task.commitAttempt = null; } if (castEvent.getTaskAttemptID().equals(task.outputConsumableAttempt)) { @@ -1143,6 +1163,10 @@ public class TaskImpl implements Task, EventHandler { } private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) { + if (commitAttempt != null && commitAttempt.equals(attempt)) { + LOG.info("Removing commit attempt: " + commitAttempt); + commitAttempt = null; + } if (attempt != null && !attempt.isFinished()) { eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(), logMsg)); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index daac3c7..b5e283b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -493,68 +493,6 @@ public class TestTaskAttempt { } @Test - // Ensure ContainerTerminated is handled correctly by the TaskAttempt - public void testContainerTerminatedWhileCommitting() throws Exception { - ApplicationId appId = ApplicationId.newInstance(1, 2); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( - appId, 0); - TezDAGID dagID = new TezDAGID(appId, 1); - TezVertexID vertexID = new TezVertexID(dagID, 1); - TezTaskID taskID = new TezTaskID(vertexID, 1); - TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0); - - MockEventHandler eventHandler = new MockEventHandler(); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - when(taListener.getAddress()).thenReturn( - new InetSocketAddress("localhost", 0)); - - Configuration taskConf = new Configuration(); - taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); - taskConf.setBoolean("fs.file.impl.disable.cache", true); - - TaskLocationHint locationHint = new TaskLocationHint( - new HashSet(Arrays.asList(new String[] {"127.0.0.1"})), null); - Resource resource = Resource.newInstance(1024, 1); - - NodeId nid = NodeId.newInstance("127.0.0.1", 0); - ContainerId contId = ContainerId.newInstance(appAttemptId, 3); - Container container = mock(Container.class); - when(container.getId()).thenReturn(contId); - when(container.getNodeId()).thenReturn(nid); - when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - - AppContext appCtx = mock(AppContext.class); - AMContainerMap containers = new AMContainerMap( - mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), - appCtx); - containers.addContainerIfNew(container); - - doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); - doReturn(containers).when(appCtx).getAllContainers(); - - TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, - taListener, taskConf, new SystemClock(), - mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, - resource, createFakeContainerContext()); - - taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null)); - // At state STARTING. - taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, - null)); - assertEquals("Task attempt is not in running state", taImpl.getState(), - TaskAttemptState.RUNNING); - taImpl.handle(new TaskAttemptEvent(taskAttemptID, - TaskAttemptEventType.TA_COMMIT_PENDING)); - assertEquals("Task attempt is not in commit pending state", - taImpl.getState(), TaskAttemptState.COMMIT_PENDING); - taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null)); - assertFalse( - "InternalError occurred trying to handle TA_CONTAINER_TERMINATED", - eventHandler.internalError); - // TODO Verify diagnostics - } - - @Test // Ensure ContainerTerminating and ContainerTerminated is handled correctly by // the TaskAttempt public void testContainerTerminatedAfterSuccess() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index ad36b7b..be3915d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -146,12 +146,6 @@ public class TestTaskImpl { assertTaskRunningState(); } - private void commitTaskAttempt(TezTaskAttemptID attemptId) { - mockTask.handle(new TaskEventTAUpdate(attemptId, - TaskEventType.T_ATTEMPT_COMMIT_PENDING)); - assertTaskRunningState(); - } - private void updateAttemptProgress(MockTaskAttemptImpl attempt, float p) { attempt.setProgress(p); } @@ -315,9 +309,9 @@ public class TestTaskImpl { TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(mockTask.getLastAttempt().getID()); - updateAttemptState(mockTask.getLastAttempt(), - TaskAttemptState.COMMIT_PENDING); - commitTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + assertTrue("First attempt should commit", + mockTask.canCommit(mockTask.getLastAttempt().getID())); // During the task attempt commit there is an exception which causes // the attempt to fail @@ -325,15 +319,54 @@ public class TestTaskImpl { failRunningTaskAttempt(mockTask.getLastAttempt().getID()); assertEquals(2, mockTask.getAttemptList().size()); + + assertFalse("First attempt should not commit", + mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + assertTrue("Second attempt should commit", + mockTask.canCommit(mockTask.getLastAttempt().getID())); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED); - commitTaskAttempt(mockTask.getLastAttempt().getID()); mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), TaskEventType.T_ATTEMPT_SUCCEEDED)); + assertTaskSucceededState(); + } + + + @Test + public void testChangeCommitTaskAttempt() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + + // Add a speculative task attempt that succeeds + mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(mockTask.getLastAttempt().getID()); + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING); + + assertTrue("Second attempt should commit", + mockTask.canCommit(mockTask.getAttemptList().get(1).getID())); assertFalse("First attempt should not commit", mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); - assertTrue("Second attempt should commit", - mockTask.canCommit(mockTask.getLastAttempt().getID())); + + // During the task attempt commit there is an exception which causes + // the second attempt to fail + updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED); + failRunningTaskAttempt(mockTask.getLastAttempt().getID()); + + assertEquals(2, mockTask.getAttemptList().size()); + + assertFalse("Second attempt should not commit", + mockTask.canCommit(mockTask.getAttemptList().get(1).getID())); + assertTrue("First attempt should commit", + mockTask.canCommit(mockTask.getAttemptList().get(0).getID())); + + updateAttemptState(mockTask.getAttemptList().get(0), TaskAttemptState.SUCCEEDED); + mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0).getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); assertTaskSucceededState(); } @@ -349,7 +382,6 @@ public class TestTaskImpl { mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), TaskEventType.T_ADD_SPEC_ATTEMPT)); launchTaskAttempt(mockTask.getLastAttempt().getID()); - commitTaskAttempt(mockTask.getLastAttempt().getID()); mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(), TaskEventType.T_ATTEMPT_SUCCEEDED)); @@ -456,6 +488,11 @@ public class TestTaskImpl { public TaskAttemptState getState() { return state; } + + @Override + public TaskAttemptState getStateNoLock() { + return state; + } } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java index cbe6e34..5b44f23 100644 --- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java +++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java @@ -38,11 +38,4 @@ public interface TezProcessorContext extends TezTaskContext { */ public boolean canCommit() throws IOException; - /** - * Tell the AM that this processor has a pending commit - * @throws IOException - * @throws InterruptedException - */ - public void commitPending() throws IOException, InterruptedException; - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java index 0ff424c..28991a8 100644 --- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java +++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java @@ -42,9 +42,6 @@ public interface TezTaskUmbilicalProtocol extends Master { ContainerTask getTask(ContainerContext containerContext) throws IOException; - void commitPending(TezTaskAttemptID taskId) - throws IOException, InterruptedException; - boolean canCommit(TezTaskAttemptID taskid) throws IOException; // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java index fd3cdf0..d710f7a 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java @@ -83,9 +83,4 @@ public class TezProcessorContextImpl extends TezTaskContextImpl return tezUmbilical.canCommit(this.taskAttemptID); } - @Override - public void commitPending() throws IOException, InterruptedException { - tezUmbilical.commitPending(this.taskAttemptID); - } - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java index 43f5edc..5889622 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java @@ -33,7 +33,4 @@ public interface TezUmbilical { public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException; - public void commitPending(TezTaskAttemptID taskAttemptID) - throws IOException, InterruptedException; - } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java index 4ad1026..1362396 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java @@ -606,13 +606,6 @@ public class LocalJobRunnerTez implements ClientProtocol { return null; } - @Override - public void commitPending(TezTaskAttemptID taskId) - throws IOException, InterruptedException { - // TODO Auto-generated method stub - // TODO TODONEWTEZ - } - } public LocalJobRunnerTez(Configuration conf) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java index 2db823d..d71dba0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java @@ -464,26 +464,6 @@ public abstract class MRTask { if (output instanceof SimpleOutput) { SimpleOutput sOut = (SimpleOutput)output; if (sOut.isCommitRequired()) { - processorContext.commitPending(); - // TODO NEWTEZ TEZ-439 - // int retries = MAX_RETRIES; - // setState(TezTaskStatus.State.COMMIT_PENDING); - // //say the task tracker that task is commit pending - // // TODO TEZAM2 - Why is the commitRequired check missing ? - // while (true) { - // try { - // umbilical.commitPending(taskAttemptId, status); - // break; - // } catch (InterruptedException ie) { - // // ignore - // } catch (IOException ie) { - // LOG.warn("Failure sending commit pending: " + - // StringUtils.stringifyException(ie)); - // if (--retries == 0) { - // System.exit(67); - // } - // } - // } //wait for commit approval and commit // TODO EVENTUALLY - Commit is not required for map tasks. // skip a couple of RPCs before exiting. @@ -517,14 +497,24 @@ public abstract class MRTask { } private void commit(SimpleOutput output) throws IOException { - while (!processorContext.canCommit()) { + int retries = 3; + while (true) { // This will loop till the AM asks for the task to be killed. As // against, the AM sending a signal to the task to kill itself // gracefully. try { + if (processorContext.canCommit()) { + break; + } Thread.sleep(1000); } catch(InterruptedException ie) { //ignore + } catch (IOException ie) { + LOG.warn("Failure sending canCommit: " + + StringUtils.stringifyException(ie)); + if (--retries == 0) { + throw ie; + } } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index a6b5470..0653cc8 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -400,24 +400,6 @@ public abstract class MRTask extends RunningTaskContext { TezTaskUmbilicalProtocol umbilical = getUmbilical(); // TODO TEZ Interaciton between Commit and OutputReady. Merge ? if (isCommitRequired()) { - int retries = MAX_RETRIES; - setState(TezTaskStatus.State.COMMIT_PENDING); - // say the task tracker that task is commit pending - // TODO TEZAM2 - Why is the commitRequired check missing ? - while (true) { - try { - umbilical.commitPending(taskAttemptId); - break; - } catch (InterruptedException ie) { - // ignore - } catch (IOException ie) { - LOG.warn("Failure sending commit pending: " + - StringUtils.stringifyException(ie)); - if (--retries == 0) { - System.exit(67); - } - } - } //wait for commit approval and commit // TODO EVENTUALLY - Commit is not required for map tasks. skip a couple of RPCs before exiting. commit(umbilical, reporter, committer); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ca59ac7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java index 07f3a2c..e5cc902 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java @@ -79,13 +79,6 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol { return null; } - - @Override - public void commitPending(TezTaskAttemptID taskId) - throws IOException, InterruptedException { - LOG.info("Got 'commit-pending' from " + taskId); - } - @Override public boolean canCommit(TezTaskAttemptID taskid) throws IOException { LOG.info("Got 'can-commit' from " + taskid);