tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [17/23] git commit: TEZ-1034. Shuffling can sometimes hang with duplicate inputs for the same index. (Bikas Saha via hitesh)
Date Fri, 20 Jun 2014 22:35:55 GMT
TEZ-1034. Shuffling can sometimes hang with duplicate inputs for the same index. (Bikas Saha via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7079ae91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7079ae91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7079ae91

Branch: refs/heads/branch-0.4.1-incubating
Commit: 7079ae9127e7840350503d0fde3b55d8a86b8011
Parents: 279add4
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Apr 9 23:07:39 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:45 2014 -0700

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 33 ++++++++++++--------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 16 ++++++++--
 .../common/shuffle/impl/ShuffleScheduler.java   |  5 +++
 3 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7079ae91/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6d512ba..7f9cce1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1457,6 +1457,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       if (attempt.leafVertex) {
         return TaskAttemptStateInternal.SUCCEEDED;
       }
+      // TODO - TEZ-834. This assumes that the outputs were on that node
+      attempt.sendInputFailedToConsumers();
       TaskAttemptImpl.TERMINATED_AFTER_SUCCESS_HELPER.transition(attempt, event);
       return TaskAttemptStateInternal.KILLED;
     }
@@ -1498,19 +1500,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
       // send input failed event
-      Vertex vertex = attempt.getVertex();
-      Map<Vertex, Edge> edges = vertex.getOutputVertices();
-      if (edges != null && !edges.isEmpty()) {
-        List<TezEvent> tezIfEvents = Lists.newArrayListWithCapacity(edges.size());
-        for (Vertex edgeVertex : edges.keySet()) {
-          tezIfEvents.add(new TezEvent(new InputFailedEvent(), 
-              new EventMetaData(EventProducerConsumerType.SYSTEM, 
-                  vertex.getName(), 
-                  edgeVertex.getName(), 
-                  attempt.getID())));
-        }
-        attempt.sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
-      }
+      attempt.sendInputFailedToConsumers();
       // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
       if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
         (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(
@@ -1525,6 +1515,23 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Can be used to blacklist nodes.
     }
   }
+  
+  @VisibleForTesting
+  protected void sendInputFailedToConsumers() {
+    Vertex vertex = getVertex();
+    Map<Vertex, Edge> edges = vertex.getOutputVertices();
+    if (edges != null && !edges.isEmpty()) {
+      List<TezEvent> tezIfEvents = Lists.newArrayListWithCapacity(edges.size());
+      for (Vertex edgeVertex : edges.keySet()) {
+        tezIfEvents.add(new TezEvent(new InputFailedEvent(), 
+            new EventMetaData(EventProducerConsumerType.SYSTEM, 
+                vertex.getName(), 
+                edgeVertex.getName(), 
+                getID())));
+      }
+      sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
+    }
+  }
 
   private void initTaskAttemptStatus(TaskAttemptStatus result) {
     result.progress = 0.0f;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7079ae91/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index fe7fce8..55c3867 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -713,7 +713,7 @@ public class TestTaskAttempt {
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
 
-    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+    MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
         resource, createFakeContainerContext(), false);
@@ -750,7 +750,10 @@ public class TestTaskAttempt {
 
     // Send out a Node Failure.
     taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
-
+    // Verify in KILLED state
+    assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED,
+        taImpl.getState());
+    assertEquals(true, taImpl.inputFailedReported);
     // Verify one event to the Task informing it about FAILURE. No events to scheduler. Counter event.
     int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 2;
     arg = ArgumentCaptor.forClass(Event.class);
@@ -894,7 +897,7 @@ public class TestTaskAttempt {
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
 
-    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+    MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
         mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
         resource, createFakeContainerContext(), false);
@@ -937,6 +940,7 @@ public class TestTaskAttempt {
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
 
+    assertEquals(true, taImpl.inputFailedReported);
     int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
     arg.getAllValues().clear();
     verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture());
@@ -996,6 +1000,7 @@ public class TestTaskAttempt {
     }
     
     Vertex mockVertex = mock(Vertex.class);
+    boolean inputFailedReported = false;
     
     @Override
     protected Vertex getVertex() {
@@ -1022,6 +1027,11 @@ public class TestTaskAttempt {
     protected void logJobHistoryAttemptUnsuccesfulCompletion(
         TaskAttemptState state) {
     }
+    
+    @Override
+    protected void sendInputFailedToConsumers() {
+      inputFailedReported = true;
+    }
   }
 
   private static ContainerContext createFakeContainerContext() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7079ae91/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index b145a40..993489e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -199,6 +199,11 @@ class ShuffleScheduler {
                 inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
                 srcAttemptIdentifier.getAttemptNumber()) + " done");
       }
+    } else {
+      // input is already finished. duplicate fetch.
+      LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
+      // free the resource - specially memory
+      output.abort();
     }
     // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
   }


Mime
View raw message