Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 22758 invoked from network); 24 Jun 2009 14:25:46 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 24 Jun 2009 14:25:46 -0000 Received: (qmail 88291 invoked by uid 500); 24 Jun 2009 14:25:57 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 88268 invoked by uid 500); 24 Jun 2009 14:25:57 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Delivered-To: moderator for mapreduce-commits@hadoop.apache.org Received: (qmail 84025 invoked by uid 99); 24 Jun 2009 14:22:52 -0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r788036 [2/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapr... 
Date: Wed, 24 Jun 2009 14:22:15 -0000 To: mapreduce-commits@hadoop.apache.org From: yhemanth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090624142217.2E3F323888D6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jun 24 14:22:13 2009 @@ -51,6 +51,7 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; /************************************************************* * JobInProgress maintains all the info for keeping @@ -58,8 +59,10 @@ * and its latest JobStatus, plus a set of tables for * doing bookkeeping of its Tasks. * *********************************************************** + * + * This is NOT a public interface! 
*/ -class JobInProgress { +public class JobInProgress { static final Log LOG = LogFactory.getLog(JobInProgress.class); JobProfile profile; @@ -74,6 +77,8 @@ TaskInProgress setup[] = new TaskInProgress[0]; int numMapTasks = 0; int numReduceTasks = 0; + int numSlotsPerMap = 1; + int numSlotsPerReduce = 1; // Counters to track currently running/finished/failed Map/Reduce task-attempts int runningMapTasks = 0; @@ -234,6 +239,37 @@ //runningReduceStats used to maintain the RUNNING reduce tasks' statistics private DataStatistics runningReduceTaskStats = new DataStatistics(); + private static class FallowSlotInfo { + long timestamp; + int numSlots; + + public FallowSlotInfo(long timestamp, int numSlots) { + this.timestamp = timestamp; + this.numSlots = numSlots; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public int getNumSlots() { + return numSlots; + } + + public void setNumSlots(int numSlots) { + this.numSlots = numSlots; + } + } + + private Map trackersReservedForMaps = + new HashMap(); + private Map trackersReservedForReduces = + new HashMap(); + /** * Create an almost empty JobInProgress, which can be used only for tests */ @@ -445,6 +481,41 @@ } /** + * Get the number of slots required to run a single map task-attempt. + * @return the number of slots required to run a single map task-attempt + */ + synchronized int getNumSlotsPerMap() { + return numSlotsPerMap; + } + + /** + * Set the number of slots required to run a single map task-attempt. + * This is typically set by schedulers which support high-ram jobs. + * @param slots the number of slots required to run a single map task-attempt + */ + synchronized void setNumSlotsPerMap(int numSlotsPerMap) { + this.numSlotsPerMap = numSlotsPerMap; + } + + /** + * Get the number of slots required to run a single reduce task-attempt. 
+ * @return the number of slots required to run a single reduce task-attempt + */ + synchronized int getNumSlotsPerReduce() { + return numSlotsPerReduce; + } + + /** + * Set the number of slots required to run a single reduce task-attempt. + * This is typically set by schedulers which support high-ram jobs. + * @param slots the number of slots required to run a single reduce + * task-attempt + */ + synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) { + this.numSlotsPerReduce = numSlotsPerReduce; + } + + /** * Construct the splits, etc. This is invoked from an async * thread so that split-computation doesn't block anyone. */ @@ -501,7 +572,7 @@ inputLength += splits[i].getDataLength(); maps[i] = new TaskInProgress(jobId, jobFile, splits[i], - jobtracker, conf, this, i); + jobtracker, conf, this, i, numSlotsPerMap); } LOG.info("Input size for job " + jobId + " = " + inputLength + ". Number of splits = " + splits.length); @@ -519,7 +590,7 @@ for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, - jobtracker, conf, this); + jobtracker, conf, this, numSlotsPerReduce); nonRunningReduces.add(reduces[i]); } @@ -538,12 +609,12 @@ // split. JobClient.RawSplit emptySplit = new JobClient.RawSplit(); cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, - jobtracker, conf, this, numMapTasks); + jobtracker, conf, this, numMapTasks, 1); cleanup[0].setJobCleanupTask(); // cleanup reduce tip. cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, - numReduceTasks, jobtracker, conf, this); + numReduceTasks, jobtracker, conf, this, 1); cleanup[1].setJobCleanupTask(); // create two setup tips, one map and one reduce. @@ -552,12 +623,12 @@ // setup map tip. This map doesn't use any split. Just assign an empty // split. 
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, - jobtracker, conf, this, numMapTasks + 1 ); + jobtracker, conf, this, numMapTasks + 1, 1); setup[0].setJobSetupTask(); // setup reduce tip. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, - numReduceTasks + 1, jobtracker, conf, this); + numReduceTasks + 1, jobtracker, conf, this, 1); setup[1].setJobSetupTask(); synchronized(jobInitKillStatus){ @@ -618,6 +689,15 @@ return numReduceTasks - runningReduceTasks - failedReduceTIPs - finishedReduceTasks + speculativeReduceTasks; } + public synchronized int getNumSlotsPerTask(TaskType taskType) { + if (taskType == TaskType.MAP) { + return numSlotsPerMap; + } else if (taskType == TaskType.REDUCE) { + return numSlotsPerReduce; + } else { + return 1; + } + } public JobPriority getPriority() { return this.priority; } @@ -824,8 +904,10 @@ if (change) { TaskStatus.State state = status.getRunState(); // get the TaskTrackerStatus where the task ran - TaskTrackerStatus ttStatus = + TaskTracker taskTracker = this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid)); + TaskTrackerStatus ttStatus = + (taskTracker == null) ? null : taskTracker.getStatus(); String httpTaskLogLocation = null; if (null != ttStatus){ @@ -887,7 +969,7 @@ } // Tell the job to fail the relevant task - failedTask(tip, taskid, status, ttStatus, + failedTask(tip, taskid, status, taskTracker, wasRunning, wasComplete); // Did the task failure lead to tip failure? @@ -1424,9 +1506,9 @@ * to the blacklist iff too many trackers in the cluster i.e. * (clusterSize * CLUSTER_BLACKLIST_PERCENT) haven't turned 'flaky' already. 
* - * @param trackerName task-tracker on which a task failed + * @param taskTracker task-tracker on which a task failed */ - void addTrackerTaskFailure(String trackerName) { + void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) { if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { String trackerHostName = convertTrackerNameToHostName(trackerName); @@ -1439,11 +1521,78 @@ // Check if this tasktracker has turned 'flaky' if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) { ++flakyTaskTrackers; + + // Cancel reservations if appropriate + if (taskTracker != null) { + taskTracker.unreserveSlots(TaskType.MAP, this); + taskTracker.unreserveSlots(TaskType.REDUCE, this); + } LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'"); } } } + + public synchronized void reserveTaskTracker(TaskTracker taskTracker, + TaskType type, int numSlots) { + Map map = + (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces; + + long now = System.currentTimeMillis(); + + FallowSlotInfo info = map.get(taskTracker); + if (info == null) { + info = new FallowSlotInfo(now, numSlots); + } else { + // Increment metering info if the reservation is changing + if (info.getNumSlots() != numSlots) { + Enum counter = + (type == TaskType.MAP) ? + JobCounter.FALLOW_SLOTS_MILLIS_MAPS : + JobCounter.FALLOW_SLOTS_MILLIS_REDUCES; + long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots(); + jobCounters.incrCounter(counter, fallowSlotMillis); + + // Update + info.setTimestamp(now); + info.setNumSlots(numSlots); + } + } + map.put(taskTracker, info); + } + + public synchronized void unreserveTaskTracker(TaskTracker taskTracker, + TaskType type) { + Map map = + (type == TaskType.MAP) ? 
trackersReservedForMaps : + trackersReservedForReduces; + + FallowSlotInfo info = map.get(taskTracker); + if (info == null) { + LOG.warn("Cannot find information about fallow slots for " + + taskTracker.getTrackerName()); + return; + } + long now = System.currentTimeMillis(); + + Enum counter = + (type == TaskType.MAP) ? + JobCounter.FALLOW_SLOTS_MILLIS_MAPS : + JobCounter.FALLOW_SLOTS_MILLIS_REDUCES; + long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots(); + jobCounters.incrCounter(counter, fallowSlotMillis); + + map.remove(taskTracker); + } + + public int getNumReservedTaskTrackersForMaps() { + return trackersReservedForMaps.size(); + } + + public int getNumReservedTaskTrackersForReduces() { + return trackersReservedForReduces.size(); + } + private int getTrackerTaskFailures(String trackerName) { String trackerHostName = convertTrackerNameToHostName(trackerName); Integer failedTasks = trackerToFailuresMap.get(trackerHostName); @@ -2263,7 +2412,7 @@ // Update jobhistory TaskTrackerStatus ttStatus = - this.jobtracker.getTaskTracker(status.getTaskTracker()); + this.jobtracker.getTaskTrackerStatus(status.getTaskTracker()); String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString(); String taskType = getTaskType(tip); if (status.getIsMap()){ @@ -2421,6 +2570,7 @@ this.status.setReduceProgress(1.0f); } this.finishTime = jobtracker.getClock().getTime(); + cancelReservedSlots(); LOG.info("Job " + this.status.getJobID() + " has completed successfully."); JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, @@ -2504,9 +2654,20 @@ for (int i = 0; i < reduces.length; i++) { reduces[i].kill(); } + + // Clear out reserved tasktrackers + cancelReservedSlots(); } } + private void cancelReservedSlots() { + for (TaskTracker tt : trackersReservedForMaps.keySet()) { + tt.unreserveSlots(TaskType.MAP, this); + } + for (TaskTracker tt : trackersReservedForReduces.keySet()) { + tt.unreserveSlots(TaskType.REDUCE, this); + } + } private 
void clearUncleanTasks() { TaskAttemptID taskid = null; TaskInProgress tip = null; @@ -2580,7 +2741,7 @@ */ private void failedTask(TaskInProgress tip, TaskAttemptID taskid, TaskStatus status, - TaskTrackerStatus taskTrackerStatus, + TaskTracker taskTracker, boolean wasRunning, boolean wasComplete) { final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation(); // check if the TIP is already failed @@ -2642,6 +2803,8 @@ String taskTrackerName = taskStatus.getTaskTracker(); String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName); int taskTrackerPort = -1; + TaskTrackerStatus taskTrackerStatus = + (taskTracker == null) ? null : taskTracker.getStatus(); if (taskTrackerStatus != null) { taskTrackerPort = taskTrackerStatus.getHttpPort(); } @@ -2687,7 +2850,7 @@ // Note down that a task has failed on this tasktracker // if (status.getRunState() == TaskStatus.State.FAILED) { - addTrackerTaskFailure(taskTrackerName); + addTrackerTaskFailure(taskTrackerName, taskTracker); } // @@ -2778,6 +2941,9 @@ TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskid, 0.0f, + tip.isMapTask() ? 
+ numSlotsPerMap : + numSlotsPerReduce, state, reason, reason, Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Wed Jun 24 14:22:13 2009 @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; /** * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO @@ -77,9 +78,9 @@ } @Override - public synchronized List assignTasks(TaskTrackerStatus taskTracker) + public synchronized List assignTasks(TaskTracker taskTracker) throws IOException { - + TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); final int numTaskTrackers = clusterStatus.getTaskTrackers(); final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); @@ -91,10 +92,10 @@ // // Get map + reduce counts for the current tracker. 
// - final int trackerMapCapacity = taskTracker.getMaxMapTasks(); - final int trackerReduceCapacity = taskTracker.getMaxReduceTasks(); - final int trackerRunningMaps = taskTracker.countMapTasks(); - final int trackerRunningReduces = taskTracker.countReduceTasks(); + final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots(); + final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots(); + final int trackerRunningMaps = taskTrackerStatus.countMapTasks(); + final int trackerRunningReduces = taskTrackerStatus.countReduceTasks(); // Assigned tasks List assignedTasks = new ArrayList(); @@ -167,7 +168,7 @@ // Try to schedule a node-local or rack-local Map task t = - job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, + job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); @@ -186,7 +187,7 @@ // Try to schedule a node-local or rack-local Map task t = - job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, + job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { @@ -224,7 +225,7 @@ } Task t = - job.obtainNewReduceTask(taskTracker, numTaskTrackers, + job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts() ); if (t != null) { @@ -243,7 +244,7 @@ } if (LOG.isDebugEnabled()) { - LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " + + LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " + "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + (trackerCurrentMapCapacity - trackerRunningMaps) + ", " + Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jun 24 14:22:13 2009 @@ -85,6 +85,9 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; + /******************************************************* * JobTracker is the central location for submitting and * tracking MR jobs in a network environment. @@ -267,7 +270,8 @@ JobInProgress job = tip.getJob(); String trackerName = getAssignedTracker(taskId); TaskTrackerStatus trackerStatus = - getTaskTracker(trackerName); + getTaskTrackerStatus(trackerName); + // This might happen when the tasktracker has already // expired and this thread tries to call failedtask // again. expire tasktracker should have called failed @@ -348,22 +352,24 @@ long now = clock.getTime(); TaskTrackerStatus leastRecent = null; while ((trackerExpiryQueue.size() > 0) && - ((leastRecent = trackerExpiryQueue.first()) != null) && - (now - leastRecent.getLastSeen() > tasktrackerExpiryInterval)) { + (leastRecent = trackerExpiryQueue.first()) != null && + ((now - leastRecent.getLastSeen()) > tasktrackerExpiryInterval)) { // Remove profile from head of queue trackerExpiryQueue.remove(leastRecent); String trackerName = leastRecent.getTrackerName(); // Figure out if last-seen time should be updated, or if tracker is dead - TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName()); + TaskTracker current = getTaskTracker(trackerName); + TaskTrackerStatus newProfile = + (current == null ) ? 
null : current.getStatus(); // Items might leave the taskTracker set through other means; the // status stored in 'taskTrackers' might be null, which means the // tracker has already been destroyed. if (newProfile != null) { - if (now - newProfile.getLastSeen() > tasktrackerExpiryInterval) { + if ((now - newProfile.getLastSeen()) > tasktrackerExpiryInterval) { // Remove completely after marking the tasks as 'KILLED' - lostTaskTracker(leastRecent.getTrackerName()); + lostTaskTracker(current); // tracker is lost, and if it is blacklisted, remove // it from the count of blacklisted trackers in the cluster if (isBlacklisted(trackerName)) { @@ -373,7 +379,7 @@ // remove the mapping from the hosts list String hostname = newProfile.getHost(); - hostnameToTrackerName.get(hostname).remove(trackerName); + hostnameToTaskTracker.get(hostname).remove(trackerName); } else { // Update time by inserting latest profile trackerExpiryQueue.add(newProfile); @@ -625,9 +631,9 @@ synchronized (taskTrackers) { // remove the capacity of trackers on this host for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { - int mapSlots = status.getMaxMapTasks(); + int mapSlots = status.getMaxMapSlots(); totalMapTaskCapacity -= mapSlots; - int reduceSlots = status.getMaxReduceTasks(); + int reduceSlots = status.getMaxReduceSlots(); totalReduceTaskCapacity -= reduceSlots; getInstrumentation().addBlackListedMapSlots( mapSlots); @@ -644,9 +650,9 @@ int numTrackersOnHost = 0; // add the capacity of trackers on the host for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { - int mapSlots = status.getMaxMapTasks(); + int mapSlots = status.getMaxMapSlots(); totalMapTaskCapacity += mapSlots; - int reduceSlots = status.getMaxReduceTasks(); + int reduceSlots = status.getMaxReduceSlots(); totalReduceTaskCapacity += reduceSlots; numTrackersOnHost++; getInstrumentation().decBlackListedMapSlots(mapSlots); @@ -694,7 +700,8 @@ private List getStatusesOnHost(String hostName) { List statuses = new 
ArrayList(); synchronized (taskTrackers) { - for (TaskTrackerStatus status : taskTrackers.values()) { + for (TaskTracker tt : taskTrackers.values()) { + TaskTrackerStatus status = tt.getStatus(); if (hostName.equals(status.getHost())) { statuses.add(status); } @@ -1015,14 +1022,14 @@ // II. Create the (appropriate) task status if (type.equals(Values.MAP.name())) { taskStatus = - new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, - "", "", trackerName, TaskStatus.Phase.MAP, - new Counters()); + new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP), + TaskStatus.State.RUNNING, "", "", trackerName, + TaskStatus.Phase.MAP, new Counters()); } else { taskStatus = - new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, - "", "", trackerName, TaskStatus.Phase.REDUCE, - new Counters()); + new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE), + TaskStatus.State.RUNNING, "", "", trackerName, + TaskStatus.Phase.REDUCE, new Counters()); } // Set the start time @@ -1041,10 +1048,13 @@ synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { // IV. Register a new tracker - boolean isTrackerRegistered = getTaskTracker(trackerName) != null; + TaskTracker taskTracker = getTaskTracker(trackerName); + boolean isTrackerRegistered = (taskTracker != null); if (!isTrackerRegistered) { markTracker(trackerName); // add the tracker to recovery-manager - addNewTracker(ttStatus); + taskTracker = new TaskTracker(trackerName); + taskTracker.setStatus(ttStatus); + addNewTracker(taskTracker); } // V. 
Update the tracker status @@ -1398,17 +1408,17 @@ // are updated int size = trackerExpiryQueue.size(); for (int i = 0; i < size ; ++i) { - // Get the first status - TaskTrackerStatus status = trackerExpiryQueue.first(); + // Get the first tasktracker + TaskTrackerStatus taskTracker = trackerExpiryQueue.first(); // Remove it - trackerExpiryQueue.remove(status); + trackerExpiryQueue.remove(taskTracker); // Set the new time - status.setLastSeen(recoveryProcessEndTime); + taskTracker.setLastSeen(recoveryProcessEndTime); // Add back to get the sorted list - trackerExpiryQueue.add(status); + trackerExpiryQueue.add(taskTracker); } } @@ -1493,11 +1503,11 @@ Map hostnameToNodeMap = Collections.synchronizedMap(new TreeMap()); - // (hostname --> Set(trackername)) + // (hostname --> Set(tasktracker)) // This is used to keep track of all trackers running on one host. While // decommissioning the host, all the trackers on the host will be lost. - Map> hostnameToTrackerName = - Collections.synchronizedMap(new TreeMap>()); + Map> hostnameToTaskTracker = + Collections.synchronizedMap(new TreeMap>()); // Number of resolved entries int numResolved; @@ -1510,8 +1520,8 @@ // int totalMaps = 0; int totalReduces = 0; - private HashMap taskTrackers = - new HashMap(); + private HashMap taskTrackers = + new HashMap(); MapuniqueHostsMap = new ConcurrentHashMap(); ExpireTrackers expireTrackers = new ExpireTrackers(); Thread expireTrackersThread = null; @@ -1526,7 +1536,7 @@ RecoveryManager recoveryManager; /** - * It might seem like a bug to maintain a TreeSet of status objects, + * It might seem like a bug to maintain a TreeSet of tasktracker objects, * which can be updated at any time. But that's not what happens! We * only update status objects in the taskTrackers table. Status objects * are never updated once they enter the expiry queue. 
Instead, we wait @@ -2326,9 +2336,15 @@ * @return {@link Collection} of {@link TaskTrackerStatus} */ public Collection taskTrackers() { + Collection ttStatuses; synchronized (taskTrackers) { - return taskTrackers.values(); + ttStatuses = + new ArrayList(taskTrackers.values().size()); + for (TaskTracker tt : taskTrackers.values()) { + ttStatuses.add(tt.getStatus()); + } } + return ttStatuses; } /** @@ -2340,7 +2356,8 @@ Collection activeTrackers = new ArrayList(); synchronized (taskTrackers) { - for (TaskTrackerStatus status : taskTrackers.values()) { + for ( TaskTracker tt : taskTrackers.values()) { + TaskTrackerStatus status = tt.getStatus(); if (!faultyTrackers.isBlacklisted(status.getHost())) { activeTrackers.add(status); } @@ -2361,7 +2378,8 @@ List blacklistedTrackers = new ArrayList(); synchronized (taskTrackers) { - for (TaskTrackerStatus status : taskTrackers.values()) { + for (TaskTracker tt : taskTrackers.values()) { + TaskTrackerStatus status = tt.getStatus(); if (!faultyTrackers.isBlacklisted(status.getHost())) { activeTrackers.add(status.getTrackerName()); } else { @@ -2384,7 +2402,8 @@ Collection blacklistedTrackers = new ArrayList(); synchronized (taskTrackers) { - for (TaskTrackerStatus status : taskTrackers.values()) { + for (TaskTracker tt : taskTrackers.values()) { + TaskTrackerStatus status = tt.getStatus(); if (faultyTrackers.isBlacklisted(status.getHost())) { blacklistedTrackers.add(status); } @@ -2414,14 +2433,22 @@ * @return true if blacklisted, false otherwise */ public boolean isBlacklisted(String trackerID) { - TaskTrackerStatus status = getTaskTracker(trackerID); + TaskTrackerStatus status = getTaskTrackerStatus(trackerID); if (status != null) { return faultyTrackers.isBlacklisted(status.getHost()); } return false; } - public TaskTrackerStatus getTaskTracker(String trackerID) { + public TaskTrackerStatus getTaskTrackerStatus(String trackerID) { + TaskTracker taskTracker; + synchronized (taskTrackers) { + taskTracker = 
taskTrackers.get(trackerID); + } + return (taskTracker == null) ? null : taskTracker.getStatus(); + } + + public TaskTracker getTaskTracker(String trackerID) { synchronized (taskTrackers) { return taskTrackers.get(trackerID); } @@ -2435,7 +2462,8 @@ * * @param status Task Tracker's status */ - private void addNewTracker(TaskTrackerStatus status) { + private void addNewTracker(TaskTracker taskTracker) { + TaskTrackerStatus status = taskTracker.getStatus(); trackerExpiryQueue.add(status); // Register the tracker if its not registered @@ -2446,14 +2474,14 @@ } // add it to the set of tracker per host - Set trackers = hostnameToTrackerName.get(hostname); + Set trackers = hostnameToTaskTracker.get(hostname); if (trackers == null) { - trackers = Collections.synchronizedSet(new HashSet()); - hostnameToTrackerName.put(hostname, trackers); + trackers = Collections.synchronizedSet(new HashSet()); + hostnameToTaskTracker.put(hostname, trackers); } LOG.info("Adding tracker " + status.getTrackerName() + " to host " + hostname); - trackers.add(status.getTrackerName()); + trackers.add(taskTracker); } public Node resolveAndAddToTopology(String name) { @@ -2565,11 +2593,13 @@ boolean acceptNewTasks, short responseId) throws IOException { - LOG.debug("Got heartbeat from: " + status.getTrackerName() + - " (restarted: " + restarted + - " initialContact: " + initialContact + - " acceptNewTasks: " + acceptNewTasks + ")" + - " with responseId: " + responseId); + if (LOG.isDebugEnabled()) { + LOG.debug("Got heartbeat from: " + status.getTrackerName() + + " (restarted: " + restarted + + " initialContact: " + initialContact + + " acceptNewTasks: " + acceptNewTasks + ")" + + " with responseId: " + responseId); + } // Make sure heartbeat is from a tasktracker allowed by the jobtracker. 
if (!acceptTaskTracker(status)) { @@ -2645,13 +2675,13 @@ // Check for new tasks to be executed on the tasktracker if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) { - TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName); + TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ; if (taskTrackerStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + trackerName); } else { List tasks = getSetupAndCleanupTasks(taskTrackerStatus); if (tasks == null ) { - tasks = taskScheduler.assignTasks(taskTrackerStatus); + tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); } if (tasks != null) { for (Task task : tasks) { @@ -2751,14 +2781,15 @@ */ private boolean updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) { - TaskTrackerStatus oldStatus = taskTrackers.get(trackerName); + TaskTracker tt = getTaskTracker(trackerName); + TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus(); if (oldStatus != null) { totalMaps -= oldStatus.countMapTasks(); totalReduces -= oldStatus.countReduceTasks(); if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) { - int mapSlots = oldStatus.getMaxMapTasks(); + int mapSlots = oldStatus.getMaxMapSlots(); totalMapTaskCapacity -= mapSlots; - int reduceSlots = oldStatus.getMaxReduceTasks(); + int reduceSlots = oldStatus.getMaxReduceSlots(); totalReduceTaskCapacity -= reduceSlots; } if (status == null) { @@ -2778,16 +2809,56 @@ totalMaps += status.countMapTasks(); totalReduces += status.countReduceTasks(); if (!faultyTrackers.isBlacklisted(status.getHost())) { - int mapSlots = status.getMaxMapTasks(); + int mapSlots = status.getMaxMapSlots(); totalMapTaskCapacity += mapSlots; - int reduceSlots = status.getMaxReduceTasks(); + int reduceSlots = status.getMaxReduceSlots(); totalReduceTaskCapacity += reduceSlots; } boolean alreadyPresent = false; - if (taskTrackers.containsKey(trackerName)) { + TaskTracker taskTracker = 
taskTrackers.get(trackerName); + if (taskTracker != null) { alreadyPresent = true; + } else { + taskTracker = new TaskTracker(trackerName); + } + + taskTracker.setStatus(status); + taskTrackers.put(trackerName, taskTracker); + + if (LOG.isDebugEnabled()) { + int runningMaps = 0, runningReduces = 0; + int commitPendingMaps = 0, commitPendingReduces = 0; + int unassignedMaps = 0, unassignedReduces = 0; + int miscMaps = 0, miscReduces = 0; + List taskReports = status.getTaskReports(); + for (Iterator it = taskReports.iterator(); it.hasNext();) { + TaskStatus ts = (TaskStatus) it.next(); + boolean isMap = ts.getIsMap(); + TaskStatus.State state = ts.getRunState(); + if (state == TaskStatus.State.RUNNING) { + if (isMap) { ++runningMaps; } + else { ++runningReduces; } + } else if (state == TaskStatus.State.UNASSIGNED) { + if (isMap) { ++unassignedMaps; } + else { ++unassignedReduces; } + } else if (state == TaskStatus.State.COMMIT_PENDING) { + if (isMap) { ++commitPendingMaps; } + else { ++commitPendingReduces; } + } else { + if (isMap) { ++miscMaps; } + else { ++miscReduces; } + } + } + LOG.debug(trackerName + ": Status -" + + " running(m) = " + runningMaps + + " unassigned(m) = " + unassignedMaps + + " commit_pending(m) = " + commitPendingMaps + + " misc(m) = " + miscMaps + + " running(r) = " + runningReduces + + " unassigned(r) = " + unassignedReduces + + " commit_pending(r) = " + commitPendingReduces + + " misc(r) = " + miscReduces); } - taskTrackers.put(trackerName, status); if (!alreadyPresent) { Integer numTaskTrackersInHost = @@ -2816,11 +2887,12 @@ synchronized (trackerExpiryQueue) { boolean seenBefore = updateTaskTrackerStatus(trackerName, trackerStatus); + TaskTracker taskTracker = getTaskTracker(trackerName); if (initialContact) { // If it's first contact, then clear out // any state hanging around if (seenBefore) { - lostTaskTracker(trackerName); + lostTaskTracker(taskTracker); } } else { // If not first contact, there should be some record of the tracker @@ 
-2837,7 +2909,7 @@ if (isBlacklisted(trackerName)) { faultyTrackers.numBlacklistedTrackers += 1; } - addNewTracker(trackerStatus); + addNewTracker(taskTracker); } } } @@ -2956,8 +3028,8 @@ // returns cleanup tasks first, then setup tasks. private synchronized List getSetupAndCleanupTasks( TaskTrackerStatus taskTracker) throws IOException { - int maxMapTasks = taskTracker.getMaxMapTasks(); - int maxReduceTasks = taskTracker.getMaxReduceTasks(); + int maxMapTasks = taskTracker.getMaxMapSlots(); + int maxReduceTasks = taskTracker.getMaxReduceSlots(); int numMaps = taskTracker.countMapTasks(); int numReduces = taskTracker.countReduceTasks(); int numTaskTrackers = getClusterStatus().getTaskTrackers(); @@ -3635,7 +3707,8 @@ * already been updated. Just process the contained tasks and any * jobs that might be affected. */ - void lostTaskTracker(String trackerName) { + void lostTaskTracker(TaskTracker taskTracker) { + String trackerName = taskTracker.getTrackerName(); LOG.info("Lost tracker '" + trackerName + "'"); // remove the tracker from the local structures @@ -3693,10 +3766,14 @@ // Penalize this tracker for each of the jobs which // had any tasks running on it when it was 'lost' + // Also, remove any reserved slots on this tasktracker for (JobInProgress job : jobsWithFailures) { - job.addTrackerTaskFailure(trackerName); + job.addTrackerTaskFailure(trackerName, taskTracker); } - + + // Cleanup + taskTracker.lost(); + // Purge 'marked' tasks, needs to be done // here to prevent hanging references! 
removeMarkedTasks(trackerName); @@ -3726,9 +3803,9 @@ hostsReader.refresh(); Set excludeSet = new HashSet(); - for(Map.Entry eSet : taskTrackers.entrySet()) { + for(Map.Entry eSet : taskTrackers.entrySet()) { String trackerName = eSet.getKey(); - TaskTrackerStatus status = eSet.getValue(); + TaskTrackerStatus status = eSet.getValue().getStatus(); // Check if not include i.e not in host list or in hosts list but excluded if (!inHostsList(status) || inExcludedHostsList(status)) { excludeSet.add(status.getHost()); // add to rejected trackers @@ -3746,12 +3823,13 @@ synchronized (trackerExpiryQueue) { for (String host : hosts) { LOG.info("Decommissioning host " + host); - Set trackers = hostnameToTrackerName.remove(host); + Set trackers = hostnameToTaskTracker.remove(host); if (trackers != null) { - for (String tracker : trackers) { - LOG.info("Losing tracker " + tracker + " on host " + host); + for (TaskTracker tracker : trackers) { + LOG.info("Decommission: Losing tracker " + tracker + + " on host " + host); lostTaskTracker(tracker); // lose the tracker - updateTaskTrackerStatus(tracker, null); + updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null); } } LOG.info("Host " + host + " is ready for decommissioning"); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Wed Jun 24 14:22:13 2009 @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; /** * A {@link TaskScheduler} that limits the maximum number of tasks @@ -69,9 +70,9 @@ } @Override - public synchronized List assignTasks(TaskTrackerStatus taskTracker) + public synchronized List assignTasks(TaskTracker taskTracker) throws IOException { - + TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); final int numTaskTrackers = taskTrackerManager.getClusterStatus().getTaskTrackers(); Collection jobQueue = @@ -79,10 +80,10 @@ Task task; /* Stats about the current taskTracker */ - final int mapTasksNumber = taskTracker.countMapTasks(); - final int reduceTasksNumber = taskTracker.countReduceTasks(); - final int maximumMapTasksNumber = taskTracker.getMaxMapTasks(); - final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks(); + final int mapTasksNumber = taskTrackerStatus.countMapTasks(); + final int reduceTasksNumber = taskTrackerStatus.countReduceTasks(); + final int maximumMapTasksNumber = taskTrackerStatus.getMaxMapSlots(); + final int maximumReduceTasksNumber = taskTrackerStatus.getMaxReduceSlots(); /* * Statistics about the whole cluster. 
Most are approximate because of @@ -141,11 +142,11 @@ continue; } if (step == 0 || step == 2) { - task = job.obtainNewMapTask(taskTracker, numTaskTrackers, + task = job.obtainNewMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); } else { - task = job.obtainNewReduceTask(taskTracker, numTaskTrackers, + task = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); } if (task != null) { Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jun 24 14:22:13 2009 @@ -168,7 +168,7 @@ MapTask map = new MapTask(file.toString(), mapId, i, rawSplits[i].getClassName(), - rawSplits[i].getBytes()); + rawSplits[i].getBytes(), 1); JobConf localConf = new JobConf(job); map.setJobFile(localFile.toString()); map.localizeConfiguration(localConf); @@ -207,7 +207,7 @@ } if (!this.isInterrupted()) { ReduceTask reduce = new ReduceTask(file.toString(), - reduceId, 0, mapIds.size()); + reduceId, 0, mapIds.size(), 1); JobConf localConf = new JobConf(job); reduce.setJobFile(localFile.toString()); reduce.localizeConfiguration(localConf); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ 
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jun 24 14:22:13 2009 @@ -88,9 +88,9 @@ } public MapTask(String jobFile, TaskAttemptID taskId, - int partition, String splitClass, BytesWritable split - ) { - super(jobFile, taskId, partition); + int partition, String splitClass, BytesWritable split, + int numSlotsRequired) { + super(jobFile, taskId, partition, numSlotsRequired); this.splitClass = splitClass; this.split = split; } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Wed Jun 24 14:22:13 2009 @@ -30,10 +30,10 @@ public MapTaskStatus() {} - public MapTaskStatus(TaskAttemptID taskid, float progress, + public MapTaskStatus(TaskAttemptID taskid, float progress, int numSlots, State runState, String diagnosticInfo, String stateString, String taskTracker, Phase phase, Counters counters) { - super(taskid, progress, runState, diagnosticInfo, stateString, + super(taskid, progress, numSlots, runState, diagnosticInfo, stateString, taskTracker, phase, counters); } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 24 14:22:13 2009 @@ -149,8 +149,8 @@ } public ReduceTask(String jobFile, 
TaskAttemptID taskId, - int partition, int numMaps) { - super(jobFile, taskId, partition); + int partition, int numMaps, int numSlotsRequired) { + super(jobFile, taskId, partition, numSlotsRequired); this.numMaps = numMaps; } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Wed Jun 24 14:22:13 2009 @@ -34,11 +34,11 @@ public ReduceTaskStatus() {} - public ReduceTaskStatus(TaskAttemptID taskid, float progress, State runState, - String diagnosticInfo, String stateString, String taskTracker, - Phase phase, Counters counters) { - super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker, - phase, counters); + public ReduceTaskStatus(TaskAttemptID taskid, float progress, int numSlots, + State runState, String diagnosticInfo, String stateString, + String taskTracker, Phase phase, Counters counters) { + super(taskid, progress, numSlots, runState, diagnosticInfo, stateString, + taskTracker, phase, counters); } @Override Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 24 14:22:13 2009 @@ -52,8 +52,12 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; -/** Base class for 
tasks. */ -abstract class Task implements Writable, Configurable { +/** + * Base class for tasks. + * + * This is NOT a public interface. + */ +abstract public class Task implements Writable, Configurable { private static final Log LOG = LogFactory.getLog(Task.class); @@ -119,6 +123,7 @@ protected org.apache.hadoop.mapreduce.OutputFormat outputFormat; protected org.apache.hadoop.mapreduce.OutputCommitter committer; protected final Counters.Counter spilledRecordsCounter; + private int numSlotsRequired; //////////////////////////////////////////// // Constructors @@ -130,13 +135,15 @@ spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); } - public Task(String jobFile, TaskAttemptID taskId, int partition) { + public Task(String jobFile, TaskAttemptID taskId, int partition, + int numSlotsRequired) { this.jobFile = jobFile; this.taskId = taskId; this.partition = partition; + this.numSlotsRequired = numSlotsRequired; this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, - 0.0f, + 0.0f, numSlotsRequired, TaskStatus.State.UNASSIGNED, "", "", "", isMapTask() ? @@ -153,6 +160,10 @@ public void setJobFile(String jobFile) { this.jobFile = jobFile; } public String getJobFile() { return jobFile; } public TaskAttemptID getTaskID() { return taskId; } + public int getNumSlotsRequired() { + return numSlotsRequired; + } + Counters getCounters() { return counters; } /** @@ -173,14 +184,14 @@ /** * Return current phase of the task. * needs to be synchronized as communication thread sends the phase every second - * @return + * @return the current phase of the task */ public synchronized TaskStatus.Phase getPhase(){ return this.taskStatus.getPhase(); } /** * Set current phase of the task. 
- * @param p + * @param phase task phase */ protected synchronized void setPhase(TaskStatus.Phase phase){ this.taskStatus.setPhase(phase); @@ -282,6 +293,7 @@ Text.writeString(out, jobFile); taskId.write(out); out.writeInt(partition); + out.writeInt(numSlotsRequired); taskStatus.write(out); skipRanges.write(out); out.writeBoolean(skipping); @@ -295,6 +307,7 @@ jobFile = Text.readString(in); taskId = TaskAttemptID.read(in); partition = in.readInt(); + numSlotsRequired = in.readInt(); taskStatus.readFields(in); this.mapOutputFile.setJobId(taskId.getJobID()); skipRanges.readFields(in); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Jun 24 14:22:13 2009 @@ -68,6 +68,7 @@ private JobTracker jobtracker; private TaskID id; private JobInProgress job; + private final int numSlotsRequired; // Status of the TIP private int successEventNumber = -1; @@ -134,7 +135,8 @@ public TaskInProgress(JobID jobid, String jobFile, RawSplit rawSplit, JobTracker jobtracker, JobConf conf, - JobInProgress job, int partition) { + JobInProgress job, int partition, + int numSlotsRequired) { this.jobFile = jobFile; this.rawSplit = rawSplit; this.jobtracker = jobtracker; @@ -142,6 +144,7 @@ this.conf = conf; this.partition = partition; this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf); + this.numSlotsRequired = numSlotsRequired; setMaxTaskAttempts(); init(jobid); } @@ -152,7 +155,7 @@ public TaskInProgress(JobID jobid, String jobFile, int numMaps, int partition, JobTracker jobtracker, JobConf conf, - JobInProgress job) { + JobInProgress job, 
int numSlotsRequired) { this.jobFile = jobFile; this.numMaps = numMaps; this.partition = partition; @@ -160,6 +163,7 @@ this.job = job; this.conf = conf; this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf); + this.numSlotsRequired = numSlotsRequired; setMaxTaskAttempts(); init(jobid); } @@ -538,7 +542,9 @@ newState != TaskStatus.State.UNASSIGNED) && (oldState == newState)) { LOG.warn("Recieved duplicate status update of '" + newState + - "' for '" + taskid + "' of TIP '" + getTIPId() + "'"); + "' for '" + taskid + "' of TIP '" + getTIPId() + "'" + + "oldTT=" + oldStatus.getTaskTracker() + + " while newTT=" + status.getTaskTracker()); return false; } @@ -968,9 +974,10 @@ } else { split = new BytesWritable(); } - t = new MapTask(jobFile, taskid, partition, splitClass, split); + t = new MapTask(jobFile, taskid, partition, splitClass, split, + numSlotsRequired); } else { - t = new ReduceTask(jobFile, taskid, partition, numMaps); + t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsRequired); } if (jobCleanup) { t.setJobCleanupTask(); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskScheduler.java Wed Jun 24 14:22:13 2009 @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; /** * Used by a {@link JobTracker} to schedule {@link Task}s on @@ -36,7 +37,7 @@ * between the job being added (when * {@link JobInProgressListener#jobAdded(JobInProgress)} is called) * and tasks for that job being assigned (by - * {@link 
#assignTasks(TaskTrackerStatus)}). + * {@link #assignTasks(TaskTracker)}). * @see EagerTaskInitializationListener */ abstract class TaskScheduler implements Configurable { @@ -80,8 +81,8 @@ * @param taskTracker The TaskTracker for which we're looking for tasks. * @return A list of tasks to run on that TaskTracker, possibly empty. */ - public abstract List assignTasks(TaskTrackerStatus taskTracker) - throws IOException; + public abstract List assignTasks(TaskTracker taskTracker) + throws IOException; /** * Returns a collection of jobs in an order which is specific to Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Jun 24 14:22:13 2009 @@ -49,6 +49,7 @@ private String diagnosticInfo; private String stateString; private String taskTracker; + private int numSlots; private long startTime; //in ms private long finishTime; @@ -61,14 +62,16 @@ public TaskStatus() { taskid = new TaskAttemptID(); + numSlots = 0; } - public TaskStatus(TaskAttemptID taskid, float progress, + public TaskStatus(TaskAttemptID taskid, float progress, int numSlots, State runState, String diagnosticInfo, String stateString, String taskTracker, Phase phase, Counters counters) { this.taskid = taskid; this.progress = progress; + this.numSlots = numSlots; this.runState = runState; this.diagnosticInfo = diagnosticInfo; this.stateString = stateString; @@ -80,6 +83,10 @@ public TaskAttemptID getTaskID() { return taskid; } public abstract boolean getIsMap(); + public int getNumSlots() { + return numSlots; + } + public float getProgress() { return progress; } public void 
setProgress(float progress) { this.progress = progress; @@ -383,6 +390,7 @@ public void write(DataOutput out) throws IOException { taskid.write(out); out.writeFloat(progress); + out.writeInt(numSlots); WritableUtils.writeEnum(out, runState); Text.writeString(out, diagnosticInfo); Text.writeString(out, stateString); @@ -400,6 +408,7 @@ public void readFields(DataInput in) throws IOException { this.taskid.readFields(in); setProgress(in.readFloat()); + this.numSlots = in.readInt(); this.runState = WritableUtils.readEnum(in, State.class); this.diagnosticInfo = Text.readString(in); this.stateString = Text.readString(in); @@ -419,24 +428,27 @@ // Factory-like methods to create/read/write appropriate TaskStatus objects ////////////////////////////////////////////////////////////////////////////// - static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, float progress, + static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, + float progress, int numSlots, State runState, String diagnosticInfo, String stateString, String taskTracker, Phase phase, Counters counters) throws IOException { boolean isMap = in.readBoolean(); - return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo, - stateString, taskTracker, phase, counters); + return createTaskStatus(isMap, taskId, progress, numSlots, runState, + diagnosticInfo, stateString, taskTracker, phase, + counters); } - static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress, - State runState, String diagnosticInfo, - String stateString, String taskTracker, - Phase phase, Counters counters) { - return (isMap) ? new MapTaskStatus(taskId, progress, runState, + static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, + float progress, int numSlots, + State runState, String diagnosticInfo, + String stateString, String taskTracker, + Phase phase, Counters counters) { + return (isMap) ? 
new MapTaskStatus(taskId, progress, numSlots, runState, diagnosticInfo, stateString, taskTracker, phase, counters) : - new ReduceTaskStatus(taskId, progress, runState, + new ReduceTaskStatus(taskId, progress, numSlots, runState, diagnosticInfo, stateString, taskTracker, phase, counters); } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 24 14:22:13 2009 @@ -67,6 +67,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapred.TaskStatus.Phase; import org.apache.hadoop.mapred.pipes.Submitter; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsException; import org.apache.hadoop.metrics.MetricsRecord; @@ -189,8 +190,8 @@ private static final String OUTPUT = "output"; private JobConf fConf; private FileSystem localFs; - private int maxCurrentMapTasks; - private int maxCurrentReduceTasks; + private int maxMapSlots; + private int maxReduceSlots; private int failures; private MapEventsFetcherThread mapEventsFetcher; int workerThreads; @@ -486,8 +487,8 @@ } // RPC initialization - int max = maxCurrentMapTasks > maxCurrentReduceTasks ? - maxCurrentMapTasks : maxCurrentReduceTasks; + int max = maxMapSlots > maxReduceSlots ? 
+ maxMapSlots : maxReduceSlots; //set the num handlers to max*2 since canCommit may wait for the duration //of a heartbeat RPC this.taskReportServer = @@ -525,8 +526,8 @@ this.indexCache = new IndexCache(this.fConf); - mapLauncher = new TaskLauncher(maxCurrentMapTasks); - reduceLauncher = new TaskLauncher(maxCurrentReduceTasks); + mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots); + reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots); mapLauncher.start(); reduceLauncher.start(); Class taskControllerClass @@ -902,10 +903,8 @@ */ public TaskTracker(JobConf conf) throws IOException { fConf = conf; - maxCurrentMapTasks = conf.getInt( - "mapred.tasktracker.map.tasks.maximum", 2); - maxCurrentReduceTasks = conf.getInt( - "mapred.tasktracker.reduce.tasks.maximum", 2); + maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2); + maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2); this.jobTrackAddr = JobTracker.getAddress(conf); InetSocketAddress infoSocAddr = NetUtils.createSocketAddr( conf.get("mapred.task.tracker.http.address", "0.0.0.0:50060")); @@ -1174,8 +1173,8 @@ cloneAndResetRunningTaskStatuses( sendCounters), failures, - maxCurrentMapTasks, - maxCurrentReduceTasks); + maxMapSlots, + maxReduceSlots); } } else { LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() + @@ -1188,9 +1187,10 @@ boolean askForNewTask; long localMinSpaceStart; synchronized (this) { - askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || - status.countReduceTasks() < maxCurrentReduceTasks) && - acceptNewTasks; + askForNewTask = + ((status.countOccupiedMapSlots() < maxMapSlots || + status.countOccupiedReduceSlots() < maxReduceSlots) && + acceptNewTasks); localMinSpaceStart = minSpaceStart; } if (askForNewTask) { @@ -1564,12 +1564,12 @@ private final int maxSlots; private List tasksToLaunch; - public TaskLauncher(int numSlots) { + public TaskLauncher(TaskType taskType, int numSlots) { this.maxSlots = numSlots; 
this.numFreeSlots = new IntWritable(numSlots); this.tasksToLaunch = new LinkedList(); setDaemon(true); - setName("TaskLauncher for task"); + setName("TaskLauncher for " + taskType + " tasks"); } public void addToTaskQueue(LaunchTaskAction action) { @@ -1584,9 +1584,9 @@ tasksToLaunch.clear(); } - public void addFreeSlot() { + public void addFreeSlots(int numSlots) { synchronized (numFreeSlots) { - numFreeSlots.set(numFreeSlots.get() + 1); + numFreeSlots.set(numFreeSlots.get() + numSlots); assert (numFreeSlots.get() <= maxSlots); LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get()); numFreeSlots.notifyAll(); @@ -1597,22 +1597,29 @@ while (!Thread.interrupted()) { try { TaskInProgress tip; + Task task; synchronized (tasksToLaunch) { while (tasksToLaunch.isEmpty()) { tasksToLaunch.wait(); } //get the TIP tip = tasksToLaunch.remove(0); - LOG.info("Trying to launch : " + tip.getTask().getTaskID()); + task = tip.getTask(); + LOG.info("Trying to launch : " + tip.getTask().getTaskID() + + " which needs " + task.getNumSlotsRequired() + " slots"); } - //wait for a slot to run + //wait for free slots to run synchronized (numFreeSlots) { - while (numFreeSlots.get() == 0) { + while (numFreeSlots.get() < task.getNumSlotsRequired()) { + LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + + " to launch " + task.getTaskID() + ", currently we have " + + numFreeSlots.get() + " free slots"); numFreeSlots.wait(); } LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+ - " and trying to launch "+tip.getTask().getTaskID()); - numFreeSlots.set(numFreeSlots.get() - 1); + " and trying to launch "+tip.getTask().getTaskID() + + " which needs " + task.getNumSlotsRequired() + " slots"); + numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired()); assert (numFreeSlots.get() >= 0); } synchronized (tip) { @@ -1621,7 +1628,7 @@ tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN && tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) { 
//got killed externally while still in the launcher queue - addFreeSlot(); + addFreeSlots(task.getNumSlotsRequired()); continue; } tip.slotTaken = true; @@ -1786,6 +1793,7 @@ localJobConf = null; taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 0.0f, + task.getNumSlotsRequired(), task.getState(), diagnosticInfo.toString(), "initializing", @@ -2347,7 +2355,7 @@ private synchronized void releaseSlot() { if (slotTaken) { if (launcher != null) { - launcher.addFreeSlot(); + launcher.addFreeSlots(task.getNumSlotsRequired()); } slotTaken = false; } @@ -2979,11 +2987,11 @@ } int getMaxCurrentMapTasks() { - return maxCurrentMapTasks; + return maxMapSlots; } int getMaxCurrentReduceTasks() { - return maxCurrentReduceTasks; + return maxReduceSlots; } /** @@ -3047,7 +3055,7 @@ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, JobConf.DISABLED_MEMORY_LIMIT); totalMemoryAllottedForTasks = - maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks + maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots * reduceSlotSizeMemoryOnTT; if (totalMemoryAllottedForTasks < 0) { totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Jun 24 14:22:13 2009 @@ -17,7 +17,10 @@ */ package org.apache.hadoop.mapred; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.TaskStatus.State; import java.io.*; import java.util.*; @@ -28,9 +31,11 @@ * of the most 
recent TaskTrackerStatus objects for each * unique TaskTracker it knows about. * + * This is NOT a public interface! **************************************************/ -class TaskTrackerStatus implements Writable { - +public class TaskTrackerStatus implements Writable { + public static final Log LOG = LogFactory.getLog(TaskTrackerStatus.class); + static { // register a ctor WritableFactories.setFactory (TaskTrackerStatus.class, @@ -253,19 +258,27 @@ public List getTaskReports() { return taskReports; } - + /** - * Return the current MapTask count + * Is the given task considered as 'running' ? + * @param taskStatus + * @return + */ + private boolean isTaskRunning(TaskStatus taskStatus) { + TaskStatus.State state = taskStatus.getRunState(); + return (state == State.RUNNING || state == State.UNASSIGNED || + taskStatus.inTaskCleanupPhase()); + } + + /** + * Get the number of running map tasks. + * @return the number of running map tasks */ public int countMapTasks() { int mapCount = 0; - for (Iterator it = taskReports.iterator(); it.hasNext();) { + for (Iterator it = taskReports.iterator(); it.hasNext();) { TaskStatus ts = (TaskStatus) it.next(); - TaskStatus.State state = ts.getRunState(); - if (ts.getIsMap() && - ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED) || - ts.inTaskCleanupPhase())) { + if (ts.getIsMap() && isTaskRunning(ts)) { mapCount++; } } @@ -273,17 +286,37 @@ } /** - * Return the current ReduceTask count + * Get the number of occupied map slots. + * @return the number of occupied map slots + */ + public int countOccupiedMapSlots() { + int mapSlotsCount = 0; + for (Iterator it = taskReports.iterator(); it.hasNext();) { + TaskStatus ts = (TaskStatus) it.next(); + if (ts.getIsMap() && isTaskRunning(ts)) { + mapSlotsCount += ts.getNumSlots(); + } + } + return mapSlotsCount; + } + + /** + * Get available map slots. 
+ * @return available map slots + */ + public int getAvailableMapSlots() { + return getMaxMapSlots() - countOccupiedMapSlots(); + } + + /** + * Get the number of running reduce tasks. + * @return the number of running reduce tasks */ public int countReduceTasks() { int reduceCount = 0; - for (Iterator it = taskReports.iterator(); it.hasNext();) { + for (Iterator it = taskReports.iterator(); it.hasNext();) { TaskStatus ts = (TaskStatus) it.next(); - TaskStatus.State state = ts.getRunState(); - if ((!ts.getIsMap()) && - ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED) || - ts.inTaskCleanupPhase())) { + if ((!ts.getIsMap()) && isTaskRunning(ts)) { reduceCount++; } } @@ -291,6 +324,30 @@ } /** + * Get the number of occupied reduce slots. + * @return the number of occupied reduce slots + */ + public int countOccupiedReduceSlots() { + int reduceSlotsCount = 0; + for (Iterator it = taskReports.iterator(); it.hasNext();) { + TaskStatus ts = (TaskStatus) it.next(); + if ((!ts.getIsMap()) && isTaskRunning(ts)) { + reduceSlotsCount += ts.getNumSlots(); + } + } + return reduceSlotsCount; + } + + /** + * Get available reduce slots. + * @return available reduce slots + */ + public int getAvailableReduceSlots() { + return getMaxReduceSlots() - countOccupiedReduceSlots(); + } + + + /** */ public long getLastSeen() { return lastSeen; @@ -302,15 +359,18 @@ } /** - * Get the maximum concurrent tasks for this node. (This applies - * per type of task - a node with maxTasks==1 will run up to 1 map - * and 1 reduce concurrently). - * @return maximum tasks this node supports + * Get the maximum map slots for this node. + * @return the maximum map slots for this node */ - public int getMaxMapTasks() { + public int getMaxMapSlots() { return maxMapTasks; } - public int getMaxReduceTasks() { + + /** + * Get the maximum reduce slots for this node. 
+ * @return the maximum reduce slots for this node + */ + public int getMaxReduceSlots() { return maxReduceTasks; } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Jun 24 14:22:13 2009 @@ -55,9 +55,10 @@ * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759 * Version 16 Change in signature of getTask() for HADOOP-5488 * Version 17 Modified TaskID to be aware of the new TaskTypes + * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516 * */ - public static final long versionID = 17L; + public static final long versionID = 18L; /** * Called when a child task process starts, to get its task. 
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java Wed Jun 24 14:22:13 2009 @@ -26,5 +26,7 @@ TOTAL_LAUNCHED_REDUCES, OTHER_LOCAL_MAPS, DATA_LOCAL_MAPS, - RACK_LOCAL_MAPS + RACK_LOCAL_MAPS, + FALLOW_SLOTS_MILLIS_MAPS, + FALLOW_SLOTS_MILLIS_REDUCES } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.properties Wed Jun 24 14:22:13 2009 @@ -1,12 +1,13 @@ # ResourceBundle properties file for job-level counters -CounterGroupName= Job Counters - -NUM_FAILED_MAPS.name= Failed map tasks -NUM_FAILED_REDUCES.name= Failed reduce tasks -TOTAL_LAUNCHED_MAPS.name= Launched map tasks -TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks -OTHER_LOCAL_MAPS.name= Other local map tasks -DATA_LOCAL_MAPS.name= Data-local map tasks -RACK_LOCAL_MAPS.name= Rack-local map tasks +CounterGroupName= Job Counters +NUM_FAILED_MAPS.name= Failed map tasks +NUM_FAILED_REDUCES.name= Failed reduce tasks +TOTAL_LAUNCHED_MAPS.name= Launched map tasks +TOTAL_LAUNCHED_REDUCES.name= Launched reduce tasks +OTHER_LOCAL_MAPS.name= Other local map tasks +DATA_LOCAL_MAPS.name= Data-local map tasks +RACK_LOCAL_MAPS.name= Rack-local map tasks 
+FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms) +FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java?rev=788036&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java Wed Jun 24 14:22:13 2009 @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.mapreduce.server.jobtracker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobInProgress;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * The representation of a single TaskTracker as seen by
+ * the {@link JobTracker}.
+ *
+ * <p>An instance carries the tracker's name, the {@link TaskTrackerStatus}
+ * most recently handed to {@link #setStatus(TaskTrackerStatus)}, and one
+ * {@link JobInProgress} reference per slot kind (map and reduce) naming the
+ * job for which "fallow" slots are currently reserved on this tracker.
+ */
+public class TaskTracker {
+  static final Log LOG = LogFactory.getLog(TaskTracker.class);
+
+  final private String trackerName;
+  private TaskTrackerStatus status;
+
+  private JobInProgress jobForFallowMapSlot;
+  private JobInProgress jobForFallowReduceSlot;
+
+  /**
+   * Create a new {@link TaskTracker}.
+   * Only the name is recorded here; the status and both fallow-slot
+   * references are left unset.
+   * @param trackerName Unique identifier for the TaskTracker
+   */
+  public TaskTracker(String trackerName) {
+    this.trackerName = trackerName;
+  }
+
+  /**
+   * Get the unique identifier for the {@link TaskTracker}.
+   * @return the unique identifier for the TaskTracker, exactly as passed to
+   *         the constructor
+   */
+  public String getTrackerName() {
+    return trackerName;
+  }
+
+  /**
+   * Get the current {@link TaskTrackerStatus} of the TaskTracker.
+   * The constructor does not set a status, so this is <code>null</code>
+   * until {@link #setStatus(TaskTrackerStatus)} has been called.
+   * @return the current TaskTrackerStatus of the
+   *         TaskTracker
+   */
+  public TaskTrackerStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Set the current {@link TaskTrackerStatus} of the TaskTracker.
+   * The reference is stored as given; no copy is made.
+   * @param status the current TaskTrackerStatus of the
+   *               TaskTracker
+   */
+  public void setStatus(TaskTrackerStatus status) {
+    this.status = status;
+  }
+
+  /**
+   * Get the number of currently available slots on this tasktracker for the
+   * given type of the task.
+ * @param taskType the {@link TaskType} to check for number of available slots + * @return the number of currently available slots for the given + * taskType + */ + public int getAvailableSlots(TaskType taskType) { + int availableSlots = 0; + if (taskType == TaskType.MAP) { + if (LOG.isDebugEnabled()) { + LOG.debug(trackerName + " getAvailSlots:" + + " max(m)=" + status.getMaxMapSlots() + + " occupied(m)=" + status.countOccupiedMapSlots()); + } + availableSlots = status.getAvailableMapSlots(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(trackerName + " getAvailSlots:" + + " max(r)=" + status.getMaxReduceSlots() + + " occupied(r)=" + status.countOccupiedReduceSlots()); + } + availableSlots = status.getAvailableReduceSlots(); + } + return availableSlots; + } + + /** + * Get the {@link JobInProgress} for which the fallow slot(s) are held. + * @param taskType {@link TaskType} of the task + * @return the task for which the fallow slot(s) are held, + * null if there are no fallow slots + */ + public JobInProgress getJobForFallowSlot(TaskType taskType) { + return + (taskType == TaskType.MAP) ? jobForFallowMapSlot : jobForFallowReduceSlot; + } + + /** + * Reserve specified number of slots for a given job. 
+ * @param taskType {@link TaskType} of the task + * @param job the job for which slots on this TaskTracker + * are to be reserved + * @param numSlots number of slots to be reserved + */ + public void reserveSlots(TaskType taskType, JobInProgress job, int numSlots) { + JobID jobId = job.getJobID(); + if (taskType == TaskType.MAP) { + if (jobForFallowMapSlot != null && + !jobForFallowMapSlot.getJobID().equals(jobId)) { + throw new RuntimeException(trackerName + " already has " + + "slots reserved for " + + jobForFallowMapSlot + "; being" + + " asked to reserve " + numSlots + " for " + + jobId); + } + + jobForFallowMapSlot = job; + } else if (taskType == TaskType.REDUCE){ + if (jobForFallowReduceSlot != null && + !jobForFallowReduceSlot.getJobID().equals(jobId)) { + throw new RuntimeException(trackerName + " already has " + + "slots reserved for " + + jobForFallowReduceSlot + "; being" + + " asked to reserve " + numSlots + " for " + + jobId); + } + + jobForFallowReduceSlot = job; + } + + job.reserveTaskTracker(this, taskType, numSlots); + LOG.info(trackerName + ": Reserved " + numSlots + " " + taskType + + " slots for " + jobId); + } + + /** + * Free map slots on this TaskTracker which were reserved for + * taskType. 
+ * @param taskType {@link TaskType} of the task + * @param job job whose slots are being un-reserved + */ + public void unreserveSlots(TaskType taskType, JobInProgress job) { + JobID jobId = job.getJobID(); + if (taskType == TaskType.MAP) { + if (jobForFallowMapSlot == null || + !jobForFallowMapSlot.getJobID().equals(jobId)) { + throw new RuntimeException(trackerName + " already has " + + "slots reserved for " + + jobForFallowMapSlot + "; being" + + " asked to un-reserve for " + jobId); + } + + jobForFallowMapSlot = null; + } else { + if (jobForFallowReduceSlot == null || + !jobForFallowReduceSlot.getJobID().equals(jobId)) { + throw new RuntimeException(trackerName + " already has " + + "slots reserved for " + + jobForFallowReduceSlot + "; being" + + " asked to un-reserve for " + jobId); + } + + jobForFallowReduceSlot = null; + } + + job.unreserveTaskTracker(this, taskType); + LOG.info(trackerName + ": Unreserved " + taskType + " slots for " + jobId); + } + + /** + * Cleanup when the {@link TaskTracker} is declared as 'lost' by the + * JobTracker. 
+ */ + public void lost() { + // Inform jobs which have reserved slots on this tasktracker + if (jobForFallowMapSlot != null) { + unreserveSlots(TaskType.MAP, jobForFallowMapSlot); + } + if (jobForFallowReduceSlot != null) { + unreserveSlots(TaskType.REDUCE, jobForFallowReduceSlot); + } + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java.orig URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java.orig?rev=788036&view=auto ============================================================================== (empty) Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Wed Jun 24 14:22:13 2009 @@ -696,7 +696,7 @@ ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME))); - TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker()); + TaskTrackerStatus ttStatus = jt.getTaskTrackerStatus(ts.getTaskTracker()); if (ttStatus != null) { assertTrue("http port of task attempt " + idStr + " obtained from " + Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=788036&r1=788035&r2=788036&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java (original) +++ 
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java Wed Jun 24 14:22:13 2009 @@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; import junit.framework.TestCase; @@ -66,7 +67,7 @@ public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler { @Override - public synchronized List assignTasks(TaskTrackerStatus taskTracker) + public synchronized List assignTasks(TaskTracker taskTracker) throws IOException { Collection jips = jobQueueJobInProgressListener .getJobQueue();