hadoop-mapreduce-issues mailing list archives

From "Jason Lowe (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAPREDUCE-6920) Cannot find the effect of mapreduce.job.speculative.slowtaskthreshold parameter
Date Wed, 26 Jul 2017 16:20:00 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-6920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101869#comment-16101869
] 

Jason Lowe commented on MAPREDUCE-6920:
---------------------------------------

I agree that mapreduce.job.speculative.slowtaskthreshold is essentially useless.  It has
very little effect in practice unless it is set to a very large value and the standard deviation
of completed task durations is significant.  Even then, all it does is disable
speculation, which is of dubious utility.
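
To make the "very large value" point concrete, here is a minimal stand-alone sketch.  It is
not Hadoop code: the class and method names are hypothetical, and the millisecond figures are
made-up inputs.  It only reuses the mean() + std() * sigma formula and the (long) cast from the
code quoted in the issue, plus the fact that Java saturates a double-to-long cast at Long.MAX_VALUE.

```java
// Hypothetical stand-alone sketch; names and sample numbers are assumptions.
final class SlowTaskThresholdSketch {

  /** Same formula as the quoted DataStatistics.outlier: mean + std * sigma. */
  static double outlier(double meanMs, double stdMs, float sigma) {
    return meanMs + stdMs * sigma;
  }

  /** Same (long) cast as the quoted thresholdRuntime; saturates at Long.MAX_VALUE. */
  static long thresholdRuntime(double meanMs, double stdMs, float sigma) {
    return (long) outlier(meanMs, stdMs, sigma);
  }

  /** The only property of the threshold that the quoted speculationValue inspects. */
  static boolean speculationSuppressed(long acceptableRuntime) {
    return acceptableRuntime == Long.MAX_VALUE;
  }

  public static void main(String[] args) {
    double meanMs = 60_000.0; // assumed mean of completed task durations
    double stdMs = 5_000.0;   // assumed standard deviation
    for (float sigma : new float[] {1.0f, 10.0f, 1.0e6f, 1.0e16f}) {
      long threshold = thresholdRuntime(meanMs, stdMs, sigma);
      System.out.println("sigma=" + sigma
          + " threshold=" + threshold
          + " suppressed=" + speculationSuppressed(threshold));
    }
  }
}
```

With these assumed inputs only the last sigma pushes the product past the long range, and with
a standard deviation of zero no sigma value would, which is why the property behaves like an
on/off switch at best.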

If I were to guess, it looks like someone originally intended this property to control
the speculation trigger threshold by scaling the standard deviation of task completion times.
If a running task's estimated duration were below that threshold, we would never
speculate it until the estimate changed to exceed the threshold.  But that's not what it
does in practice.  Essentially thresholdRuntime returns a boolean value, via Long.MAX_VALUE
or otherwise, indicating whether a task may be considered for speculation, so calculating a
realistic threshold is a waste of time.

If my guess is correct, I'm not sure that check would actually matter in practice even if it
were performed.  We already do not speculate a task if we expect the existing task to complete
before a newly scheduled task would complete, and usually the existing task already has a
significant head start.
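
As a rough illustration of why the guessed-at check would rarely change the outcome, the sketch
below puts the two conditions side by side.  It is a simplification under stated assumptions,
not the DefaultSpeculator logic: the class, method, and parameter names are hypothetical, and a
new attempt is assumed to take about the mean runtime.

```java
// Hypothetical sketch; not the actual Hadoop implementation.
final class SpeculationDecisionSketch {

  /** Guessed original intent: only tasks estimated to run past mean + sigma * std qualify. */
  static boolean exceedsThreshold(long estimatedRuntimeMs,
                                  double meanMs, double stdMs, float sigma) {
    return estimatedRuntimeMs > (long) (meanMs + stdMs * sigma);
  }

  /** Head-start check: speculate only if a fresh attempt is expected to finish first. */
  static boolean replacementWouldFinishFirst(long nowMs, long attemptStartMs,
                                             long estimatedRuntimeMs,
                                             long estimatedNewAttemptRuntimeMs) {
    long estimatedEnd = attemptStartMs + estimatedRuntimeMs;
    long estimatedReplacementEnd = nowMs + estimatedNewAttemptRuntimeMs;
    return estimatedReplacementEnd < estimatedEnd;
  }

  public static void main(String[] args) {
    double meanMs = 60_000.0; // assumed mean of completed task durations
    double stdMs = 5_000.0;   // assumed standard deviation
    long nowMs = 40_000L;     // the running attempt started at t=0, 40 s ago
    for (long estimateMs : new long[] {62_000L, 70_000L, 600_000L}) {
      System.out.println("estimate=" + estimateMs
          + " exceedsThreshold=" + exceedsThreshold(estimateMs, meanMs, stdMs, 1.0f)
          + " replacementFirst="
          + replacementWouldFinishFirst(nowMs, 0L, estimateMs, (long) meanMs));
    }
  }
}
```

In this made-up scenario the 70 s estimate passes the threshold check but is still rejected by
the head-start check, and the 62 s estimate is rejected by both, so the threshold adds nothing
to the decision.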

> Cannot find the effect of mapreduce.job.speculative.slowtaskthreshold parameter
> -------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6920
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6920
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 2.7.1
>            Reporter: NING DING
>            Priority: Minor
>
> The description of the mapreduce.job.speculative.slowtaskthreshold parameter is as follows.
> {code:xml}
> <property>
>   <name>mapreduce.job.speculative.slowtaskthreshold</name>
>   <value>1.0</value>
>   <description>The number of standard deviations by which a task's
>   ave progress-rates must be lower than the average of all running tasks'
>   for the task to be considered too slow.
>   </description>
> </property>
> {code}
> But from the source code, I find that it has no effect on starting speculative tasks.
> The call stack is as follows.
> DefaultSpeculator.speculationValue -> StartEndTimesBase.thresholdRuntime -> DataStatistics.outlier
> {code:title=DefaultSpeculator.java|borderStyle=solid}
>   private TaskItem speculationValue(TaskId taskID, long now) {
>     TaskItem taskItem = new TaskItem();
>     Job job = context.getJob(taskID.getJobId());
>     Task task = job.getTask(taskID);
>     Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
>     long acceptableRuntime = Long.MIN_VALUE;
>     long speculationValue = Long.MIN_VALUE;
>     if (!mayHaveSpeculated.contains(taskID)) {
>       acceptableRuntime = estimator.thresholdRuntime(taskID);
>       if (acceptableRuntime == Long.MAX_VALUE) {
>         taskItem.setSpeculationValue(ON_SCHEDULE);
>         return taskItem;
>       }
>     }
>    ...
>   }
> {code}
> {code:title=StartEndTimesBase.java|borderStyle=solid}
>   public long thresholdRuntime(TaskId taskID) {
>     JobId jobID = taskID.getJobId();
>     Job job = context.getJob(jobID);
>     TaskType type = taskID.getTaskType();
>     DataStatistics statistics
>         = dataStatisticsForTask(taskID);
>     int completedTasksOfType
>         = type == TaskType.MAP
>             ? job.getCompletedMaps() : job.getCompletedReduces();
>     int totalTasksOfType
>         = type == TaskType.MAP
>             ? job.getTotalMaps() : job.getTotalReduces();
>     if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
>         || (((float)completedTasksOfType) / totalTasksOfType)
>               < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
>       return Long.MAX_VALUE;
>     }
>     long result =  statistics == null
>         ? Long.MAX_VALUE
>         : (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
>     return result;
>   }
> {code}
> {code:title=DataStatistics.java|borderStyle=solid}
>   public synchronized double outlier(float sigma) {
>     if (count != 0.0) {
>       return mean() + std() * sigma;
>     }
>     return 0.0;
>   }
> {code}
> StartEndTimesBase.contextualize reads the mapreduce.job.speculative.slowtaskthreshold
> parameter value, then uses it as the sigma argument of the outlier method.
> {code:title=StartEndTimesBase.java|borderStyle=solid}
>   public void contextualize(Configuration conf, AppContext context) {
>     this.context = context;
>     Map<JobId, Job> allJobs = context.getAllJobs();
>     for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
>       final Job job = entry.getValue();
>       mapperStatistics.put(job, new DataStatistics());
>       reducerStatistics.put(job, new DataStatistics());
>       slowTaskRelativeTresholds.put
>           (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
>     }
>   }
> {code}
> I think the outlier return value can hardly be Long.MAX_VALUE no matter what the
> mapreduce.job.speculative.slowtaskthreshold parameter value is.
> So it cannot affect the return value of the DefaultSpeculator.speculationValue method.
> I then ran a test for this parameter. The test source code is as follows.
> {code:title=TestSpeculativeTask.java|borderStyle=solid}
> package test.speculation;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
> public class TestSpeculativeTask extends Configured implements Tool {
>   public TestSpeculativeTask() {
>   }
>   public int run(String[] args) throws Exception {
>     Configuration conf = getConf();
>     Job job = Job.getInstance(conf);
>     FileInputFormat.setInputPaths(job, args[0]);
>     job.setMapperClass(SpeculativeTestMapper.class);
>     job.setNumReduceTasks(0);
>     job.setJarByClass(SpeculativeTestMapper.class);
>     Path output = new Path(args[1]);
>     FileOutputFormat.setOutputPath(job, output);
>     job.setOutputFormatClass(TextOutputFormat.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(NullWritable.class);
>     job.waitForCompletion(true);
>     return 0;
>   }
>   public static void main(String[] args) throws Exception {
>     int res =
>         ToolRunner.run(new Configuration(), new TestSpeculativeTask(), args);
>     System.exit(res);
>   }
> }
> {code}
> {code:title=SpeculativeTestMapper.java|borderStyle=solid}
> package test.speculation;
> import java.io.IOException;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
> public class SpeculativeTestMapper<K> extends
>     Mapper<K, Text, Text, NullWritable> {
>   private Log log = LogFactory.getLog(this.getClass());
>   public void map(K key, Text value, Context context) throws IOException,
>       InterruptedException {
>     String text = value.toString();
>     log.info("processing " + text);
>     try {
>       Thread.sleep(60 * 1000);
>     } catch (Exception e) {
>       throw new RuntimeException(e);
>     }
>     context.write(new Text(text), NullWritable.get());
>   }
> }
> {code}
> The input path has 10 files in HDFS. Only one file has 10 lines; the other files have one
> line each.
> The mapper task processing the 10-line file must be slow and trigger a speculative
> task attempt.
> The test result is that the speculative task start time shows no obvious difference between
> mapreduce.job.speculative.slowtaskthreshold=1 and mapreduce.job.speculative.slowtaskthreshold=10.
> Has anyone found the same issue? Or do I misunderstand the
> mapreduce.job.speculative.slowtaskthreshold parameter?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-help@hadoop.apache.org

