tez-commits mailing list archives

From: rbalamo...@apache.org
Subject: tez git commit: TEZ-3700. Consumer attempt should kill itself instead of failing during validation checks with final merge avoidance (rbalamohan)
Date: Wed, 03 May 2017 00:43:03 GMT
Repository: tez
Updated Branches:
  refs/heads/master ebf3fb133 -> 7b30785bd


TEZ-3700. Consumer attempt should kill itself instead of failing during validation checks
with final merge avoidance (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b30785b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b30785b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b30785b

Branch: refs/heads/master
Commit: 7b30785bdfee141e7a2dce80749f756fb5ec2d06
Parents: ebf3fb1
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed May 3 06:12:51 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed May 3 06:12:51 2017 +0530

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  9 +++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 15 ++++
 .../orderedgrouped/ShuffleScheduler.java        | 83 +++++++++++++++-----
 ...tShuffleInputEventHandlerOrderedGrouped.java | 44 ++++++++++-
 4 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
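
In short: with pipelined shuffle / final merge avoidance, once any chunk of a source attempt has been scheduled for download or merged, data from a newer attempt of the same source task cannot safely be mixed in. Previously the consumer failed its own attempt in that case; with this change it kills itself (so the attempt is retried rather than charged as a failure), and it only does so when the older attempt's data may actually have been consumed. A condensed, illustrative sketch of the new consumer-side decision (names follow the ShuffleScheduler diff below; this is not the literal patch):

    ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier());
    if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
      if (eventInfo.scheduledForDownload || !eventInfo.eventsProcessed.isEmpty()) {
        // The older attempt's data may already be merged into memory/disk outputs:
        // kill this consumer attempt (it will be rescheduled) instead of failing it.
        killSelf(new IOException("Previous event already got scheduled for " + input),
            "Killing self as previous attempt data could have been consumed");
        return false;
      }
      // Nothing scheduled or copied from the older attempt yet: safe to ignore it
      // and start processing the new attempt instead.
    }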


http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ab17fe4..246f477 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -727,6 +727,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   // must be a random access structure
   
   private final List<EventInfo> onDemandRouteEvents = Lists.newArrayListWithCapacity(1000);
+  // Do not send any events if the attempt has already failed; track the failed TaskAttemptIds here.
+  private final Set<TezTaskAttemptID> failedTaskIds = Sets.newHashSet();
   private final ReadWriteLock onDemandRouteEventsReadWriteLock = new ReentrantReadWriteLock();
   private final Lock onDemandRouteEventsReadLock = onDemandRouteEventsReadWriteLock.readLock();
   private final Lock onDemandRouteEventsWriteLock = onDemandRouteEventsReadWriteLock.writeLock();
@@ -3982,6 +3984,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private void processOnDemandEvent(TezEvent tezEvent, Edge srcEdge, int srcTaskIndex) {
     onDemandRouteEventsWriteLock.lock();
     try {
+      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT ||
+          tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+        if (failedTaskIds.contains(tezEvent.getSourceInfo().getTaskAttemptID())) {
+          return;
+        }
+      }
       onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
       if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
         for (EventInfo eventInfo : onDemandRouteEvents) {
@@ -3996,6 +4004,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             // can be obsoleted by an input failed event from the
             // same source edge+task
             eventInfo.isObsolete = true;
+            failedTaskIds.add(tezEvent.getSourceInfo().getTaskAttemptID());
           }
         }
       }

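On the AM side, VertexImpl now remembers source task attempts that have sent an INPUT_FAILED_EVENT and drops any later data-movement events from those attempts instead of routing them to consumers. Condensed from the hunk above (illustrative only; the actual change sits inside processOnDemandEvent under the write lock, and the patch records the failed attempt while obsoleting its pending events):

    if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT ||
        tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
      if (failedTaskIds.contains(tezEvent.getSourceInfo().getTaskAttemptID())) {
        return; // stale event from an attempt already marked failed; do not route it
      }
    }
    onDemandRouteEvents.add(new EventInfo(tezEvent, srcEdge, srcTaskIndex));
    if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
      // remember the failed source attempt so its pending DME/CDME events are obsoleted
      // and any later ones are ignored
      failedTaskIds.add(tezEvent.getSourceInfo().getTaskAttemptID());
    }
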
http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index bc06fd0..76ccf91 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3052,6 +3052,21 @@ public class TestVertexImpl {
     Assert.assertEquals(12, fromEventId);
     Assert.assertEquals(1, eventInfo.getEvents().size());
     Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
+
+    // Let the failed task send more events
+    for (int i=11; i<14; ++i) {
+      v4.handle(new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(
+          new TezEvent(DataMovementEvent.create(0, null),
+              new EventMetaData(EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
+    }
+    dispatcher.await();
+    // 11 events + 1 INPUT_FAILED_EVENT.
+    // Events sent out later by failed tasks should not be available.
+    Assert.assertEquals(12, v4.getOnDemandRouteEvents().size());
+
+    fromEventId = 0;
+    eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
+    Assert.assertEquals(EventType.INPUT_FAILED_EVENT, eventInfo.getEvents().get(0).getEventType());
   }
   
   @Test (timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 953c73e..73a6214 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -54,7 +55,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
@@ -405,7 +405,7 @@ class ShuffleScheduler {
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
 
-    pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
+    pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();
     LOG.info("ShuffleScheduler running for sourceVertex: "
         + inputContext.getSourceVertexName() + " with configuration: "
         + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@ -520,6 +520,7 @@ class ShuffleScheduler {
     int finalEventId = -1; //0 indexed
     int attemptNum;
     String id;
+    boolean scheduledForDownload; // whether chunks got scheduled for download (getMapsForHost)
 
 
     ShuffleEventInfo(InputAttemptIdentifier input) {
@@ -547,7 +548,8 @@ class ShuffleScheduler {
 
     public String toString() {
       return "[eventsProcessed=" + eventsProcessed + ", finalEventId=" + finalEventId
-          +  ", id=" + id + ", attemptNum=" + attemptNum + "]";
+          +  ", id=" + id + ", attemptNum=" + attemptNum
+          + ", scheduledForDownload=" + scheduledForDownload + "]";
     }
   }
 
@@ -677,12 +679,29 @@ class ShuffleScheduler {
     if (input.canRetrieveInputInChunks()) {
       ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier());
      if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) {
-        reportExceptionForInput(new IOException("Previous event already got scheduled for " +
-            input + ". Previous attempt's data could have been already merged "
-            + "to memory/disk outputs.  Failing the fetch early. currentAttemptNum="
-            + eventInfo.attemptNum + ", eventsProcessed=" + eventInfo.eventsProcessed
-            + ", newAttemptNum=" + input.getAttemptNumber()));
-        return false;
+        /*
+         * Check if the current attempt has been scheduled for download.
+         * e.g. currentAttemptNum=0, eventsProcessed={}, newAttemptNum=1
+         * If nothing is scheduled in the current attempt and no events are processed
+         * (i.e. copySucceeded), we can ignore the current attempt and start processing
+         * the new attempt (e.g. LLAP).
+         */
+        if (eventInfo.scheduledForDownload || !eventInfo.eventsProcessed.isEmpty()) {
+          IOException exception = new IOException("Previous event already got scheduled for " +
+              input + ". Previous attempt's data could have been already merged "
+              + "to memory/disk outputs.  Killing (self) this task early."
+              + " currentAttemptNum=" + eventInfo.attemptNum
+              + ", eventsProcessed=" + eventInfo.eventsProcessed
+              + ", scheduledForDownload=" + eventInfo.scheduledForDownload
+              + ", newAttemptNum=" + input.getAttemptNumber());
+          String message = "Killing self as previous attempt data could have been consumed";
+          killSelf(exception, message);
+          return false;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring current attempt=" + eventInfo.attemptNum + " with eventInfo=" +
+              eventInfo.toString() + " and processing new attempt=" + input.getAttemptNumber());
+        }
       }
 
       if (eventInfo == null) {
@@ -694,9 +713,13 @@ class ShuffleScheduler {
   }
 
   @VisibleForTesting
-  void reportExceptionForInput(Exception exception) {
+  void killSelf(Exception exception, String message) {
     LOG.error(srcNameTrimmed + ": " + "Reporting exception for input", exception);
-    exceptionReporter.reportException(exception);
+    try {
+      this.close();
+    } finally {
+      this.inputContext.killSelf(exception, message);
+    }
   }
 
   private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0);
@@ -1048,18 +1071,33 @@ class ShuffleScheduler {
     }
   }
   
-  public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
+  public void obsoleteInput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
     LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
-    if (pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
-      //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated).
-      //Fail fast here.
-      exceptionReporter.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it "
+    ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(srcAttempt.getInputIdentifier());
+
+    //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated).
+    //Fail fast here.
+    if (eventInfo != null) {
+      // If we haven't started downloading it yet, just get rid of it.
+      if (eventInfo.eventsProcessed.isEmpty() && !eventInfo.scheduledForDownload) {
+        // obsoleted anyway; no point tracking it if nothing has started
+        pipelinedShuffleInfoEventsMap.remove(srcAttempt.getInputIdentifier());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing " + eventInfo + " from tracking");
+        }
+        return;
+      }
+      IOException exception = new IOException(srcAttempt + " is marked as obsoleteInput, but it "
           + "exists in shuffleInfoEventMap. Some data could have been already merged "
-          + "to memory/disk outputs.  Failing the fetch early."));
+          + "to memory/disk outputs.  Failing the fetch early. eventInfo:" + eventInfo.toString());
+      String message = "Got obsolete event. Killing self as attempt's data could have been consumed";
+      killSelf(exception, message);
       return;
     }
-    obsoleteInputs.add(srcAttempt);
+    synchronized (this) {
+      obsoleteInputs.add(srcAttempt);
+    }
   }
   
   public synchronized void putBackKnownMapOutput(MapHost host,
@@ -1102,7 +1140,7 @@ class ShuffleScheduler {
     return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
   }
   
-  private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
+  private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
     return (!obsoleteInputs.contains(id) && 
              !isInputFinished(id.getInputIdentifier()));
   }
@@ -1178,6 +1216,13 @@ class ShuffleScheduler {
         if (includedMaps++ >= maxTaskOutputAtOnce) {
           host.addKnownMap(inputAttemptIdentifier);
         } else {
+          if (inputAttemptIdentifier.canRetrieveInputInChunks()) {
+            ShuffleEventInfo shuffleEventInfo =
+                pipelinedShuffleInfoEventsMap.get(inputAttemptIdentifier.getInputIdentifier());
+            if (shuffleEventInfo != null) {
+              shuffleEventInfo.scheduledForDownload = true;
+            }
+          }
           result.add(inputAttemptIdentifier);
         }
       }

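Along the same lines, obsoleteInput() now distinguishes a pipelined input whose download never started (its tracking entry is simply dropped) from one whose data may already have been scheduled or merged (the consumer kills itself); the non-pipelined path is unchanged apart from the narrower lock. Condensed from the hunk above, for illustration only:

    ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(srcAttempt.getInputIdentifier());
    if (eventInfo != null) {
      if (eventInfo.eventsProcessed.isEmpty() && !eventInfo.scheduledForDownload) {
        // nothing was fetched or scheduled for this input; just stop tracking it
        pipelinedShuffleInfoEventsMap.remove(srcAttempt.getInputIdentifier());
        return;
      }
      // some of this attempt's data may already have been consumed; kill (not fail) the task
      killSelf(new IOException(srcAttempt + " is marked as obsoleteInput, but it exists in shuffleInfoEventMap"),
          "Got obsolete event. Killing self as attempt's data could have been consumed");
      return;
    }
    synchronized (this) {
      obsoleteInputs.add(srcAttempt); // non-pipelined inputs, now guarded by a narrower lock
    }
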
http://git-wip-us.apache.org/repos/asf/tez/blob/7b30785b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 26aa298..695a307 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -232,16 +232,56 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
         !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
 
+    int valuesInMapLocations = scheduler.mapLocations.values().size();
+    assertTrue("Maplocations should have values. current size: " + valuesInMapLocations,
+        valuesInMapLocations > 0);
+
+    // start scheduling for download
+    scheduler.getMapsForHost(scheduler.mapLocations.values().iterator().next());
+
     //Attempt #0 comes up. When processing this, it should report exception
     attemptNum = 0;
     inputIdx = 1;
     Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
     handler.handleEvents(Collections.singletonList(dme2));
 
-    InputAttemptIdentifier id2 =
+    // task should issue kill request
+    verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class));
+  }
+
+  @Test (timeout = 5000)
+  public void testPipelinedShuffle_WithObsoleteEvents() throws IOException, InterruptedException {
+    //Process attempt #1 first
+    int attemptNum = 1;
+    int inputIdx = 1;
+
+    Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
+    handler.handleEvents(Collections.singletonList(dme1));
+
+    InputAttemptIdentifier id1 =
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
-    verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class));
+
+    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
+    assertTrue("Shuffle info events should not be empty for pipelined shuffle",
+        !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
+
+    int valuesInMapLocations = scheduler.mapLocations.values().size();
+    assertTrue("Maplocations should have values. current size: " + valuesInMapLocations,
+        valuesInMapLocations > 0);
+
+    // start scheduling for download. Sets up scheduledForDownload in eventInfo.
+    scheduler.getMapsForHost(scheduler.mapLocations.values().iterator().next());
+
+    // send input failed event.
+    List<Event> events = new LinkedList<Event>();
+    int targetIdx = 1;
+    InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
+    events.add(failedEvent);
+    handler.handleEvents(events);
+
+    // task should issue kill request, as inputs are scheduled for download already.
+    verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class));
   }
 
   @Test(timeout = 5000)

