Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 22A8110DA7 for ; Thu, 10 Apr 2014 06:08:46 +0000 (UTC) Received: (qmail 68571 invoked by uid 500); 10 Apr 2014 06:08:45 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 68547 invoked by uid 500); 10 Apr 2014 06:08:44 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 68468 invoked by uid 99); 10 Apr 2014 06:08:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Apr 2014 06:08:42 +0000 X-ASF-Spam-Status: No, hits=-2000.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 10 Apr 2014 06:08:40 +0000 Received: (qmail 68289 invoked by uid 99); 10 Apr 2014 06:08:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Apr 2014 06:08:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 732BB9507AB; Thu, 10 Apr 2014 06:08:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1034. Shuffling can sometimes hang with duplicate inputs for the same index. (Bikas Saha via hitesh) Date: Thu, 10 Apr 2014 06:08:16 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master ffe6455c2 -> 2e91bdeb0 TEZ-1034. Shuffling can sometimes hang with duplicate inputs for the same index. (Bikas Saha via hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2e91bdeb Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2e91bdeb Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2e91bdeb Branch: refs/heads/master Commit: 2e91bdeb00d91eb1d6ed4879ebb5f7fee6cf61b6 Parents: ffe6455 Author: Hitesh Shah Authored: Wed Apr 9 23:07:39 2014 -0700 Committer: Hitesh Shah Committed: Wed Apr 9 23:07:39 2014 -0700 ---------------------------------------------------------------------- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 33 ++++++++++++-------- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 16 ++++++++-- .../common/shuffle/impl/ShuffleScheduler.java | 5 +++ 3 files changed, 38 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e91bdeb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 6d512ba..7f9cce1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1457,6 +1457,8 @@ public class TaskAttemptImpl implements TaskAttempt, if (attempt.leafVertex) { return TaskAttemptStateInternal.SUCCEEDED; } + // TODO - TEZ-834. This assumes that the outputs were on that node + attempt.sendInputFailedToConsumers(); TaskAttemptImpl.TERMINATED_AFTER_SUCCESS_HELPER.transition(attempt, event); return TaskAttemptStateInternal.KILLED; } @@ -1498,19 +1500,7 @@ public class TaskAttemptImpl implements TaskAttempt, LOG.info(message); attempt.addDiagnosticInfo(message); // send input failed event - Vertex vertex = attempt.getVertex(); - Map edges = vertex.getOutputVertices(); - if (edges != null && !edges.isEmpty()) { - List tezIfEvents = Lists.newArrayListWithCapacity(edges.size()); - for (Vertex edgeVertex : edges.keySet()) { - tezIfEvents.add(new TezEvent(new InputFailedEvent(), - new EventMetaData(EventProducerConsumerType.SYSTEM, - vertex.getName(), - edgeVertex.getName(), - attempt.getID()))); - } - attempt.sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents)); - } + attempt.sendInputFailedToConsumers(); // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks. if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) { (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition( @@ -1525,6 +1515,23 @@ public class TaskAttemptImpl implements TaskAttempt, // Can be used to blacklist nodes. } } + + @VisibleForTesting + protected void sendInputFailedToConsumers() { + Vertex vertex = getVertex(); + Map edges = vertex.getOutputVertices(); + if (edges != null && !edges.isEmpty()) { + List tezIfEvents = Lists.newArrayListWithCapacity(edges.size()); + for (Vertex edgeVertex : edges.keySet()) { + tezIfEvents.add(new TezEvent(new InputFailedEvent(), + new EventMetaData(EventProducerConsumerType.SYSTEM, + vertex.getName(), + edgeVertex.getName(), + getID()))); + } + sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents)); + } + } private void initTaskAttemptStatus(TaskAttemptStatus result) { result.progress = 0.0f; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e91bdeb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index fe7fce8..55c3867 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -713,7 +713,7 @@ public class TestTaskAttempt { doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); doReturn(containers).when(appCtx).getAllContainers(); - TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, resource, createFakeContainerContext(), false); @@ -750,7 +750,10 @@ public class TestTaskAttempt { // Send out a Node Failure. taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned")); - + // Verify in KILLED state + assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED, + taImpl.getState()); + assertEquals(true, taImpl.inputFailedReported); // Verify one event to the Task informing it about FAILURE. No events to scheduler. Counter event. int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 2; arg = ArgumentCaptor.forClass(Event.class); @@ -894,7 +897,7 @@ public class TestTaskAttempt { doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); doReturn(containers).when(appCtx).getAllContainers(); - TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, taListener, taskConf, new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx, locationHint, false, resource, createFakeContainerContext(), false); @@ -937,6 +940,7 @@ public class TestTaskAttempt { assertEquals("Task attempt is not in FAILED state", taImpl.getState(), TaskAttemptState.FAILED); + assertEquals(true, taImpl.inputFailedReported); int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2; arg.getAllValues().clear(); verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture()); @@ -996,6 +1000,7 @@ public class TestTaskAttempt { } Vertex mockVertex = mock(Vertex.class); + boolean inputFailedReported = false; @Override protected Vertex getVertex() { @@ -1022,6 +1027,11 @@ public class TestTaskAttempt { protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { } + + @Override + protected void sendInputFailedToConsumers() { + inputFailedReported = true; + } } private static ContainerContext createFakeContainerContext() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e91bdeb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java index b145a40..993489e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java @@ -199,6 +199,11 @@ class ShuffleScheduler { inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber()) + " done"); } + } else { + // input is already finished. duplicate fetch. + LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier); + // free the resource - specially memory + output.abort(); } // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation. }