Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 62614 invoked from network); 17 Jun 2009 20:56:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Jun 2009 20:56:18 -0000 Received: (qmail 43177 invoked by uid 500); 17 Jun 2009 20:56:29 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 43114 invoked by uid 500); 17 Jun 2009 20:56:29 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 43105 invoked by uid 99); 17 Jun 2009 20:56:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Jun 2009 20:56:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Jun 2009 20:56:16 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 97D8E23888CF; Wed, 17 Jun 2009 20:55:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r785794 [2/3] - in /hadoop/core/branches/HADOOP-4687: core/ core/src/java/ core/src/java/org/apache/hadoop/fs/ core/src/java/org/apache/hadoop/util/ core/src/test/core/ core/src/test/core/org/apache/hadoop/util/ hdfs/ hdfs/src/java/ hdfs/sr... 
Date: Wed, 17 Jun 2009 20:55:54 -0000 To: core-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090617205556.97D8E23888CF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jun 17 20:55:51 2009 @@ -21,6 +21,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedHashSet; @@ -103,7 +107,7 @@ private volatile boolean jobFailed = false; JobPriority priority = JobPriority.NORMAL; - final JobTracker jobtracker; + protected JobTracker jobtracker; // NetworkTopology Node to the set of TIPs Map> nonRunningMapCache; @@ -129,14 +133,14 @@ // A list of cleanup tasks for the reduce task attempts, to be launched List reduceCleanupTasks = new LinkedList(); - private final int maxLevel; + private int maxLevel; /** * A special value indicating that * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should * schedule any available map tasks for this job, including speculative tasks. */ - private final int anyCacheLevel; + private int anyCacheLevel; /** * A special value indicating that @@ -189,9 +193,13 @@ private MetricsRecord jobMetrics; - // Maximum no. 
of fetch-failure notifications after which - // the map task is killed + // Maximum no. of fetch-failure notifications after which map task is killed private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; + + // Don't lower speculativeCap below one TT's worth (for small clusters) + private static final int MIN_SPEC_CAP = 10; + + private static final float MIN_SLOTS_CAP = 0.01f; // Map of mapTaskId -> no. of fetch failures private Map mapTaskIdToFetchFailuresMap = @@ -199,19 +207,65 @@ private Object schedulingInfo; - + //thresholds for speculative execution + private float slowTaskThreshold; + private float speculativeCap; + private float slowNodeThreshold; //standard deviations + + //Statistics are maintained for a couple of things + //mapTaskStats is used for maintaining statistics about + //the completion time of map tasks on the trackers. On a per + //tracker basis, the mean time for task completion is maintained + private DataStatistics mapTaskStats = new DataStatistics(); + //reduceTaskStats is used for maintaining statistics about + //the completion time of reduce tasks on the trackers. 
On a per + //tracker basis, the mean time for task completion is maintained + private DataStatistics reduceTaskStats = new DataStatistics(); + //trackerMapStats used to maintain a mapping from the tracker to the + //the statistics about completion time of map tasks + private Map trackerMapStats = + new HashMap(); + //trackerReduceStats used to maintain a mapping from the tracker to the + //the statistics about completion time of reduce tasks + private Map trackerReduceStats = + new HashMap(); + //runningMapStats used to maintain the RUNNING map tasks' statistics + private DataStatistics runningMapTaskStats = new DataStatistics(); + //runningReduceStats used to maintain the RUNNING reduce tasks' statistics + private DataStatistics runningReduceTaskStats = new DataStatistics(); + /** * Create an almost empty JobInProgress, which can be used only for tests */ - protected JobInProgress(JobID jobid, JobConf conf) { + protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) { this.conf = conf; this.jobId = jobid; this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL; this.anyCacheLevel = this.maxLevel+1; - this.jobtracker = null; + this.jobtracker = tracker; this.restartCount = 0; + + hasSpeculativeMaps = conf.getMapSpeculativeExecution(); + hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); + this.nonLocalMaps = new LinkedList(); + this.nonLocalRunningMaps = new LinkedHashSet(); + this.runningMapCache = new IdentityHashMap>(); + this.nonRunningReduces = new LinkedList(); + this.runningReduces = new LinkedHashSet(); + this.resourceEstimator = new ResourceEstimator(this); + this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); + this.taskCompletionEvents = new ArrayList + (numMapTasks + numReduceTasks + 10); + + this.slowTaskThreshold = Math.max(0.0f, + conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f)); + this.speculativeCap = 
conf.getFloat( + "mapred.speculative.execution.speculativeCap",0.1f); + this.slowNodeThreshold = conf.getFloat( + "mapred.speculative.execution.slowNodeThreshold",1.0f); + } /** @@ -285,6 +339,19 @@ this.nonRunningReduces = new LinkedList(); this.runningReduces = new LinkedHashSet(); this.resourceEstimator = new ResourceEstimator(this); + + this.nonLocalMaps = new LinkedList(); + this.nonLocalRunningMaps = new LinkedHashSet(); + this.runningMapCache = new IdentityHashMap>(); + this.nonRunningReduces = new LinkedList(); + this.runningReduces = new LinkedHashSet(); + this.slowTaskThreshold = Math.max(0.0f, + conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f)); + this.speculativeCap = conf.getFloat( + "mapred.speculative.execution.speculativeCap",0.1f); + this.slowNodeThreshold = conf.getFloat( + "mapred.speculative.execution.slowNodeThreshold",1.0f); + } /** @@ -443,7 +510,7 @@ } // set the launch time - this.launchTime = System.currentTimeMillis(); + this.launchTime = jobtracker.getClock().getTime(); // // Create reduce tasks @@ -939,9 +1006,9 @@ LOG.info("Cannot create task split for " + profile.getJobID()); return null; } - - int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel, - status.mapProgress()); + + int target = findNewMapTask(tts, clusterSize, numUniqueHosts, + anyCacheLevel); if (target == -1) { return null; } @@ -1001,8 +1068,7 @@ return null; } - int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, - status.mapProgress()); + int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel); if (target == -1) { return null; } @@ -1025,7 +1091,7 @@ } int target = findNewMapTask(tts, clusterSize, numUniqueHosts, - NON_LOCAL_CACHE_LEVEL, status.mapProgress()); + NON_LOCAL_CACHE_LEVEL); if (target == -1) { return null; } @@ -1203,8 +1269,7 @@ return null; } - int target = findNewReduceTask(tts, clusterSize, numUniqueHosts, - status.reduceProgress()); + int target = findNewReduceTask(tts, 
clusterSize, numUniqueHosts); if (target == -1) { return null; } @@ -1267,15 +1332,21 @@ name = Values.MAP.name(); counter = JobCounter.TOTAL_LAUNCHED_MAPS; splits = tip.getSplitNodes(); - if (tip.getActiveTasks().size() > 1) + if (tip.isSpeculating()) { speculativeMapTasks++; + LOG.debug("Chosen speculative task, current speculativeMap task count: " + + speculativeMapTasks); + } metrics.launchMap(id); } else { ++runningReduceTasks; name = Values.REDUCE.name(); counter = JobCounter.TOTAL_LAUNCHED_REDUCES; - if (tip.getActiveTasks().size() > 1) + if (tip.isSpeculating()) { speculativeReduceTasks++; + LOG.debug("Chosen speculative task, current speculativeReduce task count: " + + speculativeReduceTasks); + } metrics.launchReduce(id); } // Note that the logs are for the scheduled tasks only. Tasks that join on @@ -1433,7 +1504,7 @@ String[] splitLocations = tip.getSplitLocations(); // Remove the TIP from the list for running non-local maps - if (splitLocations.length == 0) { + if (splitLocations == null || splitLocations.length == 0) { nonLocalRunningMaps.remove(tip); return; } @@ -1473,8 +1544,9 @@ * Adds a map tip to the list of running maps. * @param tip the tip that needs to be scheduled as running */ - private synchronized void scheduleMap(TaskInProgress tip) { + protected synchronized void scheduleMap(TaskInProgress tip) { + runningMapTaskStats.add(0.0f); if (runningMapCache == null) { LOG.warn("Running cache for maps is missing!! 
" + "Job details are missing."); @@ -1483,7 +1555,7 @@ String[] splitLocations = tip.getSplitLocations(); // Add the TIP to the list of non-local running TIPs - if (splitLocations.length == 0) { + if (splitLocations == null || splitLocations.length == 0) { nonLocalRunningMaps.add(tip); return; } @@ -1508,7 +1580,8 @@ * Adds a reduce tip to the list of running reduces * @param tip the tip that needs to be scheduled as running */ - private synchronized void scheduleReduce(TaskInProgress tip) { + protected synchronized void scheduleReduce(TaskInProgress tip) { + runningReduceTaskStats.add(0.0f); if (runningReduces == null) { LOG.warn("Running cache for reducers missing!! " + "Job details are missing."); @@ -1612,57 +1685,71 @@ return null; } + public boolean hasSpeculativeMaps() { + return hasSpeculativeMaps; + } + + public boolean hasSpeculativeReduces() { + return hasSpeculativeReduces; + } + /** - * Find a speculative task - * @param list a list of tips - * @param taskTracker the tracker that has requested a tip - * @param avgProgress the average progress for speculation - * @param currentTime current time in milliseconds - * @param shouldRemove whether to remove the tips - * @return a tip that can be speculated on the tracker - */ - private synchronized TaskInProgress findSpeculativeTask( - Collection list, TaskTrackerStatus ttStatus, - double avgProgress, long currentTime, boolean shouldRemove) { + * Retrieve a task for speculation. 
+ * If a task slot becomes available and there are less than SpeculativeCap + * speculative tasks running: + * 1)Ignore the request if the TT's progressRate is < SlowNodeThreshold + * 2)Choose candidate tasks - those tasks whose progress rate is below + * slowTaskThreshold * mean(progress-rates) + * 3)Speculate task that's expected to complete last + * @param list pool of tasks to choose from + * @param taskTrackerName the name of the TaskTracker asking for a task + * @param taskTrackerHost the hostname of the TaskTracker asking for a task + * @return the TIP to speculatively re-execute + */ + protected synchronized TaskInProgress findSpeculativeTask( + Collection list, String taskTrackerName, + String taskTrackerHost) { + if (list.isEmpty()) { + return null; + } + long now = jobtracker.getClock().getTime(); + if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) { + return null; + } + // List of speculatable candidates, start with all, and chop it down + ArrayList candidates = new ArrayList(list); - Iterator iter = list.iterator(); - + Iterator iter = candidates.iterator(); while (iter.hasNext()) { TaskInProgress tip = iter.next(); - // should never be true! (since we delete completed/failed tasks) - if (!tip.isRunning()) { - iter.remove(); - continue; - } - - if (!tip.hasRunOnMachine(ttStatus.getHost(), - ttStatus.getTrackerName())) { - if (tip.hasSpeculativeTask(currentTime, avgProgress)) { - // In case of shared list we don't remove it. Since the TIP failed - // on this tracker can be scheduled on some other tracker. - if (shouldRemove) { - iter.remove(); //this tracker is never going to run it again - } - return tip; - } - } else { - // Check if this tip can be removed from the list. - // If the list is shared then we should not remove. 
- if (shouldRemove) { - // This tracker will never speculate this tip + if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) || + !tip.canBeSpeculated(now)) { + //remove it from candidates iter.remove(); - } } } - return null; + //resort according to expected time till completion + Comparator LateComparator = + new EstimatedTimeLeftComparator(now); + Collections.sort(candidates, LateComparator); + if (candidates.size() > 0 ) { + TaskInProgress tip = candidates.get(0); + if (LOG.isDebugEnabled()) { + LOG.debug("Chose task " + tip.getTIPId() + ". Statistics: Task's : " + + tip.getCurrentProgressRate(now) + " Job's : " + + (tip.isMapTask() ? runningMapTaskStats : runningReduceTaskStats)); + } + return tip; + } else { + return null; + } } - + /** * Find new map task * @param tts The task tracker that is asking for a task * @param clusterSize The number of task trackers in the cluster * @param numUniqueHosts The number of hosts that run task trackers - * @param avgProgress The average progress of this kind of task in this job * @param maxCacheLevel The maximum topology level until which to schedule * maps. * A value of {@link #anyCacheLevel} implies any @@ -1675,14 +1762,14 @@ private synchronized int findNewMapTask(final TaskTrackerStatus tts, final int clusterSize, final int numUniqueHosts, - final int maxCacheLevel, - final double avgProgress) { + final int maxCacheLevel) { + String taskTrackerName = tts.getTrackerName(); + String taskTrackerHost = tts.getHost(); if (numMapTasks == 0) { LOG.info("No maps to schedule for " + profile.getJobID()); return -1; } - String taskTracker = tts.getTrackerName(); TaskInProgress tip = null; // @@ -1694,7 +1781,7 @@ return -1; } - if (!shouldRunOnTaskTracker(taskTracker)) { + if (!shouldRunOnTaskTracker(taskTrackerName)) { return -1; } @@ -1821,82 +1908,61 @@ // if (hasSpeculativeMaps) { - long currentTime = System.currentTimeMillis(); - - // 1. 
Check bottom up for speculative tasks from the running cache - if (node != null) { - Node key = node; - for (int level = 0; level < maxLevel; ++level) { - Set cacheForLevel = runningMapCache.get(key); - if (cacheForLevel != null) { - tip = findSpeculativeTask(cacheForLevel, tts, - avgProgress, currentTime, level == 0); - if (tip != null) { - if (cacheForLevel.size() == 0) { - runningMapCache.remove(key); - } - return tip.getIdWithinJob(); - } - } - key = key.getParent(); - } + tip = getSpeculativeMap(taskTrackerName, taskTrackerHost); + if (tip != null) { + return tip.getIdWithinJob(); } + } + return -1; + } - // 2. Check breadth-wise for speculative tasks - - for (Node parent : nodesAtMaxLevel) { - // ignore the parent which is already scanned - if (parent == nodeParentAtMaxLevel) { - continue; - } + private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, + String taskTrackerHost) { - Set cache = runningMapCache.get(parent); - if (cache != null) { - tip = findSpeculativeTask(cache, tts, avgProgress, - currentTime, false); - if (tip != null) { - // remove empty cache entries - if (cache.size() == 0) { - runningMapCache.remove(parent); - } - LOG.info("Choosing a non-local task " + tip.getTIPId() - + " for speculation"); - return tip.getIdWithinJob(); - } - } - } - - // 3. 
Check non-local tips for speculation - tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, - currentTime, false); - if (tip != null) { - LOG.info("Choosing a non-local task " + tip.getTIPId() - + " for speculation"); - return tip.getIdWithinJob(); + //////// Populate allTips with all TaskInProgress + Set allTips = new HashSet(); + + // collection of node at max level in the cache structure + Collection nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel(); + // Add all tasks from max-level nodes breadth-wise + for (Node parent : nodesAtMaxLevel) { + Set cache = runningMapCache.get(parent); + if (cache != null) { + allTips.addAll(cache); } } + // Add all non-local TIPs + allTips.addAll(nonLocalRunningMaps); - return -1; + ///////// Select a TIP to run on + TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, + taskTrackerHost); + + if (tip != null) { + LOG.info("Choosing map task " + tip.getTIPId() + + " for speculative execution"); + } else { + LOG.debug("No speculative map task found for tracker " + taskTrackerName); + } + return tip; } - + /** * Find new reduce task * @param tts The task tracker that is asking for a task * @param clusterSize The number of task trackers in the cluster * @param numUniqueHosts The number of hosts that run task trackers - * @param avgProgress The average progress of this kind of task in this job * @return the index in tasks of the selected task (or -1 for no task) */ private synchronized int findNewReduceTask(TaskTrackerStatus tts, int clusterSize, - int numUniqueHosts, - double avgProgress) { + int numUniqueHosts) { + String taskTrackerName = tts.getTrackerName(); + String taskTrackerHost = tts.getHost(); if (numReduceTasks == 0) { LOG.info("No reduces to schedule for " + profile.getJobID()); return -1; } - - String taskTracker = tts.getTrackerName(); TaskInProgress tip = null; // Update the last-known clusterSize @@ -1906,14 +1972,14 @@ return -1; } - if (!shouldRunOnTaskTracker(taskTracker)) { + if 
(!shouldRunOnTaskTracker(taskTrackerName)) { return -1; } long outSize = resourceEstimator.getEstimatedReduceInputSize(); long availSpace = tts.getResourceStatus().getAvailableSpace(); if(availSpace < outSize) { - LOG.warn("No room for reduce task. Node " + taskTracker + " has " + + LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " + availSpace + " bytes free; but we expect reduce input to take " + outSize); @@ -1930,16 +1996,187 @@ // 2. check for a reduce tip to be speculated if (hasSpeculativeReduces) { - tip = findSpeculativeTask(runningReduces, tts, avgProgress, - System.currentTimeMillis(), false); + tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost); if (tip != null) { - scheduleReduce(tip); return tip.getIdWithinJob(); } } return -1; } + + private synchronized TaskInProgress getSpeculativeReduce( + String taskTrackerName, String taskTrackerHost) { + TaskInProgress tip = findSpeculativeTask( + runningReduces, taskTrackerName, taskTrackerHost); + if (tip != null) { + LOG.info("Choosing reduce task " + tip.getTIPId() + + " for speculative execution"); + }else { + LOG.debug("No speculative map task found for tracker " + taskTrackerHost); + } + return tip; + } + + /** + * Check to see if the maximum number of speculative tasks are + * already being executed currently. + * @param tasks the set of tasks to test + * @return has the cap been reached? 
+ */ + private boolean atSpeculativeCap(Collection tasks) { + float numTasks = tasks.size(); + if (numTasks == 0){ + return true; // avoid divide by zero + } + + //return true if totalSpecTask < max(10, 0.01 * total-slots, + // 0.1 * total-running-tasks) + + if (speculativeMapTasks + speculativeReduceTasks < MIN_SPEC_CAP) { + return false; // at least one slow tracker's worth of slots(default=10) + } + ClusterStatus c = jobtracker.getClusterStatus(false); + int numSlots = c.getMaxMapTasks() + c.getMaxReduceTasks(); + if ((float)(speculativeMapTasks + speculativeReduceTasks) < + numSlots * MIN_SLOTS_CAP) { + return false; + } + boolean atCap = (((float)(speculativeMapTasks+ + speculativeReduceTasks)/numTasks) >= speculativeCap); + if (LOG.isDebugEnabled()) { + LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " + + ((float)(speculativeMapTasks+speculativeReduceTasks)/numTasks)+ + ", so atSpecCap() is returning "+atCap); + } + return atCap; + } + + /** + * A class for comparing the estimated time to completion of two tasks + */ + private static class EstimatedTimeLeftComparator + implements Comparator { + private long time; + public EstimatedTimeLeftComparator(long now) { + this.time = now; + } + /** + * Estimated time to completion is measured as: + * % of task left to complete (1 - progress) / progress rate of the task. + * + * This assumes that tasks are linear in their progress, which is + * often wrong, especially since progress for reducers is currently + * calculated by evenly weighting their three stages (shuffle, sort, map) + * which rarely account for 1/3 each. This should be fixed in the future + * by calculating progressRate more intelligently or splitting these + * multi-phase tasks into individual tasks. 
+ * + * The ordering this comparator defines is: task1 < task2 if task1 is + * estimated to finish farther in the future => compare(t1,t2) returns -1 + */ + public int compare(TaskInProgress tip1, TaskInProgress tip2) { + //we have to use the Math.max in the denominator to avoid divide by zero + //error because prog and progRate can both be zero (if one is zero, + //the other one will be 0 too). + //We use inverse of time_reminaing=[(1- prog) / progRate] + //so that (1-prog) is in denom. because tasks can have arbitrarily + //low progRates in practice (e.g. a task that is half done after 1000 + //seconds will have progRate of 0.0000005) so we would rather + //use Math.maxnon (1-prog) by putting it in the denominator + //which will cause tasks with prog=1 look 99.99% done instead of 100% + //which is okay + double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001, + 1.0 - tip1.getProgress()); + double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001, + 1.0 - tip2.getProgress()); + if (t1 < t2) return -1; + else if (t2 < t1) return 1; + else return 0; + } + } + + /** + * Compares the ave progressRate of tasks that have finished on this + * taskTracker to the ave of all succesfull tasks thus far to see if this + * TT one is too slow for speculating. + * slowNodeThreshold is used to determine the number of standard deviations + * @param taskTracker the name of the TaskTracker we are checking + * @return is this TaskTracker slow + */ + protected boolean isSlowTracker(String taskTracker) { + if (trackerMapStats.get(taskTracker) != null && + trackerMapStats.get(taskTracker).mean() - + mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug("Tracker " + taskTracker + + " declared slow. 
trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() + + " mapTaskStats :" + mapTaskStats); + } + return true; + } + if (trackerReduceStats.get(taskTracker) != null && + trackerReduceStats.get(taskTracker).mean() - + reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug("Tracker " + taskTracker + + " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() + + " reduceTaskStats :" + reduceTaskStats); + } + return true; + } + return false; + } + + static class DataStatistics{ + private int count = 0; + private double sum = 0; + private double sumSquares = 0; + + public DataStatistics() { + } + + public DataStatistics(double initNum) { + this.count = 1; + this.sum = initNum; + this.sumSquares = initNum * initNum; + } + + public void add(double newNum) { + this.count++; + this.sum += newNum; + this.sumSquares += newNum * newNum; + } + + public void updateStatistics(double old, double update) { + sub(old); + add(update); + } + private void sub(double oldNum) { + this.count--; + this.sum -= oldNum; + this.sumSquares -= oldNum * oldNum; + } + + public double mean() { + return sum/count; + } + + public double var() { + // E(X^2) - E(X)^2 + return (sumSquares/count) - mean() * mean(); + } + + public double std() { + return Math.sqrt(this.var()); + } + + public String toString() { + return "DataStatistics: count is " + count + ", sum is " + sum + + ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std(); + } + + } private boolean shouldRunOnTaskTracker(String taskTracker) { // @@ -2002,7 +2239,6 @@ TaskStatus status) { TaskAttemptID taskid = status.getTaskID(); - int oldNumAttempts = tip.getActiveTasks().size(); final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation(); // Sanity check: is the TIP already complete? 
@@ -2018,10 +2254,9 @@ } return false; } - + boolean wasSpeculating = tip.isSpeculating(); //store this fact LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + " successfully."); - // Mark the TIP as complete tip.completed(taskid); resourceEstimator.updateWithCompletedTask(status, tip); @@ -2059,7 +2294,6 @@ tip.getExecFinishTime(), status.getCounters()); - int newNumAttempts = tip.getActiveTasks().size(); if (tip.isJobSetupTask()) { // setup task has finished. kill the extra setup tip killSetupTip(!tip.isMapTask()); @@ -2096,12 +2330,11 @@ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); } else if (tip.isMapTask()) { runningMapTasks -= 1; - // check if this was a sepculative task - if (oldNumAttempts > 1) { - speculativeMapTasks -= (oldNumAttempts - newNumAttempts); - } finishedMapTasks += 1; metrics.completeMap(taskid); + if (hasSpeculativeMaps) { + updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats); + } // remove the completed map from the resp running caches retireMap(tip); if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) { @@ -2109,21 +2342,66 @@ } } else { runningReduceTasks -= 1; - if (oldNumAttempts > 1) { - speculativeReduceTasks -= (oldNumAttempts - newNumAttempts); - } finishedReduceTasks += 1; metrics.completeReduce(taskid); + if (hasSpeculativeReduces) { + updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats); + } // remove the completed reduces from the running reducers set retireReduce(tip); if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) { this.status.setReduceProgress(1.0f); } } - + decrementSpeculativeCount(wasSpeculating, tip); return true; } - + + private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, + Map trackerStats, DataStatistics overallStats) { + float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime(); + DataStatistics ttStats = + trackerStats.get(ttStatus.getTrackerName()); + double oldMean = 0.0d; + //We 
maintain the mean of TaskTrackers' means. That way, we get a single + //data-point for every tracker (used in the evaluation in isSlowTracker) + if (ttStats != null) { + oldMean = ttStats.mean(); + ttStats.add(tipDuration); + overallStats.updateStatistics(oldMean, ttStats.mean()); + } else { + trackerStats.put(ttStatus.getTrackerName(), + (ttStats = new DataStatistics(tipDuration))); + overallStats.add(tipDuration); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+ + (tip.isMapTask() ? "Map" : "Reduce") + + " on "+ttStatus.getTrackerName()+". DataStatistics is now: " + + trackerStats.get(ttStatus.getTrackerName())); + } + } + + public void updateStatistics(double oldProg, double newProg, boolean isMap) { + if (isMap) { + runningMapTaskStats.updateStatistics(oldProg, newProg); + } else { + runningReduceTaskStats.updateStatistics(oldProg, newProg); + } + } + + public DataStatistics getRunningTaskStatistics(boolean isMap) { + if (isMap) { + return runningMapTaskStats; + } else { + return runningReduceTaskStats; + } + } + + public float getSlowTaskThreshold() { + return slowTaskThreshold; + } + /** * The job is done since all it's component tasks are either * successful or have failed. 
@@ -2142,7 +2420,7 @@ if (reduces.length == 0) { this.status.setReduceProgress(1.0f); } - this.finishTime = System.currentTimeMillis(); + this.finishTime = jobtracker.getClock().getTime(); LOG.info("Job " + this.status.getJobID() + " has completed successfully."); JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, @@ -2164,7 +2442,7 @@ this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f, JobStatus.FAILED, status.getJobPriority()); - this.finishTime = System.currentTimeMillis(); + this.finishTime = jobtracker.getClock().getTime(); JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, this.finishedMapTasks, this.finishedReduceTasks); @@ -2172,7 +2450,7 @@ this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f, JobStatus.KILLED, status.getJobPriority()); - this.finishTime = System.currentTimeMillis(); + this.finishTime = jobtracker.getClock().getTime(); JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, this.finishedMapTasks, this.finishedReduceTasks); @@ -2273,6 +2551,21 @@ terminate(JobStatus.FAILED); } + private void decrementSpeculativeCount(boolean wasSpeculating, + TaskInProgress tip) { + if (wasSpeculating) { + if (tip.isMapTask()) { + speculativeMapTasks--; + LOG.debug("Decrement count. Current speculativeMap task count: " + + speculativeMapTasks); + } else { + speculativeReduceTasks--; + LOG.debug("Decremented count. Current speculativeReduce task count: " + + speculativeReduceTasks); + } + } + } + /** * A task assigned to this JobInProgress has reported in as failed. * Most of the time, we'll just reschedule execution. 
However, after @@ -2292,9 +2585,11 @@ final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation(); // check if the TIP is already failed boolean wasFailed = tip.isFailed(); + boolean wasSpeculating = tip.isSpeculating(); // Mark the taskid as FAILED or KILLED tip.incompleteSubTask(taskid, this.status); + decrementSpeculativeCount(wasSpeculating, tip); boolean isRunning = tip.isRunning(); boolean isComplete = tip.isComplete(); @@ -2477,8 +2772,8 @@ * @param reason The reason that the task failed * @param trackerName The task tracker the task failed on */ - public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, - TaskStatus.Phase phase, TaskStatus.State state, + public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid, + String reason, TaskStatus.Phase phase, TaskStatus.State state, String trackerName) { TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskid, @@ -2491,10 +2786,10 @@ // update the actual start-time of the attempt TaskStatus oldStatus = tip.getTaskStatus(taskid); long startTime = oldStatus == null - ? System.currentTimeMillis() + ? 
jobtracker.getClock().getTime() : oldStatus.getStartTime(); status.setStartTime(startTime); - status.setFinishTime(System.currentTimeMillis()); + status.setFinishTime(jobtracker.getClock().getTime()); boolean wasComplete = tip.isComplete(); updateTaskStatus(tip, status); boolean isComplete = tip.isComplete(); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueClient.java Wed Jun 17 20:55:51 2009 @@ -112,12 +112,11 @@ */ private void displayQueueInfo(String queue, boolean showJobs) throws IOException { - JobQueueInfo schedInfo = jc.getQueueInfo(queue); - if (schedInfo == null) { + JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue); + if (jobQueueInfo == null) { System.out.printf("Queue Name : %s has no scheduling information \n", queue); } else { - System.out.printf("Queue Name : %s \n", schedInfo.getQueueName()); - System.out.printf("Scheduling Info : %s \n",schedInfo.getSchedulingInfo()); + printJobQueueInfo(jobQueueInfo); } if (showJobs) { System.out.printf("Job List\n"); @@ -128,6 +127,13 @@ } } + // format and print information about the passed in job queue. 
+ private void printJobQueueInfo(JobQueueInfo jobQueueInfo) { + System.out.printf("Queue Name : %s \n", jobQueueInfo.getQueueName()); + System.out.printf("Queue State : %s \n", jobQueueInfo.getQueueState()); + System.out.printf("Scheduling Info : %s \n",jobQueueInfo.getSchedulingInfo()); + } + /** * Method used to display the list of the JobQueues registered * with the {@link QueueManager} @@ -141,8 +147,7 @@ if(schedInfo.trim().equals("")){ schedInfo = "N/A"; } - System.out.printf("Queue Name : %s \n", queue.getQueueName()); - System.out.printf("Scheduling Info : %s \n",queue.getSchedulingInfo()); + printJobQueueInfo(queue); } } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobQueueInfo.java Wed Jun 17 20:55:51 2009 @@ -37,6 +37,7 @@ //Once the scheduling information is set there is no way to recover it. private String schedulingInfo; + private String queueState; /** * Default constructor for Job Queue Info. @@ -56,6 +57,8 @@ public JobQueueInfo(String queueName, String schedulingInfo) { this.queueName = queueName; this.schedulingInfo = schedulingInfo; + // make it running by default. + this.queueState = Queue.QueueState.RUNNING.getStateName(); } @@ -100,15 +103,33 @@ } } + /** + * Set the state of the queue + * @param state state of the queue. + */ + public void setQueueState(String state) { + queueState = state; + } + + /** + * Return the queue state + * @return the queue state. 
+ */ + public String getQueueState() { + return queueState; + } + @Override public void readFields(DataInput in) throws IOException { queueName = Text.readString(in); + queueState = Text.readString(in); schedulingInfo = Text.readString(in); } @Override public void write(DataOutput out) throws IOException { Text.writeString(out, queueName); + Text.writeString(out, queueState); if(schedulingInfo!= null) { Text.writeString(out, schedulingInfo); }else { Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Wed Jun 17 20:55:51 2009 @@ -61,8 +61,10 @@ * Version 21: Modified TaskID to be aware of the new TaskTypes * Version 22: Added method getQueueAclsForCurrentUser to get queue acls info * for a user + * Version 23: Modified the JobQueueInfo class to include queue state. + * Part of HADOOP-5913. */ - public static final long versionID = 22L; + public static final long versionID = 23L; /** * Allocate a name for the job. 
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jun 17 20:55:51 2009 @@ -138,6 +138,8 @@ // system files should have 700 permission final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx------ + + private Clock clock; /** * A client tried to submit a job before the Job Tracker was ready. @@ -165,6 +167,10 @@ public static final Log LOG = LogFactory.getLog(JobTracker.class); + public Clock getClock() { + return clock; + } + /** * Start the JobTracker with given configuration. 
* @@ -181,7 +187,7 @@ JobTracker result = null; while (true) { try { - result = new JobTracker(conf); + result = new JobTracker(conf, new Clock()); result.taskScheduler.setTaskTrackerManager(result); break; } catch (VersionMismatch e) { @@ -242,7 +248,7 @@ try { // Every 3 minutes check for any tasks that are overdue Thread.sleep(tasktrackerExpiryInterval/3); - long now = System.currentTimeMillis(); + long now = clock.getTime(); LOG.debug("Starting launching task sweep"); synchronized (JobTracker.this) { synchronized (launchingTasks) { @@ -295,7 +301,7 @@ public void addNewTask(TaskAttemptID taskName) { synchronized (launchingTasks) { launchingTasks.put(taskName, - System.currentTimeMillis()); + clock.getTime()); } } @@ -339,7 +345,7 @@ synchronized (JobTracker.this) { synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { - long now = System.currentTimeMillis(); + long now = clock.getTime(); TaskTrackerStatus leastRecent = null; while ((trackerExpiryQueue.size() > 0) && ((leastRecent = trackerExpiryQueue.first()) != null) && @@ -405,7 +411,7 @@ try { Thread.sleep(retireJobCheckInterval); List retiredJobs = new ArrayList(); - long now = System.currentTimeMillis(); + long now = clock.getTime(); long retireBefore = now - retireJobInterval; synchronized (jobs) { @@ -465,9 +471,9 @@ long lastUpdated; boolean blacklisted; - FaultInfo() { + FaultInfo(long time) { numFaults = 0; - lastUpdated = System.currentTimeMillis(); + lastUpdated = time; blacklisted = false; } @@ -517,14 +523,15 @@ void incrementFaults(String hostName) { synchronized (potentiallyFaultyTrackers) { FaultInfo fi = potentiallyFaultyTrackers.get(hostName); + long now = clock.getTime(); if (fi == null) { - fi = new FaultInfo(); + fi = new FaultInfo(now); potentiallyFaultyTrackers.put(hostName, fi); } int numFaults = fi.getFaultCount(); ++numFaults; fi.setFaultCount(numFaults); - fi.setLastUpdated(System.currentTimeMillis()); + fi.setLastUpdated(now); if (!fi.isBlacklisted()) { if 
(shouldBlacklist(hostName, numFaults)) { LOG.info("Adding " + hostName + " to the blacklist" + @@ -746,7 +753,7 @@ TaskID id = TaskID.forName(taskId); TaskInProgress tip = getTip(id); - + updateTip(tip, task); } @@ -759,7 +766,7 @@ // Check if the transaction for this attempt can be committed String taskStatus = attempt.get(Keys.TASK_STATUS); - + if (taskStatus.length() > 0) { // This means this is an update event if (taskStatus.equals(Values.SUCCESS.name())) { @@ -1028,7 +1035,7 @@ TaskTrackerStatus ttStatus = new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 0 , 0, 0); - ttStatus.setLastSeen(System.currentTimeMillis()); + ttStatus.setLastSeen(clock.getTime()); synchronized (JobTracker.this) { synchronized (taskTrackers) { @@ -1244,7 +1251,7 @@ } public void recover() { - long recoveryProcessStartTime = System.currentTimeMillis(); + long recoveryProcessStartTime = clock.getTime(); if (!shouldRecover()) { // clean up jobs structure jobsToRecover.clear(); @@ -1275,7 +1282,7 @@ // check the access try { - checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi); + checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi); } catch (Throwable t) { LOG.warn("Access denied for user " + ugi.getUserName() + " in groups : [" @@ -1312,13 +1319,12 @@ continue; } } - + long now = clock.getTime(); LOG.info("Took a total of " - + StringUtils.formatTime(System.currentTimeMillis() + + StringUtils.formatTime(now - recoveryProcessStartTime) + " for recovering filenames of all the jobs from history."); - long recoveryStartTime = System.currentTimeMillis(); // II. 
Recover each job idIter = jobsToRecover.iterator(); @@ -1375,10 +1381,10 @@ } } - long recoveryProcessEndTime = System.currentTimeMillis(); + long recoveryProcessEndTime = clock.getTime(); LOG.info("Took a total of " + StringUtils.formatTime(recoveryProcessEndTime - - recoveryStartTime) + - now) + " for parsing and recovering all the jobs from history."); recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime; @@ -1565,11 +1571,15 @@ private QueueManager queueManager; + JobTracker(JobConf conf) throws IOException,InterruptedException{ + this(conf, new Clock()); + } /** * Start the JobTracker process, listen on the indicated port */ - JobTracker(JobConf conf) throws IOException, InterruptedException { + JobTracker(JobConf conf, Clock clock) throws IOException, InterruptedException { // find the owner of the process + this.clock = clock; try { mrOwner = UnixUserGroupInformation.login(conf); } catch (LoginException e) { @@ -1649,7 +1659,7 @@ conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030")); String infoBindAddress = infoSocAddr.getHostName(); int tmpInfoPort = infoSocAddr.getPort(); - this.startTime = System.currentTimeMillis(); + this.startTime = clock.getTime(); infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, tmpInfoPort == 0, conf); infoServer.setAttribute("job.tracker", this); @@ -2152,7 +2162,7 @@ final JobTrackerInstrumentation metrics = getInstrumentation(); metrics.finalizeJob(conf, id); - long now = System.currentTimeMillis(); + long now = clock.getTime(); // mark the job for cleanup at all the trackers addJobForCleanup(id); @@ -2568,7 +2578,7 @@ // First check if the last heartbeat response got through String trackerName = status.getTrackerName(); - long now = System.currentTimeMillis(); + long now = clock.getTime(); boolean isBlacklisted = false; if (restarted) { faultyTrackers.markTrackerHealthy(status.getHost()); @@ -3071,10 +3081,15 @@ new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId)); throw new 
IOException("Queue \"" + queue + "\" does not exist"); } - + + //check if queue is RUNNING + if(!queueManager.isRunning(queue)) { + new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId)); + throw new IOException("Queue \"" + queue + "\" is not running"); + } // check for access try { - checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB); + checkAccess(job, Queue.QueueOperation.SUBMIT_JOB); } catch (IOException ioe) { LOG.warn("Access denied for user " + job.getJobConf().getUser() + ". Ignoring job " + jobId, ioe); @@ -3122,7 +3137,7 @@ // Check whether the specified operation can be performed // related to the job. private void checkAccess(JobInProgress job, - QueueManager.QueueOperation oper) + Queue.QueueOperation oper) throws IOException { // get the user group info UserGroupInformation ugi = UserGroupInformation.getCurrentUGI(); @@ -3130,7 +3145,7 @@ } // use the passed ugi for checking the access - private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper, + private void checkAccess(JobInProgress job, Queue.QueueOperation oper, UserGroupInformation ugi) throws IOException { // get the queue String queue = job.getProfile().getQueueName(); @@ -3192,7 +3207,7 @@ } JobStatus prevStatus = (JobStatus)job.getStatus().clone(); - checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS); + checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS); job.kill(); // Inform the listeners if the job is killed @@ -3225,7 +3240,7 @@ + " is not a valid job"); return; } - checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS); + checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS); JobPriority newPriority = JobPriority.valueOf(priority); setJobPriority(jobid, newPriority); } @@ -3450,7 +3465,7 @@ public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{ TaskInProgress tip = taskidToTIPMap.get(taskid); if(tip != null) { - checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS); + 
checkAccess(tip.getJob(), Queue.QueueOperation.ADMINISTER_JOBS); return tip.killTask(taskid, shouldFail); } else { @@ -3847,10 +3862,10 @@ } @Override - public void refreshQueueAcls() throws IOException{ - LOG.info("Refreshing queue acls. requested by : " + + public void refreshQueues() throws IOException{ + LOG.info("Refreshing queue information. requested by : " + UserGroupInformation.getCurrentUGI().getUserName()); - this.queueManager.refreshAcls(new Configuration(this.conf)); + this.queueManager.refreshQueues(new Configuration(this.conf)); } private void initializeTaskMemoryRelatedConfig() { Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/QueueManager.java Wed Jun 17 20:55:51 2009 @@ -21,12 +21,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import java.util.Set; -import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.Queue.QueueState; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.SecurityUtil.AccessControlList; import org.apache.hadoop.util.StringUtils; @@ -49,51 +51,24 @@ class QueueManager { private static final Log LOG = LogFactory.getLog(QueueManager.class); - - // Prefix in configuration for queue related keys - private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX - = "mapred.queue."; - // Configured queues + 
+ // Configured queues this is backed by queues Map , mentioned below private Set queueNames; - // Map of a queue and ACL property name with an ACL - private HashMap aclsMap; - // Map of a queue name to any generic object that represents - // scheduler information - private HashMap schedulerInfoObjects; + + // Map of a queue name and Queue object + private HashMap queues; + // Whether ACLs are enabled in the system or not. private boolean aclsEnabled; - - //Resource in which queue acls are configured. - static final String QUEUE_ACLS_FILE_NAME = "mapred-queue-acls.xml"; - - /** - * Enum representing an operation that can be performed on a queue. - */ - static enum QueueOperation { - SUBMIT_JOB ("acl-submit-job", false), - ADMINISTER_JOBS ("acl-administer-jobs", true); - // TODO: Add ACL for LIST_JOBS when we have ability to authenticate - // users in UI - // TODO: Add ACL for CHANGE_ACL when we have an admin tool for - // configuring queues. - - private final String aclName; - private final boolean jobOwnerAllowed; - - QueueOperation(String aclName, boolean jobOwnerAllowed) { - this.aclName = aclName; - this.jobOwnerAllowed = jobOwnerAllowed; - } - final String getAclName() { - return aclName; - } - - final boolean isJobOwnerAllowed() { - return jobOwnerAllowed; - } - } - + static final String QUEUE_STATE_SUFFIX = "state"; + + static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml"; + + // Prefix in configuration for queue related keys + static final String QUEUE_CONF_PROPERTY_NAME_PREFIX + = "mapred.queue.";//Resource in which queue acls are configured. + /** * Construct a new QueueManager using configuration specified in the passed * in {@link org.apache.hadoop.conf.Configuration} object. @@ -101,10 +76,28 @@ * @param conf Configuration object where queue configuration is specified. 
*/ public QueueManager(Configuration conf) { - queueNames = new TreeSet(); - aclsMap = new HashMap(); - schedulerInfoObjects = new HashMap(); - initialize(conf); + checkDeprecation(conf); + conf.addResource(QUEUE_CONF_FILE_NAME); + + queues = new HashMap(); + + // First get the queue names + String[] queueNameValues = conf.getStrings("mapred.queue.names", + new String[]{JobConf.DEFAULT_QUEUE_NAME}); + + // Get configured ACLs and state for each queue + aclsEnabled = conf.getBoolean("mapred.acls.enabled", false); + for (String name : queueNameValues) { + try { + Map acls = getQueueAcls(name, conf); + QueueState state = getQueueState(name, conf); + queues.put(name, new Queue(name, acls, state)); + } catch (Throwable t) { + LOG.warn("Not able to initialize queue " + name); + } + } + // Sync queue names with the configured queues. + queueNames = queues.keySet(); } /** @@ -121,7 +114,7 @@ } /** - * Return true if the given {@link QueueManager.QueueOperation} can be + * Return true if the given {@link Queue.QueueOperation} can be * performed by the specified user on the given queue. * * An operation is allowed if all users are provided access for this @@ -134,13 +127,14 @@ * * @return true if the operation is allowed, false otherwise. */ - public synchronized boolean hasAccess(String queueName, QueueOperation oper, + public synchronized boolean hasAccess(String queueName, + Queue.QueueOperation oper, UserGroupInformation ugi) { return hasAccess(queueName, null, oper, ugi); } /** - * Return true if the given {@link QueueManager.QueueOperation} can be + * Return true if the given {@link Queue.QueueOperation} can be * performed by the specified user on the specified job in the given queue. * * An operation is allowed either if the owner of the job is the user @@ -148,7 +142,7 @@ * operation, or if either the user or any of the groups specified is * provided access. 
* - * If the {@link QueueManager.QueueOperation} is not job specific then the + * If the {@link Queue.QueueOperation} is not job specific then the * job parameter is ignored. * * @param queueName Queue on which the operation needs to be performed. @@ -160,28 +154,37 @@ * @return true if the operation is allowed, false otherwise. */ public synchronized boolean hasAccess(String queueName, JobInProgress job, - QueueOperation oper, + Queue.QueueOperation oper, UserGroupInformation ugi) { if (!aclsEnabled) { return true; } - if (LOG.isDebugEnabled()) { - LOG.debug("checking access for : " + toFullPropertyName(queueName, - oper.getAclName())); + Queue q = queues.get(queueName); + if (q == null) { + LOG.info("Queue " + queueName + " is not present"); + return false; } + if (LOG.isDebugEnabled()) { + LOG.debug("checking access for : " + + QueueManager.toFullPropertyName(queueName, oper.getAclName())); + } + if (oper.isJobOwnerAllowed()) { - if (job != null && job.getJobConf().getUser().equals(ugi.getUserName())) { + if (job != null + && job.getJobConf().getUser().equals(ugi.getUserName())) { return true; } } - AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName())); + AccessControlList acl = q.getAcls().get( + toFullPropertyName(queueName, + oper.getAclName())); if (acl == null) { return false; } - + // Check the ACL list boolean allowed = acl.allAllowed(); if (!allowed) { @@ -199,8 +202,21 @@ } } } - - return allowed; + + return allowed; + } + + /** + * Checks whether the given queue is running or not. + * @param queueName name of the queue + * @return true, if the queue is running. 
+ */ + synchronized boolean isRunning(String queueName) { + Queue q = queues.get(queueName); + if (q != null) { + return q.getState().equals(QueueState.RUNNING); + } + return false; } /** @@ -216,7 +232,8 @@ */ public synchronized void setSchedulerInfo(String queueName, Object queueInfo) { - schedulerInfoObjects.put(queueName, queueInfo); + if (queues.get(queueName) != null) + queues.get(queueName).setSchedulingInfo(queueInfo); } /** @@ -225,125 +242,179 @@ * @param queueName queue for which the scheduling information is required. * @return The scheduling information for this queue. * - * @see #setSchedulerInfo(String, Object) + * @see #setSchedulingInfo(String, Object) */ public synchronized Object getSchedulerInfo(String queueName) { - return schedulerInfoObjects.get(queueName); + if (queues.get(queueName) != null) + return queues.get(queueName).getSchedulingInfo(); + return null; } /** - * Refresh the acls for the configured queues in the system by reading - * it from mapred-queue-acls.xml. + * Refresh the acls and state for the configured queues in the system + * by reading it from mapred-queues.xml. * - * The previous acls are removed. Previously configured queues and - * if or not acl is disabled is retained. + * Previously configured queues and if or not acls are disabled is retained. * - * @throws IOException when queue ACL configuration file is invalid. + * @throws IOException when queue configuration file is invalid. */ - synchronized void refreshAcls(Configuration conf) throws IOException { + synchronized void refreshQueues(Configuration conf) throws IOException { + + // First check if things are configured in mapred-site.xml, + // so we can print out a deprecation warning. + // This check is needed only until we support the configuration + // in mapred-site.xml + checkDeprecation(conf); + + // Add the queue configuration file. Values from mapred-site.xml + // will be overridden. 
+ conf.addResource(QUEUE_CONF_FILE_NAME); + + // Now we refresh the properties of the queues. Note that we + // do *not* refresh the queue names or the acls flag. Instead + // we use the older values configured for them. + LOG.info("Refreshing acls and state for configured queues."); try { - HashMap newAclsMap = - getQueueAcls(conf); - aclsMap = newAclsMap; + Iterator itr = queueNames.iterator(); + while(itr.hasNext()) { + String name = itr.next(); + Queue q = queues.get(name); + Map newAcls = getQueueAcls(name, conf); + QueueState newState = getQueueState(name, conf); + q.setAcls(newAcls); + q.setState(newState); + } } catch (Throwable t) { String exceptionString = StringUtils.stringifyException(t); - LOG.warn("Queue ACLs could not be refreshed because there was an " + + LOG.warn("Queues could not be refreshed because there was an " + "exception in parsing the configuration: "+ exceptionString + - ". Existing ACLs are retained."); + ". Existing ACLs/state is retained."); throw new IOException(exceptionString); } - } + // Check if queue properties are configured in the passed in + // configuration. If yes, print out deprecation warning messages. private void checkDeprecation(Configuration conf) { - for(String queue: queueNames) { - for (QueueOperation oper : QueueOperation.values()) { - String key = toFullPropertyName(queue, oper.getAclName()); - String aclString = conf.get(key); - if(aclString != null) { - LOG.warn("Configuring queue ACLs in mapred-site.xml or " + - "hadoop-site.xml is deprecated. Configure queue ACLs in " + - QUEUE_ACLS_FILE_NAME); - return; + + // check if queues are defined. + String[] queues = null; + String queueNameValues = conf.get("mapred.queue.names"); + if (queueNameValues != null) { + LOG.warn("Configuring \"mapred.queue.names\" in mapred-site.xml or " + + "hadoop-site.xml is deprecated. 
Configure " + + "\"mapred.queue.names\" in " + + QUEUE_CONF_FILE_NAME); + // store queues so we can check if ACLs are also configured + // in the deprecated files. + queues = conf.getStrings("mapred.queue.names"); + } + + // check if the acls flag is defined + String aclsEnable = conf.get("mapred.acls.enabled"); + if (aclsEnable != null) { + LOG.warn("Configuring \"mapred.acls.enabled\" in mapred-site.xml or " + + "hadoop-site.xml is deprecated. Configure " + + "\"mapred.acls.enabled\" in " + + QUEUE_CONF_FILE_NAME); + } + + // check if acls are defined + if (queues != null) { + for (String queue : queues) { + for (Queue.QueueOperation oper : Queue.QueueOperation.values()) { + String key = toFullPropertyName(queue, oper.getAclName()); + String aclString = conf.get(key); + if (aclString != null) { + LOG.warn("Configuring queue ACLs in mapred-site.xml or " + + "hadoop-site.xml is deprecated. Configure queue ACLs in " + + QUEUE_CONF_FILE_NAME); + // even if one string is configured, it is enough for printing + // the warning. so we can return from here. + return; + } } } } } - private HashMap getQueueAcls(Configuration conf) { - checkDeprecation(conf); - conf.addResource(QUEUE_ACLS_FILE_NAME); - HashMap aclsMap = - new HashMap(); - for (String queue : queueNames) { - for (QueueOperation oper : QueueOperation.values()) { - String key = toFullPropertyName(queue, oper.getAclName()); - String aclString = conf.get(key, "*"); - aclsMap.put(key, new AccessControlList(aclString)); - } - } - return aclsMap; + // Parse ACLs for the queue from the configuration. 
+ private Map getQueueAcls(String name, + Configuration conf) { + HashMap map = + new HashMap(); + for (Queue.QueueOperation oper : Queue.QueueOperation.values()) { + String aclKey = toFullPropertyName(name, oper.getAclName()); + map.put(aclKey, new AccessControlList(conf.get(aclKey, "*"))); + } + return map; } - - private void initialize(Configuration conf) { - aclsEnabled = conf.getBoolean("mapred.acls.enabled", false); - String[] queues = conf.getStrings("mapred.queue.names", - new String[] {JobConf.DEFAULT_QUEUE_NAME}); - addToSet(queueNames, queues); - aclsMap = getQueueAcls(conf); + + // Parse the state of the queue from the configuration (defaults to RUNNING). + private QueueState getQueueState(String name, Configuration conf) { + QueueState retState = QueueState.RUNNING; + String stateVal = conf.get(toFullPropertyName(name, + QueueManager.QUEUE_STATE_SUFFIX), + QueueState.RUNNING.getStateName()); + for (QueueState state : QueueState.values()) { + if (state.getStateName().equalsIgnoreCase(stateVal)) { + retState = state; + break; + } + } + return retState; } - - private static final String toFullPropertyName(String queue, + + public static final String toFullPropertyName(String queue, String property) { return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." 
+ property; } - - private static final void addToSet(Set set, String[] elems) { - for (String elem : elems) { - set.add(elem); - } - } - + synchronized JobQueueInfo[] getJobQueueInfos() { ArrayList queueInfoList = new ArrayList(); - for(String queue : queueNames) { - Object schedulerInfo = schedulerInfoObjects.get(queue); - if(schedulerInfo != null) { - queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString())); - }else { - queueInfoList.add(new JobQueueInfo(queue,null)); + for (String queue : queueNames) { + JobQueueInfo queueInfo = getJobQueueInfo(queue); + if (queueInfo != null) { + queueInfoList.add(getJobQueueInfo(queue)); } } - return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList - .size()]); + return (JobQueueInfo[]) queueInfoList.toArray( + new JobQueueInfo[queueInfoList.size()]); } - JobQueueInfo getJobQueueInfo(String queue) { - Object schedulingInfo = schedulerInfoObjects.get(queue); - if(schedulingInfo!=null){ - return new JobQueueInfo(queue,schedulingInfo.toString()); - }else { - return new JobQueueInfo(queue,null); + synchronized JobQueueInfo getJobQueueInfo(String queue) { + if (queues.get(queue) != null) { + Object schedulingInfo = queues.get(queue).getSchedulingInfo(); + JobQueueInfo qInfo; + if (schedulingInfo != null) { + qInfo = new JobQueueInfo(queue, schedulingInfo.toString()); + } else { + qInfo = new JobQueueInfo(queue, null); + } + qInfo.setQueueState(queues.get(queue).getState().getStateName()); + return qInfo; } + return null; } /** - * Generates the array of QueueAclsInfo object. The array consists of only those queues - * for which user has acls + * Generates the array of QueueAclsInfo object. + * + * The array consists of only those queues for which user has acls. 
* * @return QueueAclsInfo[] * @throws java.io.IOException */ - synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation - ugi) throws IOException { + synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi) + throws IOException { //List of all QueueAclsInfo objects , this list is returned ArrayList queueAclsInfolist = new ArrayList(); - QueueOperation[] operations = QueueOperation.values(); + Queue.QueueOperation[] operations = Queue.QueueOperation.values(); for (String queueName : queueNames) { QueueAclsInfo queueAclsInfo = null; ArrayList operationsAllowed = null; - for (QueueOperation operation : operations) { + for (Queue.QueueOperation operation : operations) { if (hasAccess(queueName, operation, ugi)) { if (operationsAllowed == null) { operationsAllowed = new ArrayList(); @@ -363,4 +434,10 @@ queueAclsInfolist.size()]); } + // ONLY FOR TESTING - Do not use in production code. + synchronized void setQueues(Queue[] queues) { + for (Queue queue : queues) { + this.queues.put(queue.getName(), queue); + } + } } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=785794&r1=785793&r2=785794&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Jun 17 20:55:51 2009 @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobClient.RawSplit; +import org.apache.hadoop.mapred.JobInProgress.DataStatistics; import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.net.Node; @@ -52,9 +53,8 @@ 
* ************************************************************** */ class TaskInProgress { - static final int MAX_TASK_EXECS = 1; + static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently int maxTaskAttempts = 4; - static final double SPECULATIVE_GAP = 0.2; static final long SPECULATIVE_LAG = 60 * 1000; private static final int NUM_ATTEMPTS_PER_RESTART = 1000; @@ -74,9 +74,10 @@ private int numTaskFailures = 0; private int numKilledTasks = 0; private double progress = 0; + private double oldProgressRate; private String state = ""; - private long startTime = 0; - private long execStartTime = 0; + private long dispatchTime = 0; // most recent time task attempt given to TT + private long execStartTime = 0; // when we started first task-attempt private long execFinishTime = 0; private int completes = 0; private boolean failed = false; @@ -220,7 +221,6 @@ * Initialization common to Map and Reduce */ void init(JobID jobId) { - this.startTime = System.currentTimeMillis(); this.id = new TaskID(jobId, isMapTask() ? TaskType.MAP : TaskType.REDUCE, partition); this.skipping = startSkipping(); @@ -229,12 +229,19 @@ //////////////////////////////////// // Accessors, info, profiles, etc. 
//////////////////////////////////// - + /** - * Return the start time + * Return the dispatch time */ - public long getStartTime() { - return startTime; + public long getDispatchTime(){ + return this.dispatchTime; + } + + /** + * Set the dispatch time + */ + public void setDispatchTime(long disTime){ + this.dispatchTime = disTime; } /** @@ -399,9 +406,15 @@ !tasksReportedClosed.contains(taskid)) { tasksReportedClosed.add(taskid); close = true; + if (isComplete() && !isComplete(taskid)) { + addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" + + " already SUCCEEDED"); + } } else if (isCommitPending(taskid) && !shouldCommit(taskid) && !tasksReportedClosed.contains(taskid)) { tasksReportedClosed.add(taskid); + addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" + + " went to COMMIT_PENDING state earlier"); close = true; } else { close = tasksToKill.keySet().contains(taskid); @@ -562,6 +575,17 @@ // but finishTime has to be updated. if (!isCleanupAttempt(taskid)) { taskStatuses.put(taskid, status); + if ((isMapTask() && job.hasSpeculativeMaps()) || + (!isMapTask() && job.hasSpeculativeReduces())) { + long now = jobtracker.getClock().getTime(); + double oldProgRate = getOldProgressRate(); + double currProgRate = getCurrentProgressRate(now); + job.updateStatistics(oldProgRate, currProgRate, isMapTask()); + //we need to store the current progress rate, so that we can + //update statistics accurately the next time we invoke + //updateStatistics + setProgressRate(currProgRate); + } } else { taskStatuses.get(taskid).statusUpdate(status.getRunState(), status.getProgress(), status.getStateString(), status.getPhase(), @@ -619,7 +643,7 @@ // tasktracker went down and failed time was not reported. 
if (0 == status.getFinishTime()){ - status.setFinishTime(System.currentTimeMillis()); + status.setFinishTime(jobtracker.getClock().getTime()); } } @@ -723,7 +747,7 @@ // this.completes++; - this.execFinishTime = System.currentTimeMillis(); + this.execFinishTime = jobtracker.getClock().getTime(); recomputeProgress(); } @@ -762,7 +786,7 @@ } this.failed = true; killed = true; - this.execFinishTime = System.currentTimeMillis(); + this.execFinishTime = jobtracker.getClock().getTime(); recomputeProgress(); } @@ -860,35 +884,39 @@ } /** - * Return whether the TIP has a speculative task to run. We - * only launch a speculative task if the current TIP is really - * far behind, and has been behind for a non-trivial amount of - * time. + * Can this task be speculated? This requires that it isn't done or almost + * done and that it isn't already being speculatively executed. + * + * Added for use by queue scheduling algorithms. + * @param currentTime */ - boolean hasSpeculativeTask(long currentTime, double averageProgress) { - // - // REMIND - mjc - these constants should be examined - // in more depth eventually... 
- // - - if (!skipping && activeTasks.size() <= MAX_TASK_EXECS && - (averageProgress - progress >= SPECULATIVE_GAP) && - (currentTime - startTime >= SPECULATIVE_LAG) - && completes == 0 && !isOnlyCommitPending()) { - return true; - } - return false; + boolean canBeSpeculated(long currentTime) { + DataStatistics taskStats = job.getRunningTaskStatistics(isMapTask()); + if (LOG.isDebugEnabled()) { + LOG.debug("activeTasks.size(): " + activeTasks.size() + " " + + activeTasks.firstKey() + " task's progressrate: " + + getCurrentProgressRate(currentTime) + + " taskStats : " + taskStats); + } + return (!skipping && isRunnable() && isRunning() && + activeTasks.size() <= MAX_TASK_EXECS && + currentTime - dispatchTime >= SPECULATIVE_LAG && + completes == 0 && !isOnlyCommitPending() && + (taskStats.mean() - getCurrentProgressRate(currentTime) > + taskStats.std() * job.getSlowTaskThreshold())); } - + + /** + * Is the task currently speculating? + */ + boolean isSpeculating() { + return (activeTasks.size() > MAX_TASK_EXECS); + } + /** * Return a Task that can be sent to a TaskTracker for execution. */ - public Task getTaskToRun(String taskTracker) throws IOException { - if (0 == execStartTime){ - // assume task starts running now - execStartTime = System.currentTimeMillis(); - } - + public Task getTaskToRun(String taskTracker) throws IOException { // Create the 'taskid'; do not count the 'killed' tasks against the job! TaskAttemptID taskid = null; if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) { @@ -903,6 +931,16 @@ return null; } + //keep track of the last time we started an attempt at this TIP + //used to calculate the progress rate of this TIP + setDispatchTime(jobtracker.getClock().getTime()); + + //set this the first time we run a taskAttempt in this TIP + //each Task attempt has its own TaskStatus, which tracks that + //attempts execStartTime, thus this startTime is TIP wide. 
+ if (0 == execStartTime){ + setExecStartTime(dispatchTime); + } return addRunningTask(taskid, taskTracker); } @@ -1084,6 +1122,34 @@ } /** + * Compare most recent task attempts dispatch time to current system time so + * that task progress rate will slow down as time proceeds even if no progress + * is reported for the task. This allows speculative tasks to be launched for + * tasks on slow/dead TT's before we realize the TT is dead/slow. Skew isn't + * an issue since both times are from the JobTrackers perspective. + * @return the progress rate from the active task that is doing best + */ + public double getCurrentProgressRate(long currentTime) { + double bestProgressRate = 0; + for (TaskStatus ts : taskStatuses.values()){ + double progressRate = ts.getProgress()/Math.max(1, + currentTime - dispatchTime); + if ((ts.getRunState() == TaskStatus.State.RUNNING || + ts.getRunState() == TaskStatus.State.SUCCEEDED) && + progressRate > bestProgressRate){ + bestProgressRate = progressRate; + } + } + return bestProgressRate; + } + + private void setProgressRate(double rate) { + oldProgressRate = rate; + } + private double getOldProgressRate() { + return oldProgressRate; + } + /** * This class keeps the records to be skipped during further executions * based on failed records from all the previous attempts. * It also narrow down the skip records if it is more than the