hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk. Contributed by Maysam Yabandeh
Date Wed, 21 Oct 2015 14:12:20 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 cb19552fb -> be72ed1c7


MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk. Contributed by Maysam
Yabandeh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be72ed1c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be72ed1c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be72ed1c

Branch: refs/heads/branch-2
Commit: be72ed1c7558987029497da6210238f8803416f4
Parents: cb19552
Author: Jason Lowe <jlowe@apache.org>
Authored: Wed Oct 21 14:10:28 2015 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Wed Oct 21 14:11:48 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../java/org/apache/hadoop/mapred/Task.java     | 72 ++++++++++++++---
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  5 ++
 .../src/main/resources/mapred-default.xml       | 12 +++
 .../hadoop/mapred/TestTaskProgressReporter.java | 83 +++++++++++++++++++-
 5 files changed, 164 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ea90466..940ae6c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -127,6 +127,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6479. Add missing mapred job command options in mapreduce
     document. (nijel via aajisaka)
 
+    MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk
+    (Maysam Yabandeh via jlowe)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 5031368..794ca07 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.io.BytesWritable;
@@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -729,11 +731,49 @@ abstract public class Task implements Writable, Configurable {
       } else {
         return split;
       }
-    }  
-    /** 
-     * The communication thread handles communication with the parent (Task Tracker). 
-     * It sends progress updates if progress has been made or if the task needs to 
-     * let the parent know that it's alive. It also pings the parent to see if it's alive.

+    }
+
+    /**
+     * Thrown when the task exceeds a configured resource limit (currently
+     * the local-filesystem bytes-written limit enforced by
+     * checkTaskLimits). Extends IOException so existing callers that
+     * already handle IOException keep compiling, while the reporter loop
+     * can catch it explicitly to fail the task fast.
+     */
+    public class TaskLimitException extends IOException {
+      public TaskLimitException(String str) {
+        super(str);
+      }
+    }
+
+    /**
+     * Check the task's counters against any configured limits.
+     *
+     * Currently only enforces MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES:
+     * if the local filesystem's BYTES_WRITTEN counter exceeds the limit,
+     * the task is considered rogue and a TaskLimitException is thrown.
+     * A negative limit (the default) disables the check.
+     *
+     * @throws TaskLimitException if a configured limit has been exceeded
+     */
+    protected void checkTaskLimits() throws TaskLimitException {
+      // check the limit for writing to local file system
+      long limit = conf.getLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES,
+              MRJobConfig.DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES);
+      if (limit >= 0) {
+        Counters.Counter localWritesCounter = null;
+        try {
+          LocalFileSystem localFS = FileSystem.getLocal(conf);
+          localWritesCounter = counters.findCounter(localFS.getScheme(),
+                  FileSystemCounter.BYTES_WRITTEN);
+        } catch (IOException e) {
+          // Best effort: if the local FS cannot be resolved the check is
+          // simply skipped for this round (counter stays null below).
+          LOG.warn("Could not get LocalFileSystem BYTES_WRITTEN counter");
+        }
+        if (localWritesCounter != null
+                && localWritesCounter.getCounter() > limit) {
+          throw new TaskLimitException("too much write to local file system." +
+                  " current value is " + localWritesCounter.getCounter() +
+                  " the limit is " + limit);
+        }
+      }
+    }
+
+    /**
+     * The communication thread handles communication with the parent (Task
+     * Tracker). It sends progress updates if progress has been made or if
+     * the task needs to let the parent know that it's alive. It also pings
+     * the parent to see if it's alive.
      */
     public void run() {
       final int MAX_RETRIES = 3;
@@ -763,8 +803,9 @@ abstract public class Task implements Writable, Configurable {
           if (sendProgress) {
             // we need to send progress update
             updateCounters();
+            checkTaskLimits();
             taskStatus.statusUpdate(taskProgress.get(),
-                                    taskProgress.toString(), 
+                                    taskProgress.toString(),
                                     counters);
             taskFound = umbilical.statusUpdate(taskId, taskStatus);
             taskStatus.clearStatus();
@@ -782,10 +823,21 @@ abstract public class Task implements Writable, Configurable {
             System.exit(66);
           }
 
-          sendProgress = resetProgressFlag(); 
+          sendProgress = resetProgressFlag();
           remainingRetries = MAX_RETRIES;
-        } 
-        catch (Throwable t) {
+        } catch (TaskLimitException e) {
+          String errMsg = "Task exceeded the limits: " +
+                  StringUtils.stringifyException(e);
+          LOG.fatal(errMsg);
+          try {
+            umbilical.fatalError(taskId, errMsg);
+          } catch (IOException ioe) {
+            LOG.fatal("Failed to update failure diagnosis", ioe);
+          }
+          LOG.fatal("Killing " + taskId);
+          resetDoneFlag();
+          ExitUtil.terminate(69);
+        } catch (Throwable t) {
           LOG.info("Communication exception: " + StringUtils.stringifyException(t));
           remainingRetries -=1;
           if (remainingRetries == 0) {
@@ -1043,7 +1095,7 @@ abstract public class Task implements Writable, Configurable {
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
     LOG.info("Task:" + taskId + " is done."
-             + " And is in the process of committing");
+            + " And is in the process of committing");
     updateCounters();
 
     boolean commitRequired = isCommitRequired();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 401aedb..0ed2f29 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -49,6 +49,11 @@ public interface MRJobConfig {
 
   public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
 
+  public static final String TASK_LOCAL_WRITE_LIMIT_BYTES =
+          "mapreduce.task.local-fs.write-limit.bytes";
+  // negative values disable the limit
+  public static final long DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES = -1;
+
   public static final String TASK_PROGRESS_REPORT_INTERVAL =
       "mapreduce.task.progress-report.interval";
   /** The number of milliseconds between progress reports. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 0d15ca8..ae84f8c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1781,4 +1781,16 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.task.local-fs.write-limit.bytes</name>
+  <value>-1</value>
+  <description>Limit on the bytes written to the local file system by each task.
+  This limit only applies to writes that go through the Hadoop filesystem APIs
+  within the task process (i.e.: writes that will update the local filesystem's
+  BYTES_WRITTEN counter). It does not cover other writes such as logging,
+  sideband writes from subprocesses (e.g.: streaming jobs), etc.
+  Negative values disable the limit.
+  The default is -1.</description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/be72ed1c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
index 66632b3..b20ad96 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java
@@ -19,15 +19,28 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.ExitUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestTaskProgressReporter {
   private static int statusUpdateTimes = 0;
+
+  // set to true if the reporter thread has exited via ExitUtil.terminate
+  volatile boolean threadExited = false;
+
+  final static int LOCAL_BYTES_WRITTEN = 1024;
+
   private FakeUmbilical fakeUmbilical = new FakeUmbilical();
 
   private static class DummyTask extends Task {
@@ -120,13 +133,22 @@ public class TestTaskProgressReporter {
   }
 
   private class DummyTaskReporter extends Task.TaskReporter {
+    volatile boolean taskLimitIsChecked = false;
+
     public DummyTaskReporter(Task task) {
       task.super(task.getProgress(), fakeUmbilical);
     }
+
     @Override
     public void setProgress(float progress) {
       super.setProgress(progress);
     }
+
+    @Override
+    protected void checkTaskLimits() throws TaskLimitException {
+      taskLimitIsChecked = true;
+      super.checkTaskLimits();
+    }
   }
 
   @Test (timeout=10000)
@@ -144,4 +166,63 @@ public class TestTaskProgressReporter {
     t.join();
     Assert.assertEquals(statusUpdateTimes, 2);
   }
-}
\ No newline at end of file
+
+  @Test(timeout=10000)
+  public void testBytesWrittenRespectingLimit() throws Exception {
+    // The driver writes LOCAL_BYTES_WRITTEN bytes; pad the limit by 1024
+    // to absorb incidental local-FS writes outside the test's control, so
+    // the reporter thread must NOT terminate the task (failFast=false).
+    testBytesWrittenLimit(LOCAL_BYTES_WRITTEN + 1024, false);
+  }
+
+  @Test(timeout=10000)
+  public void testBytesWrittenExceedingLimit() throws Exception {
+    // Limit is one byte below what the driver writes, so the reporter
+    // thread is expected to detect the overrun and exit via
+    // ExitUtil.terminate (failFast=true).
+    testBytesWrittenLimit(LOCAL_BYTES_WRITTEN - 1, true);
+  }
+
+  /**
+   * Driver for the BYTES_WRITTEN limit tests. Writes LOCAL_BYTES_WRITTEN
+   * bytes to the local file system, then runs the task reporter thread and
+   * waits until it has performed the limit check at least once. The test is
+   * limited in that the check is done only once, at the first loop of
+   * TaskReporter#run.
+   * @param limit the limit on BYTES_WRITTEN in the local file system
+   * @param failFast whether the reporter is expected to terminate the task
+   * @throws Exception
+   */
+  public void testBytesWrittenLimit(long limit, boolean failFast)
+          throws Exception {
+    ExitUtil.disableSystemExit();
+    threadExited = false;
+    Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+      public void uncaughtException(Thread th, Throwable ex) {
+        System.out.println("Uncaught exception: " + ex);
+        if (ex instanceof ExitUtil.ExitException) {
+          threadExited = true;
+        }
+      }
+    };
+    JobConf conf = new JobConf();
+    // Disable the task reporter's sleep between progress checks.
+    // (was conf.getLong(...), which read the value but never set it)
+    conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0);
+    conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit);
+    LocalFileSystem localFS = FileSystem.getLocal(conf);
+    Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-"
+            + new Random(System.currentTimeMillis()).nextInt());
+    FSDataOutputStream out = localFS.create(tmpPath, true);
+    out.write(new byte[LOCAL_BYTES_WRITTEN]);
+    out.close();
+
+    Task task = new DummyTask();
+    task.setConf(conf);
+    DummyTaskReporter reporter = new DummyTaskReporter(task);
+    Thread t = new Thread(reporter);
+    t.setUncaughtExceptionHandler(h);
+    reporter.setProgressFlag();
+
+    t.start();
+    // Spin until the reporter thread has run checkTaskLimits at least once.
+    while (!reporter.taskLimitIsChecked) {
+      Thread.yield();
+    }
+
+    task.setTaskDone();
+    reporter.resetDoneFlag();
+    t.join();
+    // Clean up the temp file so repeated runs don't litter /tmp.
+    localFS.delete(tmpPath, false);
+    Assert.assertEquals(failFast, threadExited);
+  }
+}


Mime
View raw message