Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 68816 invoked from network); 8 Aug 2006 22:24:25 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 8 Aug 2006 22:24:25 -0000 Received: (qmail 14968 invoked by uid 500); 8 Aug 2006 22:24:25 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 14938 invoked by uid 500); 8 Aug 2006 22:24:25 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 14928 invoked by uid 99); 8 Aug 2006 22:24:25 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Aug 2006 15:24:25 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Aug 2006 15:24:24 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 073541A981A; Tue, 8 Aug 2006 15:24:04 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r429858 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ Date: Tue, 08 Aug 2006 22:24:02 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060808222404.073541A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: cutting Date: Tue Aug 8 15:24:01 2006 New Revision: 429858 URL: http://svn.apache.org/viewvc?rev=429858&view=rev Log: HADOOP-427. New bug number for this patch. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=429858&r1=429857&r2=429858&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 8 15:24:01 2006 @@ -3,7 +3,7 @@ Trunk (unreleased changes) - 1. HADOOP-415. Replace some uses of DatanodeDescriptor in the DFS + 1. HADOOP-427. Replace some uses of DatanodeDescriptor in the DFS web UI code with DatanodeInfo, the preferred public class. (Devaraj Das via cutting) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=429858&r1=429857&r2=429858&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 8 15:24:01 2006 @@ -280,7 +280,7 @@ } else if (status.getRunState() == TaskStatus.FAILED) { // Tell the job to fail the relevant task failedTask(tip, status.getTaskId(), status, status.getTaskTracker(), - wasRunning, wasComplete); + wasRunning, wasComplete, metrics); } } @@ -520,7 +520,7 @@ } } } - + // // If all tasks are complete, then the job is done! // @@ -571,7 +571,8 @@ */ private void failedTask(TaskInProgress tip, String taskid, TaskStatus status, String trackerName, - boolean wasRunning, boolean wasComplete) { + boolean wasRunning, boolean wasComplete, + JobTrackerMetrics metrics) { tip.failedSubTask(taskid, trackerName); boolean isRunning = tip.isRunning(); boolean isComplete = tip.isComplete(); @@ -596,8 +597,10 @@ // the failed task goes to the end of the list. if (tip.isMapTask()) { firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length; + metrics.failedMap(); } else { firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length; + metrics.failedReduce(); } // @@ -695,4 +698,5 @@ } return null; } + } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=429858&r1=429857&r2=429858&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Aug 8 15:24:01 2006 @@ -219,6 +219,20 @@ // tracker has already been destroyed. if (newProfile != null) { if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { + // But save the state so that if at a later + // point of time, we happen to hear from the + // same TaskTracker, we can reinstate + // the state + ExpiredTaskTrackerState + expTaskTrackerState = + new ExpiredTaskTrackerState( + leastRecent.getTrackerName()); + if (LOG.isDebugEnabled()) + LOG.debug("Saving state of TaskTracker " + + leastRecent.getTrackerName()); + expiredTaskTrackerStates.put( + leastRecent.getTrackerName(), + expTaskTrackerState); // Remove completely updateTaskTrackerStatus(trackerName, null); lostTaskTracker(leastRecent.getTrackerName(), @@ -347,6 +361,11 @@ Metrics.report(metricsRecord, "maps-completed", ++numMapTasksCompleted); } + + synchronized void failedMap() { + Metrics.report(metricsRecord, "maps-completed", + --numMapTasksCompleted); + } synchronized void launchReduce() { Metrics.report(metricsRecord, "reduces-launched", @@ -357,6 +376,11 @@ Metrics.report(metricsRecord, "reduces-completed", ++numReduceTasksCompleted); } + + synchronized void failedReduce() { + Metrics.report(metricsRecord, "reduces-completed", + --numReduceTasksCompleted); + } synchronized void submitJob() { Metrics.report(metricsRecord, "jobs-submitted", @@ -427,6 +451,7 @@ Thread initJobsThread = null; ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks); + private TreeMap expiredTaskTrackerStates = new TreeMap(); /** * It might seem like a bug to maintain a TreeSet of status objects, @@ -599,6 +624,36 @@ LOG.info("stopped all jobtracker services"); return; } + + boolean reinstateStateOfTaskTracker(String trackerName) { + if (LOG.isDebugEnabled()) + LOG.debug("Going to reinstate state of tasktracker " + trackerName); + ExpiredTaskTrackerState e = (ExpiredTaskTrackerState) + expiredTaskTrackerStates.get(trackerName); + if (e == null) return false; + Set taskset = e.getTaskSet(); + if (taskset == null) return true; + for (Iterator it = taskset.iterator(); it.hasNext(); ) { + String taskId = (String) it.next(); + TaskInProgress tip = e.getTIP(taskId); + if (LOG.isDebugEnabled()) + LOG.debug("Going to recreate task entry for task " + taskId); + //check whether the job is still running + if (tip != null && + tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) + createTaskEntry(taskId, trackerName, tip); + } + ArrayList completedTasks = e.getCompletedTasks(); + for (int i = 0; i < completedTasks.size(); i++) { + TaskStatus ts = (TaskStatus)completedTasks.get(i); + TaskInProgress tip = (TaskInProgress)taskidToTIPMap.get(ts.getTaskId()); + if (tip == null) continue; + JobInProgress j = tip.getJob(); + if (j != null && j.getStatus().getRunState() == JobStatus.RUNNING) + j.updateTaskStatus(tip, ts, myMetrics); + } + return true; + } /////////////////////////////////////////////////////// // Maintain lookup tables; called by JobInProgress @@ -748,7 +803,11 @@ } else { // If not first contact, there should be some record of the tracker if (!seenBefore) { - return InterTrackerProtocol.UNKNOWN_TASKTRACKER; + if (!reinstateStateOfTaskTracker(trackerName)) + return InterTrackerProtocol.UNKNOWN_TASKTRACKER; + else + trackerExpiryQueue.add(trackerStatus); + } } @@ -1196,5 +1255,68 @@ Configuration conf=new Configuration(); startTracker(conf); + } + + private class ExpiredTaskTrackerState { + //Map from taskId (assigned to a given tasktracker) to the taskId's TIP + private TreeMap trackerTaskIdToTIPMap = new TreeMap(); + //completedTasks is an array list that contains the list of tasks that a + //tasktracker successfully completed + ArrayList completedTasks = new ArrayList(); + + public ExpiredTaskTrackerState(String trackerId) { + trackerTaskIdToTIPMap.clear(); + completedTasks.clear(); + TreeSet tasks = (TreeSet) trackerToTaskMap.get(trackerId); + if (tasks == null) { + if (LOG.isDebugEnabled()) + LOG.debug("This tasktracker has no tasks"); + return; + } + if (LOG.isDebugEnabled()) + LOG.debug("Task IDs that this tasktracker has: "); + //We save the status of completed tasks only since TaskTrackers don't + //send updates about completed tasks. We don't need to save the status + //of other tasks since the TaskTracker will send the update along + //with the heartbeat (whenever that happens). + //Saving the status of completed tasks is required since the JobTracker + //will mark all tasks that belonged to a given TaskTracker as failed + //if that TaskTracker is lost. Now, if that same TaskTracker reports + //in later on, we can simply re-mark the completed tasks (TIPs really) + //it reported earlier about as "completed" and avoid unnecessary + //re-run of those tasks. + for (Iterator it = tasks.iterator(); it.hasNext(); ) { + String taskId = (String) it.next(); + if (LOG.isDebugEnabled()) + LOG.debug(taskId); + TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); + if (tip !=null && + tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) + trackerTaskIdToTIPMap.put(taskId, tip); + else continue; + TaskStatus ts = tip.getTaskStatus(taskId); + //ts could be null for a recently assigned task, in the case where, + //the tasktracker hasn't yet reported status about that task + if (ts == null) continue; + if (tip.isComplete()) { + TaskStatus saveTS = null; + try { + saveTS = (TaskStatus)ts.clone(); + } catch (Exception e) { + LOG.fatal("Could not save TaskTracker state",e); + } + completedTasks.add(saveTS); + } + } + } + public Set getTaskSet() { + return trackerTaskIdToTIPMap.keySet(); + } + public TaskInProgress getTIP(String taskId) { + return (TaskInProgress)trackerTaskIdToTIPMap.get(taskId); + } + public ArrayList getCompletedTasks() { + return completedTasks; + } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=429858&r1=429857&r2=429858&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Aug 8 15:24:01 2006 @@ -283,6 +283,12 @@ taskStatuses.put(taskid, status); + //since if this task was declared failed due to tasktracker getting + //lost, but now that same tasktracker reports in with this taskId as + //running, we update recentTasks + if (status.getRunState() == TaskStatus.RUNNING) + recentTasks.add(taskid); + // Recompute progress recomputeProgress(); return changed; @@ -469,5 +475,12 @@ */ public int getIdWithinJob() { return partition; + } + + /** + * Get the TaskStatus associated with a given taskId + */ + public TaskStatus getTaskStatus(String taskId) { + return (TaskStatus)taskStatuses.get(taskId); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=429858&r1=429857&r2=429858&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Aug 8 15:24:01 2006 @@ -25,7 +25,7 @@ * * @author Mike Cafarella **************************************************/ -class TaskStatus implements Writable { +class TaskStatus implements Writable, Cloneable { public static final int RUNNING = 0; public static final int SUCCEEDED = 1; public static final int FAILED = 2; @@ -51,6 +51,16 @@ this.diagnosticInfo = diagnosticInfo; this.stateString = stateString; this.taskTracker = taskTracker; + } + + //Implementing the clone method so that we can save the status of tasks + public Object clone() throws CloneNotSupportedException { + TaskStatus ts = (TaskStatus)super.clone(); + if (this.diagnosticInfo != null) + ts.diagnosticInfo = new String(this.diagnosticInfo); + if (this.stateString != null) + ts.stateString = new String(this.stateString); + return ts; } public String getTaskId() { return taskid; }