From common-commits-return-91220-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed Nov 28 10:05:47 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C444B180658 for ; Wed, 28 Nov 2018 10:05:46 +0100 (CET) Received: (qmail 14933 invoked by uid 500); 28 Nov 2018 09:05:45 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 14916 invoked by uid 99); 28 Nov 2018 09:05:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Nov 2018 09:05:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 993DDE10D1; Wed, 28 Nov 2018 09:05:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aajisaka@apache.org To: common-commits@hadoop.apache.org Message-Id: <18efad43babd4add861be6800bd932d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: MAPREDUCE-6190. If a task stucks before its first heartbeat, it never timeouts and the MR job becomes stuck. Contributed by Zhaohui Xin. Date: Wed, 28 Nov 2018 09:05:45 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk b3a052d19 -> 13a21f660 MAPREDUCE-6190. If a task stucks before its first heartbeat, it never timeouts and the MR job becomes stuck. Contributed by Zhaohui Xin. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13a21f66 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13a21f66 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13a21f66 Branch: refs/heads/trunk Commit: 13a21f66078c91df97088b01f49a5919895f7110 Parents: b3a052d Author: Akira Ajisaka Authored: Wed Nov 28 17:57:42 2018 +0900 Committer: Akira Ajisaka Committed: Wed Nov 28 17:57:42 2018 +0900 ---------------------------------------------------------------------- .../mapreduce/v2/app/TaskHeartbeatHandler.java | 34 +++++++++++-- .../v2/app/TestTaskHeartbeatHandler.java | 53 ++++++++++++++++++-- .../apache/hadoop/mapreduce/MRJobConfig.java | 8 +++ .../src/main/resources/mapred-default.xml | 9 ++++ 4 files changed, 96 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a21f66/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index f8f5015..456f2a6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; @@ -46,12 +47,14 @@ import org.slf4j.LoggerFactory; */ @SuppressWarnings({"unchecked", "rawtypes"}) public class TaskHeartbeatHandler extends AbstractService { - - private static class ReportTime { + + static class ReportTime { private long lastProgress; - + private final AtomicBoolean reported; + public ReportTime(long time) { setLastProgress(time); + reported = new AtomicBoolean(false); } public synchronized void setLastProgress(long time) { @@ -61,6 +64,10 @@ public class TaskHeartbeatHandler extends AbstractService { public synchronized long getLastProgress() { return lastProgress; } + + public boolean isReported(){ + return reported.get(); + } } private static final Logger LOG = @@ -72,6 +79,7 @@ public class TaskHeartbeatHandler extends AbstractService { private volatile boolean stopped; private long taskTimeOut; private long unregisterTimeOut; + private long taskStuckTimeOut; private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. private final EventHandler eventHandler; @@ -98,6 +106,8 @@ public class TaskHeartbeatHandler extends AbstractService { MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT, MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); + taskStuckTimeOut = conf.getLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, + MRJobConfig.DEFAULT_TASK_STUCK_TIMEOUT_MS); // enforce task timeout is at least twice as long as task report interval long taskProgressReportIntervalMillis = MRJobConfUtil. @@ -135,6 +145,7 @@ public class TaskHeartbeatHandler extends AbstractService { //TODO throw an exception if the task isn't registered. 
ReportTime time = runningAttempts.get(attemptID); if(time != null) { + time.reported.compareAndSet(false, true); time.setLastProgress(clock.getTime()); } } @@ -179,13 +190,21 @@ public class TaskHeartbeatHandler extends AbstractService { Map.Entry entry = iterator.next(); boolean taskTimedOut = (taskTimeOut > 0) && (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); + // when container in NM not started in a long time, + // we think the taskAttempt is stuck + boolean taskStuck = (!entry.getValue().isReported()) && + (currentTime > + (entry.getValue().getLastProgress() + taskStuckTimeOut)); - if(taskTimedOut) { + if(taskTimedOut || taskStuck) { // task is lost, remove from the list and raise lost event iterator.remove(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry .getKey(), "AttemptID:" + entry.getKey().toString() - + " Timed out after " + taskTimeOut / 1000 + " secs")); + + " task timeout set: " + taskTimeOut / 1000 + "s," + + " taskTimedOut: " + taskTimedOut + ";" + + " task stuck timeout set: " + taskStuckTimeOut / 1000 + "s," + + " taskStuck: " + taskStuck)); eventHandler.handle(new TaskAttemptEvent(entry.getKey(), TaskAttemptEventType.TA_TIMED_OUT)); } @@ -206,6 +225,11 @@ public class TaskHeartbeatHandler extends AbstractService { } @VisibleForTesting + ConcurrentMap getRunningAttempts(){ + return runningAttempts; + } + + @VisibleForTesting public long getTaskTimeOut() { return taskTimeOut; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a21f66/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index 5d86479..0fbde2c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -41,12 +42,15 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Assert; import org.junit.Test; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + public class TestTaskHeartbeatHandler { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testTimeout() throws InterruptedException { + public void testTaskTimeout() throws InterruptedException { EventHandler mockHandler = mock(EventHandler.class); Clock clock = SystemClock.getInstance(); TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); @@ -62,11 +66,13 @@ public class TestTaskHeartbeatHandler { hb.init(conf); hb.start(); try { - ApplicationId appId = ApplicationId.newInstance(0l, 5); + ApplicationId appId = ApplicationId.newInstance(0L, 5); JobId jobId = MRBuilderUtils.newJobId(appId, 4); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); hb.register(taid); + // Task heartbeat once to avoid stuck + hb.progressing(taid); Thread.sleep(100); //Events only happen when the task is canceled verify(mockHandler, times(2)).handle(any(Event.class)); @@ -75,6 +81,47 @@ public class TestTaskHeartbeatHandler { } } + @SuppressWarnings("unchecked") + @Test 
+ public void testTaskStuck() throws InterruptedException { + EventHandler mockHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); + + + Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, 10); // 10ms + conf.setInt(MRJobConfig.TASK_TIMEOUT, 1000); //1000 ms + // set TASK_PROGRESS_REPORT_INTERVAL to a value smaller than TASK_TIMEOUT + // so that TASK_TIMEOUT is not overridden + conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5); + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms + + hb.init(conf); + hb.start(); + try { + ApplicationId appId = ApplicationId.newInstance(0L, 5); + JobId jobId = MRBuilderUtils.newJobId(appId, 4); + TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); + TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); + hb.register(taid); + + ConcurrentMap + runningAttempts = hb.getRunningAttempts(); + for (Map.Entry entry + : runningAttempts.entrySet()) { + assertFalse(entry.getValue().isReported()); + } + + Thread.sleep(100); + + //Events only happen when the task is canceled + verify(mockHandler, times(2)).handle(any(Event.class)); + } finally { + hb.stop(); + } + } + /** * Test if the final heartbeat timeout is set correctly when task progress * report interval is set bigger than the task timeout in the configuration. 
@@ -120,7 +167,7 @@ public class TestTaskHeartbeatHandler { hb.init(conf); hb.start(); try { - ApplicationId appId = ApplicationId.newInstance(0l, 5); + ApplicationId appId = ApplicationId.newInstance(0L, 5); JobId jobId = MRBuilderUtils.newJobId(appId, 4); TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a21f66/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 565c052..b36b5ce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -353,6 +353,14 @@ public interface MRJobConfig { public static final String TASK_TIMEOUT = "mapreduce.task.timeout"; long DEFAULT_TASK_TIMEOUT_MILLIS = 5 * 60 * 1000L; + /** + * The maximum time to wait for a remote task's first heartbeat. + * This avoids waiting indefinitely for the task's container to start, + * which would otherwise leave the task stuck in the NEW state.
+ */ + String TASK_STUCK_TIMEOUT_MS = "mapreduce.task.stuck.timeout-ms"; + long DEFAULT_TASK_STUCK_TIMEOUT_MS = 10 * 60 * 1000L; + String TASK_PROGRESS_REPORT_INTERVAL = "mapreduce.task.progress-report.interval"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a21f66/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index c993537..fa26e4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -277,6 +277,15 @@ + mapreduce.task.stuck.timeout-ms + 600000 + The maximum time to wait for a remote task's first heartbeat. + This avoids waiting indefinitely for the task's container to start, + which would otherwise leave the task stuck in the NEW state. + + + + mapreduce.map.memory.mb -1 The amount of memory to request from the scheduler for each --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org