Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 38803 invoked from network); 26 Feb 2009 07:17:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 26 Feb 2009 07:17:52 -0000 Received: (qmail 13471 invoked by uid 500); 26 Feb 2009 07:17:52 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 13435 invoked by uid 500); 26 Feb 2009 07:17:52 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 13426 invoked by uid 99); 26 Feb 2009 07:17:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Feb 2009 23:17:52 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Feb 2009 07:17:45 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 467172388920; Thu, 26 Feb 2009 07:17:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r748045 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobStatus.java src/mapred/org/apache/hadoop/mapred/JobTracker.java Date: Thu, 26 Feb 2009 07:17:25 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090226071725.467172388920@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Thu Feb 26 07:17:24 2009 New Revision: 748045 URL: http://svn.apache.org/viewvc?rev=748045&view=rev Log: HADOOP-5247. 
Committing this to the 0.19 branch. Modified: hadoop/core/branches/branch-0.19/CHANGES.txt hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Modified: hadoop/core/branches/branch-0.19/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=748045&r1=748044&r2=748045&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/CHANGES.txt (original) +++ hadoop/core/branches/branch-0.19/CHANGES.txt Thu Feb 26 07:17:24 2009 @@ -21,6 +21,11 @@ (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235). (Amareshwari Sriramadasu via ddas) + HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when + a job finishes. This fixes a bunch of problems to do with NPE when a completed + job is not in memory and a tasktracker comes to the jobtracker with a status + report of a task belonging to that job. 
(Amar Kamat via ddas) + Release 0.19.1 - 2009-02-23 INCOMPATIBLE CHANGES Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=748045&r1=748044&r2=748045&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 26 07:17:24 2009 @@ -2480,9 +2480,6 @@ } boolean isComplete() { - int runState = this.status.getRunState(); - return runState == JobStatus.SUCCEEDED - || runState == JobStatus.FAILED - || runState == JobStatus.KILLED; + return status.isJobComplete(); } } Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=748045&r1=748044&r2=748045&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java Thu Feb 26 07:17:24 2009 @@ -270,6 +270,14 @@ priority = jp; } + /** + * Returns true if the status is for a completed job. 
+ */ + public synchronized boolean isJobComplete() { + return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED + || runState == JobStatus.KILLED); + } + /////////////////////////////////////// // Writable /////////////////////////////////////// Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=748045&r1=748044&r2=748045&view=diff ============================================================================== --- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Feb 26 07:17:24 2009 @@ -960,6 +960,10 @@ TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap<String, ArrayList<JobInProgress>>(); + // (trackerID --> list of jobs to cleanup) + Map<String, Set<JobID>> trackerToJobsToCleanup = + new HashMap<String, Set<JobID>>(); + // All the known TaskInProgress items, mapped to by taskids (taskid->TIP) Map<TaskAttemptID, TaskInProgress> taskidToTIPMap = new TreeMap<TaskAttemptID, TaskInProgress>(); @@ -1552,7 +1556,10 @@ } long now = System.currentTimeMillis(); - + + // mark the job for cleanup at all the trackers + addJobForCleanup(id); + // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user // in memory; information about the purged jobs is available via // JobHistory. 
@@ -1919,6 +1926,12 @@ actions.addAll(killTasksList); } + // Check for jobs to be killed/cleanedup + List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName); + if (killJobsList != null) { + actions.addAll(killJobsList); + } + // Check for tasks whose outputs can be saved List<TaskTrackerAction> commitTasksList = getTasksToSave(status); if (commitTasksList != null) { @@ -2085,27 +2098,58 @@ Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker); if (taskIds != null) { List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>(); - Set<JobID> killJobIds = new TreeSet<JobID>(); for (TaskAttemptID killTaskId : taskIds) { TaskInProgress tip = taskidToTIPMap.get(killTaskId); + if (tip == null) { + continue; + } if (tip.shouldClose(killTaskId)) { // // This is how the JobTracker ends a task at the TaskTracker. // It may be successfully completed, or may be killed in // mid-execution. // - if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING || - tip.getJob().getStatus().getRunState() == JobStatus.PREP) { + if (!tip.getJob().isComplete()) { killList.add(new KillTaskAction(killTaskId)); LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId); - } else { - JobID killJobId = tip.getJob().getStatus().getJobID(); - killJobIds.add(killJobId); } } } - for (JobID killJobId : killJobIds) { + return killList; + } + return null; + } + + /** + * Add a job to cleanup for the tracker. + */ + private void addJobForCleanup(JobID id) { + for (String taskTracker : taskTrackers.keySet()) { + LOG.debug("Marking job " + id + " for cleanup by tracker " + taskTracker); + synchronized (trackerToJobsToCleanup) { + Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker); + if (jobsToKill == null) { + jobsToKill = new HashSet<JobID>(); + trackerToJobsToCleanup.put(taskTracker, jobsToKill); + } + jobsToKill.add(id); + } + } + } + + /** + * A tracker wants to know if any job needs cleanup because the job completed. 
+ */ + private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) { + Set<JobID> jobs = null; + synchronized (trackerToJobsToCleanup) { + jobs = trackerToJobsToCleanup.remove(taskTracker); + } + if (jobs != null) { + // prepare the actions list + List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>(); + for (JobID killJobId : jobs) { killList.add(new KillJobAction(killJobId)); LOG.debug(taskTracker + " -> KillJobAction: " + killJobId); } @@ -2127,6 +2171,9 @@ if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { TaskAttemptID taskId = taskStatus.getTaskID(); TaskInProgress tip = taskidToTIPMap.get(taskId); + if (tip == null) { + continue; + } if (tip.shouldCommit(taskId)) { saveList.add(new CommitTaskAction(taskId)); LOG.debug(tts.getTrackerName() + @@ -2654,16 +2701,23 @@ for (TaskStatus report : status.getTaskReports()) { report.setTaskTracker(trackerName); TaskAttemptID taskId = report.getTaskID(); + + // expire it + expireLaunchingTasks.removeTask(taskId); + + JobInProgress job = getJob(taskId.getJobID()); + if (job == null) { + continue; + } + TaskInProgress tip = taskidToTIPMap.get(taskId); // Check if the tip is known to the jobtracker. In case of a restarted // jt, some tasks might join in later if (tip != null || hasRestarted()) { - JobInProgress job = getJob(taskId.getJobID()); if (tip == null) { tip = job.getTaskInProgress(taskId.getTaskID()); job.addRunningTaskToTIP(tip, taskId, status, false); } - expireLaunchingTasks.removeTask(taskId); // Update the job and inform the listeners if necessary JobStatus prevStatus = (JobStatus)job.getStatus().clone(); @@ -2711,6 +2765,12 @@ */ void lostTaskTracker(String trackerName) { LOG.info("Lost tracker '" + trackerName + "'"); + + // remove the tracker from the local structures + synchronized (trackerToJobsToCleanup) { + trackerToJobsToCleanup.remove(trackerName); + } + Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName); trackerToTaskMap.remove(trackerName);