Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7597B18AD8 for ; Fri, 18 Sep 2015 06:14:31 +0000 (UTC) Received: (qmail 48184 invoked by uid 500); 18 Sep 2015 06:14:31 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 48148 invoked by uid 500); 18 Sep 2015 06:14:31 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 48139 invoked by uid 99); 18 Sep 2015 06:14:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2015 06:14:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3292BDFDDE; Fri, 18 Sep 2015 06:14:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-814. Improve heuristic for determining a task has failed outputs (bikas) (cherry picked from commit 94488e79d8d3d73e93337a57a48d82f82182591b) Date: Fri, 18 Sep 2015 06:14:31 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.7 f14d5127f -> 94e256de6 TEZ-814. 
Improve heuristic for determining a task has failed outputs (bikas) (cherry picked from commit 94488e79d8d3d73e93337a57a48d82f82182591b) Conflicts: CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/94e256de Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/94e256de Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/94e256de Branch: refs/heads/branch-0.7 Commit: 94e256de6dc760c2ae5f17ec4b5a951ca1651118 Parents: f14d512 Author: Bikas Saha Authored: Thu Sep 17 23:09:11 2015 -0700 Committer: Bikas Saha Committed: Thu Sep 17 23:14:12 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../apache/tez/dag/api/TezConfiguration.java | 15 ++++++ tez-dag/findbugs-exclude.xml | 1 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 33 +++++++++---- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 49 ++++++++++++++++++-- 5 files changed, 88 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 88b40c1..ec01f75 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents. TEZ-2830. Backport TEZ-2774 to branch-0.7. Improvements to logging in the AM and part of the runtime. TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM @@ -267,6 +268,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7 TEZ-2768. 
Log a useful error message when the summary stream cannot be closed when shutting down an AM. @@ -487,6 +489,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-814. Improve heuristic for determining a task has failed outputs TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting down an AM. TEZ-2745. ClassNotFoundException of user code should fail dag http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index cd9e59c..d0a76d0 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -232,6 +232,21 @@ public class TezConfiguration extends Configuration { public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10; /** + * int value. Represents the maximum time in seconds for which a consumer attempt can report + * a read error against its producer attempt, after which the producer attempt will be re-run + * to re-generate the output. There are other heuristics which determine the retry and mainly + * try to guard against a flurry of re-runs due to intermittent read errors + * (due to network issues). This configuration puts a time limit on those heuristics to ensure + * jobs don't hang indefinitely due to lack of closure in those heuristics. + * + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = + TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error"; + public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300; + + /** * Boolean value.
Determines when the final outputs to data sinks are committed. Commit is an * output specific operation and typically involves making the output visible for consumption. * If the config is true, then the outputs are committed at the end of DAG completion after all http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index b2990ed..4c01edc 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -228,6 +228,7 @@ + http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index f63f461..5357063 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -110,6 +110,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class TaskAttemptImpl implements TaskAttempt, EventHandler { @@ -195,10 +196,10 @@ public class TaskAttemptImpl implements TaskAttempt, Set taskHosts = new HashSet(); Set taskRacks = new HashSet(); - private Set uniquefailedOutputReports = - new HashSet(); + private Map uniquefailedOutputReports = Maps.newHashMap(); private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION; private static int MAX_ALLOWED_OUTPUT_FAILURES; + private static int MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC; protected final boolean isRescheduled; private final Resource taskResource; @@ -470,6 
+471,10 @@ public class TaskAttemptImpl implements TaskAttempt, MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); + + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = conf.getInt( + TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, + TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT); ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); @@ -1569,7 +1574,17 @@ public class TaskAttemptImpl implements TaskAttempt, LOG.info(attempt.getID() + " blamed for read error from " + failedDestTaId + " at inputIndex " + failedInputIndexOnDestTa); - attempt.uniquefailedOutputReports.add(failedDestTaId); + long time = attempt.clock.getTime(); + Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId); + if (firstErrReportTime == null) { + attempt.uniquefailedOutputReports.put(failedDestTaId, time); + firstErrReportTime = time; + } + + int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000); + boolean crossTimeDeadline = readErrorTimespanSec >= + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false; + float failureFraction = ((float) attempt.uniquefailedOutputReports.size()) / outputFailedEvent.getConsumerTaskNumber(); @@ -1581,14 +1596,16 @@ public class TaskAttemptImpl implements TaskAttempt, // If needed we can launch a background task without failing this task // to generate a copy of the output just in case. // If needed we can consider only running consumer tasks - if (withinFailureFractionLimits && withinOutputFailureLimits) { + if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits) { return attempt.getInternalState(); } String message = attempt.getID() + " being failed for too many output errors. 
" - + "failureFraction=" + failureFraction + ", " - + "MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + ", " - + "uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + ", " - + "MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES; + + "failureFraction=" + failureFraction + + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + + ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + + ", MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES + + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC + + ", readErrorTimespan=" + readErrorTimespanSec; LOG.info(message); attempt.addDiagnosticInfo(message); // send input failed event http://git-wip-us.apache.org/repos/asf/tez/blob/94e256de/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 1a1cb11..d526a1d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -1188,7 +1188,7 @@ public class TestTaskAttempt { assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause()); } - @Test(timeout = 5000) + @Test//(timeout = 5000) // Verifies that multiple TooManyFetchFailures are handled correctly by the // TaskAttempt. public void testMultipleOutputFailed() throws Exception { @@ -1335,15 +1335,54 @@ public class TestTaskAttempt { //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as // MAX_ALLOWED_OUTPUT_FAILURES has crossed the limit. 
taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent, 8)); - assertEquals("Task attempt is not in succeeded state", taImpl2.getState(), - TaskAttemptState.FAILED); - - assertEquals("Task attempt is not in FAILED state", taImpl2.getState(), + assertEquals("Task attempt is not in failed state", taImpl2.getState(), TaskAttemptState.FAILED); assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl2.getTerminationCause()); // verify unregister is not invoked again verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2); + Clock mockClock = mock(Clock.class); + int readErrorTimespanSec = 1; + taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 10); + taskConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, readErrorTimespanSec); + TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3); + MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1, eventHandler, + taListener, taskConf, mockClock, + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID3 = taImpl3.getID(); + + taImpl3.handle(new TaskAttemptEventSchedule(taskAttemptID3, 0, 0)); + // At state STARTING. 
+ taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3, contId, null)); + verify(mockHeartbeatHandler).register(taskAttemptID3); + taImpl3.handle(new TaskAttemptEvent(taskAttemptID3, TaskAttemptEventType.TA_DONE)); + assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), + TaskAttemptState.SUCCEEDED); + verify(mockHeartbeatHandler).unregister(taskAttemptID3); + + mockReEvent = InputReadErrorEvent.create("", 1, 1); + mockMeta = mock(EventMetaData.class); + mockDestId1 = mock(TezTaskAttemptID.class); + when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1); + tzEvent = new TezEvent(mockReEvent, mockMeta); + when(mockClock.getTime()).thenReturn(1000L); + // time deadline not exceeded for a couple of read error events + taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); + assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), + TaskAttemptState.SUCCEEDED); + when(mockClock.getTime()).thenReturn(1500L); + taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); + assertEquals("Task attempt is not in succeeded state", taImpl3.getState(), + TaskAttemptState.SUCCEEDED); + // exceed the time threshold + when(mockClock.getTime()).thenReturn(2001L); + taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000)); + assertEquals("Task attempt is not in FAILED state", taImpl3.getState(), + TaskAttemptState.FAILED); + assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl3.getTerminationCause()); + // verify unregister is not invoked again + verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3); } @SuppressWarnings("deprecation")