hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rkan...@apache.org
Subject hadoop git commit: MAPREDUCE-6143. add configuration for mapreduce speculative execution in MR2 (zxu via rkanter)
Date Mon, 02 Feb 2015 21:53:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk d085eb15d -> 8acc5e9b4


MAPREDUCE-6143. add configuration for mapreduce speculative execution in MR2 (zxu via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8acc5e9b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8acc5e9b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8acc5e9b

Branch: refs/heads/trunk
Commit: 8acc5e9b4b3fea8b418b3526c15022c8a9fedd56
Parents: d085eb1
Author: Robert Kanter <rkanter@apache.org>
Authored: Mon Feb 2 13:43:05 2015 -0800
Committer: Robert Kanter <rkanter@apache.org>
Committed: Mon Feb 2 13:51:08 2015 -0800

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../v2/app/speculate/DefaultSpeculator.java     | 62 ++++++++++++++++----
 .../mapreduce/v2/app/TestRuntimeEstimators.java | 16 +++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    | 29 +++++++++
 .../hadoop/mapreduce/util/ConfigUtil.java       |  4 +-
 .../src/main/resources/mapred-default.xml       | 41 +++++++++----
 6 files changed, 130 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8acc5e9b/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7672f08..3408580 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -268,6 +268,9 @@ Release 2.7.0 - UNRELEASED
 
     MAPREDUCE-6151. Update document of GridMix (Masatake Iwasaki via aw)
 
+    MAPREDUCE-6143. add configuration for mapreduce speculative execution in
+    MR2 (zxu via rkanter)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8acc5e9b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
index 392a51a..07a49af 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
 
+import com.google.common.annotations.VisibleForTesting;
 
 public class DefaultSpeculator extends AbstractService implements
     Speculator {
@@ -62,12 +63,11 @@ public class DefaultSpeculator extends AbstractService implements
   private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
   private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
 
-  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
-  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
-
-  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
-  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
-  private static final int  MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+  private long soonestRetryAfterNoSpeculate;
+  private long soonestRetryAfterSpeculate;
+  private double proportionRunningTasksSpeculatable;
+  private double proportionTotalTasksSpeculatable;
+  private int  minimumAllowedSpeculativeTasks;
 
   private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
 
@@ -163,6 +163,21 @@ public class DefaultSpeculator extends AbstractService implements
     this.estimator = estimator;
     this.clock = clock;
     this.eventHandler = context.getEventHandler();
+    this.soonestRetryAfterNoSpeculate =
+        conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
+                MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
+    this.soonestRetryAfterSpeculate =
+        conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
+                MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
+    this.proportionRunningTasksSpeculatable =
+        conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
+                MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
+    this.proportionTotalTasksSpeculatable =
+        conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
+                MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
+    this.minimumAllowedSpeculativeTasks =
+        conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
+                MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
   }
 
 /*   *************************************************************    */
@@ -182,8 +197,8 @@ public class DefaultSpeculator extends AbstractService implements
                 try {
                   int speculations = computeSpeculations();
                   long mininumRecomp
-                      = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
-                                         : SOONEST_RETRY_AFTER_NO_SPECULATE;
+                      = speculations > 0 ? soonestRetryAfterSpeculate
+                                         : soonestRetryAfterNoSpeculate;
 
                   long wait = Math.max(mininumRecomp,
                         clock.getTime() - backgroundRunStartTime);
@@ -497,8 +512,8 @@ public class DefaultSpeculator extends AbstractService implements
       Map<TaskId, Task> tasks = job.getTasks(type);
 
       int numberAllowedSpeculativeTasks
-          = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
-                           PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+          = (int) Math.max(minimumAllowedSpeculativeTasks,
+              proportionTotalTasksSpeculatable * tasks.size());
 
       TaskId bestTaskID = null;
       long bestSpeculationValue = -1L;
@@ -523,7 +538,7 @@ public class DefaultSpeculator extends AbstractService implements
       }
       numberAllowedSpeculativeTasks
           = (int) Math.max(numberAllowedSpeculativeTasks,
-                           PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+              proportionRunningTasksSpeculatable * numberRunningTasks);
 
       // If we found a speculation target, fire it off
       if (bestTaskID != null
@@ -583,4 +598,29 @@ public class DefaultSpeculator extends AbstractService implements
       this.lastHeartBeatTime = lastHeartBeatTime;
     }
   }
+
+  @VisibleForTesting
+  public long getSoonestRetryAfterNoSpeculate() {
+    return soonestRetryAfterNoSpeculate;
+  }
+
+  @VisibleForTesting
+  public long getSoonestRetryAfterSpeculate() {
+    return soonestRetryAfterSpeculate;
+  }
+
+  @VisibleForTesting
+  public double getProportionRunningTasksSpeculatable() {
+    return proportionRunningTasksSpeculatable;
+  }
+
+  @VisibleForTesting
+  public double getProportionTotalTasksSpeculatable() {
+    return proportionTotalTasksSpeculatable;
+  }
+
+  @VisibleForTesting
+  public int getMinimumAllowedSpeculativeTasks() {
+    return minimumAllowedSpeculativeTasks;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8acc5e9b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index c25cf50..fe0f341 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -137,7 +138,22 @@ public class TestRuntimeEstimators {
 
     estimator.contextualize(conf, myAppContext);
 
+    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 500L);
+    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 5000L);
+    conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1);
+    conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.001);
+    conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 5);
     speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
+    Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_NO_SPECULATE value",
+        500L, speculator.getSoonestRetryAfterNoSpeculate());
+    Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_SPECULATE value",
+        5000L, speculator.getSoonestRetryAfterSpeculate());
+    Assert.assertEquals(speculator.getProportionRunningTasksSpeculatable(),
+        0.1, 0.00001);
+    Assert.assertEquals(speculator.getProportionTotalTasksSpeculatable(),
+        0.001, 0.00001);
+    Assert.assertEquals("wrong SPECULATIVE_MINIMUM_ALLOWED_TASKS value",
+        5, speculator.getMinimumAllowedSpeculativeTasks());
 
     dispatcher.register(Speculator.EventType.class, speculator);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8acc5e9b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 44f57f4..d06b075 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -86,12 +86,41 @@ public interface MRJobConfig {
 
   public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
 
+  // SPECULATIVE_SLOWNODE_THRESHOLD is obsolete and will be deleted in the future
+  @Deprecated
   public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
 
   public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
 
+  // SPECULATIVECAP is obsolete and will be deleted in the future
+  @Deprecated
   public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
 
+  public static final String SPECULATIVECAP_RUNNING_TASKS =
+      "mapreduce.job.speculative.speculative-cap-running-tasks";
+  public static final double DEFAULT_SPECULATIVECAP_RUNNING_TASKS =
+      0.1;
+
+  public static final String SPECULATIVECAP_TOTAL_TASKS =
+      "mapreduce.job.speculative.speculative-cap-total-tasks";
+  public static final double DEFAULT_SPECULATIVECAP_TOTAL_TASKS =
+      0.01;
+
+  public static final String SPECULATIVE_MINIMUM_ALLOWED_TASKS =
+      "mapreduce.job.speculative.minimum-allowed-tasks";
+  public static final int DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS =
+      10;
+
+  public static final String SPECULATIVE_RETRY_AFTER_NO_SPECULATE =
+      "mapreduce.job.speculative.retry-after-no-speculate";
+  public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE =
+      1000L;
+
+  public static final String SPECULATIVE_RETRY_AFTER_SPECULATE =
+      "mapreduce.job.speculative.retry-after-speculate";
+  public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE =
+      15000L;
+
   public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
 
   public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8acc5e9b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
index b1756ce..bbbf99c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
@@ -214,12 +214,10 @@ public class ConfigUtil {
         MRJobConfig.SKIP_RECORDS),
       new DeprecationDelta("mapred.skip.out.dir",
         MRJobConfig.SKIP_OUTDIR),
-      new DeprecationDelta("mapred.speculative.execution.slowNodeThreshold",
-        MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD),
       new DeprecationDelta("mapred.speculative.execution.slowTaskThreshold",
         MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD),
       new DeprecationDelta("mapred.speculative.execution.speculativeCap",
-        MRJobConfig.SPECULATIVECAP),
+        MRJobConfig.SPECULATIVECAP_RUNNING_TASKS),
       new DeprecationDelta("job.local.dir",
         MRJobConfig.JOB_LOCAL_DIR),
       new DeprecationDelta("mapreduce.inputformat.class",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8acc5e9b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 4535137..6e80679 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -490,14 +490,43 @@
   <description>If true, then multiple instances of some reduce tasks 
                may be executed in parallel.</description>
 </property>
+
 <property>
-  <name>mapreduce.job.speculative.speculativecap</name>
+  <name>mapreduce.job.speculative.speculative-cap-running-tasks</name>
   <value>0.1</value>
   <description>The max percent (0-1) of running tasks that
   can be speculatively re-executed at any time.</description>
 </property>
 
 <property>
+  <name>mapreduce.job.speculative.speculative-cap-total-tasks</name>
+  <value>0.01</value>
+  <description>The max percent (0-1) of all tasks that
+  can be speculatively re-executed at any time.</description>
+</property>
+
+<property>
+  <name>mapreduce.job.speculative.minimum-allowed-tasks</name>
+  <value>10</value>
+  <description>The minimum allowed tasks that
+  can be speculatively re-executed at any time.</description>
+</property>
+
+<property>
+  <name>mapreduce.job.speculative.retry-after-no-speculate</name>
+  <value>1000</value>
+  <description>The waiting time(ms) to do next round of speculation
+  if there is no task speculated in this round.</description>
+</property>
+
+<property>
+  <name>mapreduce.job.speculative.retry-after-speculate</name>
+  <value>15000</value>
+  <description>The waiting time(ms) to do next round of speculation
+  if there are tasks speculated in this round.</description>
+</property>
+
+<property>
   <name>mapreduce.job.map.output.collector.class</name>
   <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
   <description>
@@ -517,16 +546,6 @@
 </property>
 
 <property>
-  <name>mapreduce.job.speculative.slownodethreshold</name>
-  <value>1.0</value>
-  <description>The number of standard deviations by which a Task 
-  Tracker's average map and reduce progress-rates (finishTime-dispatchTime)
-  must be lower than the average of all successful map/reduce task's for
-  the NodeManager to be considered too slow to give a speculative task to.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.job.ubertask.enable</name>
   <value>false</value>
   <description>Whether to enable the small-jobs "ubertask" optimization,


Mime
View raw message