tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-140. Tez/hive task failure on large DAG with Invalid event: TA_SCHEDULE at KILLED (bikas)
Date Fri, 17 Jan 2014 05:34:40 GMT
Updated Branches:
  refs/heads/master 56f42f4b7 -> 9138f7906


TEZ-140. Tez/hive task failure on large DAG with Invalid event: TA_SCHEDULE at KILLED (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9138f790
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9138f790
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9138f790

Branch: refs/heads/master
Commit: 9138f790659cb074c70571592b769eda0095172f
Parents: 56f42f4
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jan 16 21:34:03 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jan 16 21:34:26 2014 -0800

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  4 ++--
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  3 +++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 22 ++++++++++++++------
 3 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9138f790/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 0e4b3b9..8ecabd7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -223,10 +223,10 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_SCHEDULE, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         // How will duplicate history events be handled ?
         // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9138f790/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 416f9f8..9451d6f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1054,6 +1054,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           task.addAndScheduleAttempt();
         }
       } else {
+        LOG.info("Failing task: " + task.getTaskId()
+            + ", currentFailedAttempts: " + task.failedAttempts + ", maxAttempts: "
+            + task.maxAttempts);
         task.handleTaskAttemptCompletion(
             ((TaskEventTAUpdate) event).getTaskAttemptID(),
             TaskAttemptStateInternal.FAILED);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9138f790/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8eb8327..d785c78 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1051,6 +1051,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     if (vertex.completedTaskCount == vertex.tasks.size()) {
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
+        LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
           if (!vertex.committed.getAndSet(true)) {
             // commit only once
@@ -1130,9 +1131,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
    * Set the terminationCause and send a kill-message to all tasks.
    * The task-kill messages are only sent once.
    */
-  void enactKill(VertexTerminationCause trigger,
+  void tryEnactKill(VertexTerminationCause trigger,
       TaskTerminationCause taskterminationCause) {
     if(trySetTerminationCause(trigger)){
+      LOG.info("Killing tasks in vertex: " + logIdentifier + " due to trigger: "
+          + trigger);
       for (Task task : tasks.values()) {
         eventHandler.handle(
             new TaskEventTermination(task.getTaskId(), taskterminationCause));
@@ -1765,9 +1768,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexEventTermination vet = (VertexEventTermination) event;
       VertexTerminationCause trigger = vet.getTerminationCause();
       switch(trigger){
-        case DAG_KILL : vertex.enactKill(trigger, TaskTerminationCause.DAG_KILL); break;
-        case OTHER_VERTEX_FAILURE: vertex.enactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
-        case OWN_TASK_FAILURE: vertex.enactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
+        case DAG_KILL : vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break;
+        case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
+        case ROOT_INPUT_INIT_FAILURE:
+        case COMMIT_FAILURE:
+        case INVALID_NUM_OF_TASKS: 
+        case INIT_FAILURE:
+        case INTERNAL_ERROR:
+        case OTHER_VERTEX_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_VERTEX_FAILURE); break;
         default://should not occur
           throw new TezUncheckedException("VertexKilledTransition: event.terminationCause is unexpected: " + trigger);
       }
@@ -1841,7 +1849,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
         taskSucceeded(vertex, task);
       } else if (taskEvent.getState() == TaskState.FAILED) {
-        vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
+        LOG.info("Failing vertex: " + vertex.logIdentifier + 
+            " because task failed: " + taskEvent.getTaskID());
+        vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
         forceTransitionToKillWait = true;
         taskFailed(vertex, task);
       } else if (taskEvent.getState() == TaskState.KILLED) {
@@ -1915,7 +1925,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       LOG.info(vertex.getVertexId() + " failed due to post-commit rescheduling of "
           + ((VertexEventTaskReschedule)event).getTaskID());
       // terminate any running tasks
-      vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE,
+      vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE,
           TaskTerminationCause.OWN_TASK_FAILURE);
       // since the DAG thinks this vertex is completed it must be notified of
       // an error


Mime
View raw message