tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails (zjffdu)
Date Fri, 09 Oct 2015 07:35:16 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 7600f3861 -> fa31127c1


TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails (zjffdu)

(cherry picked from commit f9d15c8695de7975817631b051450336bc5eadee)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java

(cherry picked from commit 8d49fd5285016fb64ebccdc9cf31c408c79ebaaf)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
	tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java

(cherry picked from commit 3a8bf0e0afb0a30d5c4cbf82d1e2ce9e0dd22b4a)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fa31127c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fa31127c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fa31127c

Branch: refs/heads/branch-0.5
Commit: fa31127c198e5a960a08d08663e06a974a6e8e93
Parents: 7600f38
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Fri Oct 9 15:07:17 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Fri Oct 9 15:35:08 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                           |  1 +
 .../org/apache/tez/runtime/task/TaskReporter.java     | 14 +++++++++++---
 .../org/apache/tez/test/TestExceptionPropagation.java | 12 +++++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fa31127c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a46867..1055fa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2781. Fallback to send only TaskAttemptFailedEvent if taskFailed heartbeat fails
   TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2808. Race condition between preemption and container assignment
   TEZ-1929. pre-empted tasks should be marked as killed instead of failed

http://git-wip-us.apache.org/repos/asf/tez/blob/fa31127c/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index defc6bd..4def43f 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -332,8 +332,15 @@ public class TaskReporter {
      */
     private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
         EventMetaData srcMeta) throws IOException, TezException {
-      TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
-          task.getProgress()), updateEventMetadata);
+      List<TezEvent> tezEvents = new ArrayList<TezEvent>();
+      try {
+        TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
+            task.getProgress()), updateEventMetadata);
+        tezEvents.add(statusUpdateEvent);
+      } catch (Exception e) {
+        // Counter may exceed limitation
+        LOG.warn("Error when get constructing TaskStatusUpdateEvent");
+      }
       if (diagnostics == null) {
         diagnostics = ExceptionUtils.getStackTrace(t);
       } else {
@@ -341,7 +348,8 @@ public class TaskReporter {
       }
       TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
           srcMeta == null ? updateEventMetadata : srcMeta);
-      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+      tezEvents.add(taskAttemptFailedEvent);
+      return !heartbeat(tezEvents).shouldDie;
     }
 
     private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {

http://git-wip-us.apache.org/repos/asf/tez/blob/fa31127c/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index bb9888a..d30d73f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -223,7 +223,11 @@ public class TestExceptionPropagation {
         DAGStatus dagStatus = dagClient.waitForCompletion();
         String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
         LOG.info("Diagnostics:" + diagnostics);
-        assertTrue(diagnostics.contains(exLocation.name()));
+        if (exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+          assertTrue(diagnostics.contains("Too many counters"));
+        } else {
+          assertTrue(diagnostics.contains(exLocation.name()));
+        }
       }
     } finally {
       stopSessionClient();
@@ -300,6 +304,7 @@ public class TestExceptionPropagation {
     // PROCESSOR_HANDLE_EVENTS
     PROCESSOR_RUN_ERROR, PROCESSOR_CLOSE_ERROR, PROCESSOR_INITIALIZE_ERROR,
     PROCESSOR_RUN_EXCEPTION, PROCESSOR_CLOSE_EXCEPTION, PROCESSOR_INITIALIZE_EXCEPTION,
+    PROCESSOR_COUNTER_EXCEEDED,
 
     // VM
     VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED, VM_ON_VERTEX_STARTED,
@@ -624,6 +629,11 @@ public class TestExceptionPropagation {
         throw new Error(this.exLocation.name());
       } else if (this.exLocation == ExceptionLocation.PROCESSOR_RUN_EXCEPTION) {
         throw new Exception(this.exLocation.name());
+      } else if (this.exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
+        // simulate the counter limitation exceeded
+        for (int i=0;i< TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT+1; ++i) {
+          getContext().getCounters().findCounter("mycounter", "counter_"+i).increment(1);
+        }
       }
     }
 


Mime
View raw message