tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3700 addendum. Consumer attempt should kill itself instead of failing during validation checks with final merge avoidance (rbalamohan)
Date Mon, 08 May 2017 18:38:54 GMT
Repository: tez
Updated Branches:
  refs/heads/master d9f542f4c -> 93bd26ebb


TEZ-3700 addendum. Consumer attempt should kill itself instead of
failing during validation checks with final merge avoidance (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/93bd26eb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/93bd26eb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/93bd26eb

Branch: refs/heads/master
Commit: 93bd26ebbd159d62ebd33c37affbd9b0837679f1
Parents: d9f542f
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon May 8 11:35:24 2017 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon May 8 11:35:24 2017 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  9 +++---
 .../common/shuffle/impl/ShuffleManager.java     | 34 +++++++++++++++-----
 .../orderedgrouped/ShuffleScheduler.java        |  2 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  | 14 ++++++--
 4 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 246f477..79b84e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -727,8 +727,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   // must be a random access structure
   
   private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000);
-  // Do not send any events if attempt is already failed. TaskAttemptId
-  private final Set<TezTaskAttemptID> failedTaskIds = Sets.newHashSet();
+  // Do not send any events if attempt is failed due to INPUT_FAILED_EVENTS.
+  private final Set<TezTaskAttemptID> failedTaskAttemptIDs = Sets.newHashSet();
   private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
   private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
   private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
@@ -3986,7 +3986,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     try {
       if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT ||
           tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
-        if (failedTaskIds.contains(tezEvent.getSourceInfo().getTaskAttemptID())) {
+        // Prevent any failed task (due to INPUT_FAILED_EVENT) sending events downstream. E.g LLAP
+        if (failedTaskAttemptIDs.contains(tezEvent.getSourceInfo().getTaskAttemptID())) {
           return;
         }
       }
@@ -4004,7 +4005,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             // can be obsoleted by an input failed event from the
             // same source edge+task
             eventInfo.isObsolete = true;
-            failedTaskIds.add(tezEvent.getSourceInfo().getTaskAttemptID());
+            failedTaskAttemptIDs.add(tezEvent.getSourceInfo().getTaskAttemptID());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b2ff51d..8716b92 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -392,18 +392,28 @@ public class ShuffleManager implements FetcherCallback {
     if (input.canRetrieveInputInChunks()) {
       ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
       if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
-        //speculative attempts or failure attempts. Fail fast here.
-        reportFatalError(new IOException(), input + " already exists. "
-            + "Previous attempt's data could have been already merged "
-            + "to memory/disk outputs.  Failing the fetch early. currentAttemptNum=" + eventInfo
-            .attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed + ", newAttemptNum=" +
-            input.getAttemptNumber());
-        return false;
+        if (eventInfo.scheduledForDownload || !eventInfo.eventsProcessed.isEmpty()) {
+          IOException exception = new IOException("Previous event already got scheduled for " +
+              input + ". Previous attempt's data could have been already merged "
+              + "to memory/disk outputs.  Killing (self) this task early."
+              + " currentAttemptNum=" + eventInfo.attemptNum
+              + ", eventsProcessed=" + eventInfo.eventsProcessed
+              + ", scheduledForDownload=" + eventInfo.scheduledForDownload
+              + ", newAttemptNum=" + input.getAttemptNumber());
+          String message = "Killing self as previous attempt data could have been consumed";
+          killSelf(exception, message);
+          return false;
+        }
       }
     }
     return true;
   }
 
+  void killSelf(Exception exception, String message) {
+    LOG.error(message, exception);
+    this.inputContext.killSelf(exception, message);
+  }
+
   @VisibleForTesting
   Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
 
@@ -465,6 +475,12 @@ public class ShuffleManager implements FetcherCallback {
     if (inputHost.getNumPendingPartitions() > 0) {
       pendingHosts.add(inputHost); //add it to queue
     }
+    for(InputAttemptIdentifier input : pendingInputsOfOnePartition.getInputs()) {
+      ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
+      if (eventInfo != null) {
+        eventInfo.scheduledForDownload = true;
+      }
+    }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
         pendingInputsOfOnePartition.getPartition(),
             pendingInputsOfOnePartition.getInputs());
@@ -575,6 +591,7 @@ public class ShuffleManager implements FetcherCallback {
     int finalEventId = -1; //0 indexed
     int attemptNum;
     String id;
+    boolean scheduledForDownload; // whether chunks got scheduled for download
 
 
     ShuffleEventInfo(InputAttemptIdentifier input) {
@@ -606,7 +623,8 @@ public class ShuffleManager implements FetcherCallback {
 
     public String toString() {
       return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId
-          +  ", id=" + id + ", attemptNum=" + attemptNum + "]";
+          +  ", id=" + id + ", attemptNum=" + attemptNum
+          + ", scheduledForDownload=" + scheduledForDownload + "]";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 73a6214..39f2138 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -714,7 +714,7 @@ class ShuffleScheduler {
 
   @VisibleForTesting
   void killSelf(Exception exception, String message) {
-    LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", exception);
+    LOG.error(message, exception);
     try {
       this.close();
     } finally {

http://git-wip-us.apache.org/repos/asf/tez/blob/93bd26eb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 6bcbeb6..af52f90 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -224,10 +224,13 @@ public class TestShuffleInputEventHandlerImpl {
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
     verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0));
 
+    // Let attemptNum 0 be scheduled.
+    shuffleManager.shuffleInfoEventsMap.get(expectedId2.getInputIdentifier()).scheduledForDownload = true;
+
     //0--> 1 with spill id 1 (attemptNum 1).  This should report exception
     dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1);
     handler.handleEvents(Collections.singletonList(dme));
-    verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString());
+    verify(inputContext).killSelf(any(Throwable.class), anyString());
   }
 
   /**
@@ -253,10 +256,13 @@ public class TestShuffleInputEventHandlerImpl {
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
     verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0));
 
+    // Let attemptNum 1 be scheduled.
+    shuffleManager.shuffleInfoEventsMap.get(expected.getInputIdentifier()).scheduledForDownload = true;
+
     //Now send attemptNum 0.  This should throw exception, because attempt #1 is already added
     dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
     handler.handleEvents(Collections.singletonList(dme));
-    verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString());
+    verify(inputContext).killSelf(any(Throwable.class), anyString());
   }
 
   /**
@@ -291,11 +297,13 @@ public class TestShuffleInputEventHandlerImpl {
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
     verify(shuffleManager, times(2)).addCompletedInputWithNoData(expected);
 
+    // Let attemptNum 0 be scheduled.
+    shuffleManager.shuffleInfoEventsMap.get(expected.getInputIdentifier()).scheduledForDownload = true;
 
     //Now send attemptNum 1.  This should throw exception, because attempt #1 is already added
     dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
     handler.handleEvents(Collections.singletonList(dme));
-    verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString());
+    verify(inputContext).killSelf(any(Throwable.class), anyString());
   }
 
   private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, int targetIdx,


Mime
View raw message