From: ddas@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Subject: svn commit: r743974 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskInProgress.java src/test/org/apache/hadoop/mapred/MiniMRCluster.java src/test/org/apache/hadoop/mapred/TestLostTracker.java
Date: Fri, 13 Feb 2009 04:01:57 -0000
Message-Id: <20090213040158.3082A23889BB@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: ddas
Date: Fri Feb 13 04:01:57 2009
New Revision: 743974

URL: http://svn.apache.org/viewvc?rev=743974&view=rev
Log:
HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and
killed tasks correctly. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Fri Feb 13 04:01:57 2009
@@ -102,6 +102,9 @@
     HADOOP-5166. Fix JobTracker restart to work when ACLs are configured
     for the JobTracker. (Amar Kamat via yhemanth).
 
+    HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and
+    killed tasks correctly. (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES
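The TaskInProgress.java hunks below swap the old "is this attempt still in
activeTasks" check for membership in a new set that records every attempt the
TIP has ever launched. The following minimal sketch illustrates why that
matters; the AttemptCounter class, its String attempt ids, and its method
names are invented for illustration and are not part of this patch or of the
Hadoop source.

    import java.util.HashSet;
    import java.util.Set;

    /**
     * Illustrative only: contrasts counting against "attempts still active"
     * (the old isPresent check) with counting against "all attempts ever
     * launched" (the tasks set added by this patch).
     */
    public class AttemptCounter {
      private final Set<String> active = new HashSet<String>();      // like activeTasks
      private final Set<String> allAttempts = new HashSet<String>(); // like the new tasks set
      private int numTaskFailures = 0;
      private int numKilledTasks = 0;

      void launched(String attemptId) {
        active.add(attemptId);
        allAttempts.add(attemptId);   // mirrors tasks.add(taskid)
      }

      void completed(String attemptId) {
        active.remove(attemptId);     // a finished attempt is no longer active
      }

      /** Attempt reported FAILED or KILLED, possibly long after it completed. */
      void terminated(String attemptId, boolean killed) {
        active.remove(attemptId);
        if (allAttempts.contains(attemptId)) {   // the fixed membership check
          if (killed) {
            numKilledTasks++;
          } else {
            numTaskFailures++;
          }
        }
      }

      public static void main(String[] args) {
        AttemptCounter tip = new AttemptCounter();
        tip.launched("attempt_0");
        tip.completed("attempt_0");        // map finishes; its output is still needed
        tip.terminated("attempt_0", true); // tracker is lost, attempt gets killed
        // Keying the count on the active set would miss this kill (count 0);
        // keying it on all launched attempts counts it exactly once.
        System.out.println("killed = " + tip.numKilledTasks);   // killed = 1
      }
    }

The design point: a completed map has already left activeTasks, so when its
tracker is later lost and the attempt is killed for re-execution, the old
isPresent flag skipped the bookkeeping; checking against the set of all
launched attempts counts it once, which is what the new TestLostTracker
asserts.
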
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Feb 13 04:01:57 2009
@@ -94,6 +94,8 @@
   // Map from task Id -> TaskTracker Id, contains tasks that are
   // currently runnings
   private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  // All attempt Ids of this TIP
+  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
   private JobConf conf;
   private Map<TaskAttemptID, List<String>> taskDiagnosticData =
       new TreeMap<TaskAttemptID, List<String>>();
@@ -585,9 +587,7 @@
       }
     }
 
-    // Note that there can be failures of tasks that are hosted on a machine
-    // that has not yet registered with restarted jobtracker
-    boolean isPresent = this.activeTasks.remove(taskid) != null;
+    this.activeTasks.remove(taskid);
 
     // Since we do not fail completed reduces (whose outputs go to hdfs), we
     // should note this failure only for completed maps, only if this taskid;
@@ -601,8 +601,10 @@
       resetSuccessfulTaskid();
     }
 
+    // Note that there can be failures of tasks that are hosted on a machine
+    // that has not yet registered with restarted jobtracker
     // recalculate the counts only if its a genuine failure
-    if (isPresent) {
+    if (tasks.contains(taskid)) {
       if (taskState == TaskStatus.State.FAILED) {
         numTaskFailures++;
         machinesWhereFailed.add(trackerHostName);
@@ -907,6 +909,7 @@
     }
 
     activeTasks.put(taskid, taskTracker);
+    tasks.add(taskid);
 
     // Ask JobTracker to note that the task exists
     jobtracker.createTaskEntry(taskid, taskTracker, this);

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Feb 13 04:01:57 2009
@@ -235,6 +235,9 @@
     return jobTracker;
   }
 
+  TaskTrackerRunner getTaskTrackerRunner(int id) {
+    return taskTrackerList.get(id);
+  }
   /**
    * Get the number of task trackers in the cluster
    */

Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=743974&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java Fri Feb 13 04:01:57 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.TestJobTrackerRestart;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+public class TestLostTracker extends TestCase {
+  final Path testDir = new Path("/jt-lost-tt");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+
+  private JobConf configureJob(JobConf conf, int[] maps, int[] reduces,
+                               String mapSignal, String redSignal)
+  throws IOException {
+    JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
+    return TestJobTrackerRestart.getJobs(conf, priority,
+                                         maps, reduces, outputDir, inDir,
+                                         mapSignal, redSignal)[0];
+  }
+
+  public void testLostTracker(MiniDFSCluster dfs,
+                              MiniMRCluster mr)
+  throws IOException {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    int numMaps = 10;
+    int numReds = 1;
+    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+
+    // Configure the job
+    JobConf job = configureJob(jobConf, new int[] {numMaps},
+                               new int[] {numReds},
+                               mapSignalFile, redSignalFile);
+
+    TestJobTrackerRestart.cleanUp(fileSys, shareDir);
+
+    // Submit the job
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+
+    // wait for the job to be inited
+    mr.initializeJob(id);
+
+    // Make sure that the master job is 50% completed
+    while (TestJobTrackerRestart.getJobStatus(jobClient, id).mapProgress()
+           < 0.5f) {
+      TestJobTrackerRestart.waitFor(10);
+    }
+
+    // get a completed task on 1st tracker
+    TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
+                              getNonRunningTasks().get(0).getTaskID();
+
+    // Kill the 1st tasktracker
+    mr.stopTaskTracker(0);
+
+    // Signal all the maps to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, true,
+                                      mapSignalFile, redSignalFile);
+
+    // Signal the reducers to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, false,
+                                      mapSignalFile, redSignalFile);
+    // wait for the job to complete
+    TestJobTrackerRestart.waitTillDone(jobClient);
+
+    // Check if the tasks on the lost tracker got killed and re-executed
+    assertTrue(jobClient.getClusterStatus().getTaskTrackers()
+               < mr.getNumTaskTrackers());
+    assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
+    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                         getTip(taskid.getTaskID());
+    assertTrue(tip.isComplete());
+    assertEquals(tip.numKilledTasks(), 1);
+  }
+
+  public void testLostTracker() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, null, null);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+
+      // clean up
+      fileSys.delete(testDir, true);
+
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // Write the input file
+      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf,
+                                           new Path(inDir + "/file"),
+                                           (short)1);
+
+      dfs.startDataNodes(conf, 1, true, null, null, null, null);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+                 + (dfs.getFileSystem()).getUri().getPort();
+
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+
+      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
+
+      // Test Lost tracker case
+      testLostTracker(dfs, mr);
+    } finally {
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception e) {}
+      }
+      if (dfs != null) {
+        try {
+          dfs.shutdown();
+        } catch (Exception e) {}
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    new TestLostTracker().testLostTracker();
+  }
+}
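
How the new test exercises the fix: it starts a two-tracker MiniMRCluster with
mapred.tasktracker.expiry.interval set to ten seconds, stops the first tracker
after one of its maps has completed, and expects that map to be killed and
re-executed once the tracker is declared lost. The closing assertion
tip.numKilledTasks() == 1 only holds if TaskInProgress records the kill of an
attempt that has already left activeTasks, which is exactly what the new
membership check provides. With the ant build of that era, a single test like
this was typically run with something along the lines of
"ant -Dtestcase=TestLostTracker test"; that invocation is an assumption about
the contemporary build, not something stated in this commit.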