Date: Wed, 26 Jul 2017 16:20:00 +0000 (UTC)
From: "Jason Lowe (JIRA)"
To: mapreduce-issues@hadoop.apache.org
Subject: [jira] [Commented] (MAPREDUCE-6920) Cannot find the effect of mapreduce.job.speculative.slowtaskthreshold parameter

    [ https://issues.apache.org/jira/browse/MAPREDUCE-6920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101869#comment-16101869 ]

Jason Lowe commented on MAPREDUCE-6920:
---------------------------------------

I agree that mapreduce.job.speculative.slowtaskthreshold is essentially useless. It will have very little effect in practice unless set to a very large value and the standard deviation of completed task durations is significant. And even then, all it would do is serve to disable speculation which is of dubious utility.

If I were to guess, it looks like someone originally intended for this property to control the speculation trigger threshold via scaling the standard deviation of task completion times. If an estimated running task duration were to be below that threshold then we would never speculate it until the estimate changed to exceed that threshold. But that's not what it does in practice.
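
To make the difference concrete, here is a minimal sketch in plain Java. It is not the Hadoop code: the class SlowTaskThresholdSketch, its method names, and the sample durations are all hypothetical, and outlier() is only a simplified stand-in for the DataStatistics.outlier logic quoted in the issue. It contrasts the sentinel comparison that is actually performed with the comparison the property may have been meant to drive.

```java
// Illustrative sketch only. SlowTaskThresholdSketch and its methods are
// hypothetical stand-ins, not Hadoop classes; the durations are made-up data.
final class SlowTaskThresholdSketch {

    /** Returns mean + sigma * population standard deviation, or 0.0 for no samples. */
    static double outlier(double[] durationsMs, float sigma) {
        if (durationsMs.length == 0) {
            return 0.0;
        }
        double sum = 0.0;
        double sumSquares = 0.0;
        for (double d : durationsMs) {
            sum += d;
            sumSquares += d * d;
        }
        double mean = sum / durationsMs.length;
        // Clamp at zero to guard against tiny negative values from rounding.
        double variance = Math.max(sumSquares / durationsMs.length - mean * mean, 0.0);
        return mean + Math.sqrt(variance) * sigma;
    }

    /** The check as it behaves in practice: only the Long.MAX_VALUE sentinel matters. */
    static boolean onScheduleInPractice(long thresholdMs) {
        return thresholdMs == Long.MAX_VALUE;
    }

    /** The guessed intent: no speculation while the estimate stays under the threshold. */
    static boolean onScheduleAsIntended(long estimatedRuntimeMs, long thresholdMs) {
        return estimatedRuntimeMs < thresholdMs;
    }

    public static void main(String[] args) {
        double[] completedMs = {55_000, 60_000, 65_000, 60_000}; // hypothetical samples
        long estimatedRuntimeMs = 90_000;                        // hypothetical estimate
        for (float sigma : new float[] {1.0f, 10.0f}) {
            long thresholdMs = (long) outlier(completedMs, sigma);
            System.out.println("sigma=" + sigma
                + " threshold=" + thresholdMs
                + " inPractice=" + onScheduleInPractice(thresholdMs)
                + " asIntended=" + onScheduleAsIntended(estimatedRuntimeMs, thresholdMs));
        }
    }
}
```

Under these assumptions the sentinel comparison gives the same answer for both sigma values, because a finite mean plus a scaled standard deviation never equals Long.MAX_VALUE. Only the hypothetical intended comparison changes its answer as sigma grows.
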
Essentially thresholdRuntime is returning a boolean value, via Long.MAX_VALUE or otherwise, that a task should or should not be speculated, so calculating a realistic threshold is a waste of time.

If my guess is correct, I'm not sure that check would actually matter in practice if performed. We already do not speculate a task if we expect the existing task to complete before a newly scheduled task would complete, and usually the existing task already has a significant head start.

> Cannot find the effect of mapreduce.job.speculative.slowtaskthreshold parameter
> -------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6920
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6920
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: 2.7.1
>            Reporter: NING DING
>            Priority: Minor
>
> The description of parameter mapreduce.job.speculative.slowtaskthreshold is as below.
> {code:xml}
> <property>
>   <name>mapreduce.job.speculative.slowtaskthreshold</name>
>   <value>1.0</value>
>   <description>The number of standard deviations by which a task's
>   ave progress-rates must be lower than the average of all running tasks'
>   for the task to be considered too slow.
>   </description>
> </property>
> {code}
> But from the source code, I find it has no effect on starting speculative tasks.
> The call stack is as below.
> DefaultSpeculator.speculationValue -> StartEndTimesBase.thresholdRuntime -> DataStatistics.outlier
> {code:title=DefaultSpeculator.java|borderStyle=solid}
> private TaskItem speculationValue(TaskId taskID, long now) {
>   TaskItem taskItem = new TaskItem();
>   Job job = context.getJob(taskID.getJobId());
>   Task task = job.getTask(taskID);
>   Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
>   long acceptableRuntime = Long.MIN_VALUE;
>   long speculationValue = Long.MIN_VALUE;
>   if (!mayHaveSpeculated.contains(taskID)) {
>     acceptableRuntime = estimator.thresholdRuntime(taskID);
>     if (acceptableRuntime == Long.MAX_VALUE) {
>       taskItem.setSpeculationValue(ON_SCHEDULE);
>       return taskItem;
>     }
>   }
>   ...
> }
> {code}
> {code:title=StartEndTimesBase.java|borderStyle=solid}
> public long thresholdRuntime(TaskId taskID) {
>   JobId jobID = taskID.getJobId();
>   Job job = context.getJob(jobID);
>   TaskType type = taskID.getTaskType();
>   DataStatistics statistics
>       = dataStatisticsForTask(taskID);
>   int completedTasksOfType
>       = type == TaskType.MAP
>           ? job.getCompletedMaps() : job.getCompletedReduces();
>   int totalTasksOfType
>       = type == TaskType.MAP
>           ? job.getTotalMaps() : job.getTotalReduces();
>   if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
>       || (((float)completedTasksOfType) / totalTasksOfType)
>           < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
>     return Long.MAX_VALUE;
>   }
>   long result = statistics == null
>       ? Long.MAX_VALUE
>       : (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
>   return result;
> }
> {code}
> {code:title=DataStatistics.java|borderStyle=solid}
> public synchronized double outlier(float sigma) {
>   if (count != 0.0) {
>     return mean() + std() * sigma;
>   }
>   return 0.0;
> }
> {code}
> StartEndTimesBase.contextualize reads the mapreduce.job.speculative.slowtaskthreshold parameter value, then uses it as the sigma argument of the outlier method.
> {code:title=StartEndTimesBase.java|borderStyle=solid}
> public void contextualize(Configuration conf, AppContext context) {
>   this.context = context;
>   Map<JobId, Job> allJobs = context.getAllJobs();
>   for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
>     final Job job = entry.getValue();
>     mapperStatistics.put(job, new DataStatistics());
>     reducerStatistics.put(job, new DataStatistics());
>     slowTaskRelativeTresholds.put
>         (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
>   }
> }
> {code}
> I think the outlier return value can hardly ever be Long.MAX_VALUE, no matter what the mapreduce.job.speculative.slowtaskthreshold parameter value is.
> So it cannot affect the return value of the DefaultSpeculator.speculationValue method.
> Then I ran a test for this parameter. Test source code is as below.
> {code:title=TestSpeculativeTask.java|borderStyle=solid}
> package test.speculation;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
> public class TestSpeculativeTask extends Configured implements Tool {
>   public TestSpeculativeTask() {
>   }
>   public int run(String[] args) throws Exception {
>     Configuration conf = getConf();
>     Job job = Job.getInstance(conf);
>     FileInputFormat.setInputPaths(job, args[0]);
>     job.setMapperClass(SpeculativeTestMapper.class);
>     job.setNumReduceTasks(0);
>     job.setJarByClass(SpeculativeTestMapper.class);
>     Path output = new Path(args[1]);
>     FileOutputFormat.setOutputPath(job, output);
>     job.setOutputFormatClass(TextOutputFormat.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(NullWritable.class);
>     job.waitForCompletion(true);
>     return 0;
>   }
>   public static void main(String[] args) throws Exception {
>     int res =
>         ToolRunner.run(new Configuration(), new TestSpeculativeTask(), args);
>     System.exit(res);
>   }
> }
> {code}
> {code:title=SpeculativeTestMapper.java|borderStyle=solid}
> package test.speculation;
> import java.io.IOException;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
> public class SpeculativeTestMapper<K> extends
>     Mapper<K, Text, Text, NullWritable> {
>   private Log log = LogFactory.getLog(this.getClass());
>   public void map(K key, Text value, Context context) throws IOException,
>       InterruptedException {
>     String text = value.toString();
>     log.info("processing " + text);
>     try {
>       Thread.sleep(60 * 1000);
>     } catch (Exception e) {
>       throw new RuntimeException(e);
>     }
>     context.write(new Text(text), NullWritable.get());
>   }
> }
> {code}
> The input path has 10 files in HDFS. Only one file has 10 lines; each of the other files has one line.
> The mapper task processing the 10-line file must be slow and should cause a speculative task attempt.
> The test result is that the speculative task start time shows no obvious difference between mapreduce.job.speculative.slowtaskthreshold=1 and mapreduce.job.speculative.slowtaskthreshold=10.
> Has anyone found the same issue, or do I misunderstand the mapreduce.job.speculative.slowtaskthreshold parameter?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-help@hadoop.apache.org