Author: ddas
Date: Sat Mar 1 01:06:54 2008
New Revision: 632573

URL: http://svn.apache.org/viewvc?rev=632573&view=rev
Log:
HADOOP-2790. Reverted patch due to conflicts in 0.16

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar 1 01:06:54 2008
@@ -191,10 +191,6 @@
     HADOOP-2918. Improve error logging so that dfs writes failure with
     "No lease on file" can be diagnosed. (dhruba)
 
-    HADOOP-2790. Optimizes hasSpeculativeTask to do with getting the value of
-    time (System.getCurrentTimeMillis()). This is now done only once in the
-    beginning and is used for all TIPs. (Owen O'Malley via ddas).
- Release 0.16.0 - 2008-02-07 INCOMPATIBLE CHANGES Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632573&r1=632572&r2=632573&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Mar 1 01:06:54 2008 @@ -108,8 +108,6 @@ private LocalFileSystem localFs; private String jobId; - private boolean hasSpeculativeMaps; - private boolean hasSpeculativeReduces; // Per-job counters public static enum Counter { @@ -181,8 +179,6 @@ this.jobMetrics.setTag("sessionId", conf.getSessionId()); this.jobMetrics.setTag("jobName", conf.getJobName()); this.jobMetrics.setTag("jobId", jobid); - hasSpeculativeMaps = conf.getMapSpeculativeExecution(); - hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); } /** @@ -636,7 +632,7 @@ int target = findNewTask(tts, clusterSize, status.mapProgress(), - maps, nodesToMaps, hasSpeculativeMaps); + maps, nodesToMaps); if (target == -1) { return null; } @@ -672,7 +668,7 @@ } int target = findNewTask(tts, clusterSize, status.reduceProgress() , - reduces, null, hasSpeculativeReduces); + reduces, null); if (target == -1) { return null; } @@ -758,11 +754,10 @@ return trackerErrors; } - private boolean shouldRunSpeculativeTask(long currentTime, - TaskInProgress task, - double avgProgress, - String taskTracker) { - return task.hasSpeculativeTask(currentTime, avgProgress) && + private boolean shouldRunSpeculativeTask(TaskInProgress task, + double avgProgress, + String taskTracker) { + return task.hasSpeculativeTask(avgProgress) && !task.hasRunOnMachine(taskTracker); } @@ -774,15 +769,13 @@ * @param tasks The list of potential tasks to try * @param firstTaskToTry The first index in tasks to check * @param cachedTasks 
A list of tasks that would like to run on this node - * @param hasSpeculative Should it try to find speculative tasks * @return the index in tasks of the selected task (or -1 for no task) */ private int findNewTask(TaskTrackerStatus tts, int clusterSize, double avgProgress, TaskInProgress[] tasks, - Map> cachedTasks, - boolean hasSpeculative) { + Map> cachedTasks) { String taskTracker = tts.getTrackerName(); int specTarget = -1; @@ -806,7 +799,6 @@ } return -1; } - long currentTime = System.currentTimeMillis(); // // See if there is a split over a block that is stored on @@ -853,9 +845,8 @@ } return cacheTarget; } - if (hasSpeculative && specTarget == -1 && - shouldRunSpeculativeTask(currentTime, tip, avgProgress, - taskTracker)) { + if (specTarget == -1 && + shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) { specTarget = tip.getIdWithinJob(); } } @@ -890,9 +881,8 @@ if (!isRunning) { LOG.info("Choosing normal task " + tasks[i].getTIPId()); return i; - } else if (hasSpeculative && specTarget == -1 && - shouldRunSpeculativeTask(currentTime, task, avgProgress, - taskTracker)) { + } else if (specTarget == -1 && + shouldRunSpeculativeTask(task, avgProgress, taskTracker)) { specTarget = i; } } Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632573&r1=632572&r2=632573&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sat Mar 1 01:06:54 2008 @@ -96,6 +96,7 @@ // currently runnings private TreeMap activeTasks = new TreeMap(); private JobConf conf; + private boolean runSpeculative; private Map> taskDiagnosticData = new TreeMap>(); /** @@ -130,6 +131,7 @@ this.conf = conf; this.partition = partition; setMaxTaskAttempts(); + 
this.runSpeculative = conf.getMapSpeculativeExecution(); init(JobTracker.getJobUniqueString(jobid)); } @@ -147,6 +149,7 @@ this.job = job; this.conf = conf; setMaxTaskAttempts(); + this.runSpeculative = conf.getReduceSpeculativeExecution(); init(JobTracker.getJobUniqueString(jobid)); } @@ -201,6 +204,11 @@ */ void init(String jobUniqueString) { this.startTime = System.currentTimeMillis(); + if ("true".equals(conf.get("mapred.speculative.execution"))) { + this.runSpeculative = true; + } else if ("false".equals(conf.get("mapred.speculative.execution"))) { + this.runSpeculative = false; + } this.taskIdPrefix = makeUniqueString(jobUniqueString); this.id = "tip_" + this.taskIdPrefix; } @@ -683,15 +691,16 @@ * far behind, and has been behind for a non-trivial amount of * time. */ - boolean hasSpeculativeTask(long currentTime, double averageProgress) { + boolean hasSpeculativeTask(double averageProgress) { // // REMIND - mjc - these constants should be examined // in more depth eventually... // if (activeTasks.size() <= MAX_TASK_EXECS && + runSpeculative && (averageProgress - progress >= SPECULATIVE_GAP) && - (currentTime - startTime >= SPECULATIVE_LAG) + (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) && completes == 0 && !isOnlyCommitPending()) { return true; }