Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 7978 invoked from network); 14 Oct 2008 05:04:30 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 Oct 2008 05:04:30 -0000 Received: (qmail 4000 invoked by uid 500); 14 Oct 2008 05:04:30 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 3972 invoked by uid 500); 14 Oct 2008 05:04:30 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 3963 invoked by uid 99); 14 Oct 2008 05:04:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Oct 2008 22:04:30 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Oct 2008 05:03:24 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0DF3023888A4; Mon, 13 Oct 2008 22:03:31 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r704310 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/test/org/apache/hadoop/mapred/TestJobInProgress.java Date: Tue, 14 Oct 2008 05:03:30 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081014050331.0DF3023888A4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Mon Oct 13 22:03:29 2008 New Revision: 704310 URL: http://svn.apache.org/viewvc?rev=704310&view=rev Log: HADOOP-4287. 
Fixes an issue to do with maintaining counts of running/pending maps/reduces. Contributed by Sreekanth Ramakrishnan. Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=704310&r1=704309&r2=704310&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Mon Oct 13 22:03:29 2008 @@ -895,6 +895,9 @@ and org.apache.hadoop.security.AccessControlIOException into a single class hadoop.security.AccessControlException. (omalley via acmurthy) + HADOOP-4287. Fixes an issue to do with maintaining counts of running/pending + maps/reduces. (Sreekanth Ramakrishnan via ddas) + Release 0.18.2 - Unreleased BUG FIXES Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=704310&r1=704309&r2=704310&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct 13 22:03:29 2008 @@ -492,11 +492,11 @@ return finishedReduceTasks; } public synchronized int pendingMaps() { - return numMapTasks - runningMapTasks - failedMapTasks - + return numMapTasks - runningMapTasks - failedMapTIPs - finishedMapTasks + speculativeMapTasks; } public synchronized int pendingReduces() { - return numReduceTasks - runningReduceTasks - failedReduceTasks - + return numReduceTasks - runningReduceTasks - failedReduceTIPs - finishedReduceTasks + speculativeReduceTasks; } public JobPriority getPriority() { @@ -1915,8 +1915,6 @@ 
if ((status.getRunState() == JobStatus.RUNNING) || (status.getRunState() == JobStatus.PREP)) { LOG.info("Killing job '" + this.status.getJobID() + "'"); - this.runningMapTasks = 0; - this.runningReduceTasks = 0; // // kill all TIPs. // Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=704310&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Mon Oct 13 22:03:29 2008 @@ -0,0 +1,147 @@ +package org.apache.hadoop.mapred; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapred.lib.IdentityReducer; + +import junit.framework.TestCase; + +public class TestJobInProgress extends TestCase { + + private MiniMRCluster mrCluster; + + private MiniDFSCluster dfsCluster; + JobTracker jt; + + public static class FailMapTaskJob extends MapReduceBase implements + Mapper { + + @Override + public void map(LongWritable key, Text value, + OutputCollector output, Reporter reporter) + throws IOException { + // reporter.incrCounter(TaskCounts.LaunchedTask, 1); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new IllegalArgumentException("Interrupted MAP task"); + } + throw new IllegalArgumentException("Failing MAP task"); + } + } + + // Suppressing warning as we just need to write a failing reduce task job + // We 
don't need to bother about the actual key value pairs which are passed. + @SuppressWarnings("unchecked") + public static class FailReduceTaskJob extends MapReduceBase implements + Reducer { + + @Override + public void reduce(Object key, Iterator values, OutputCollector output, + Reporter reporter) throws IOException { + // reporter.incrCounter(TaskCounts.LaunchedTask, 1); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new IllegalArgumentException("Failing Reduce task"); + } + throw new IllegalArgumentException("Failing Reduce task"); + } + + } + + @Override + protected void setUp() throws Exception { + // TODO Auto-generated method stub + super.setUp(); + final int taskTrackers = 4; + Configuration conf = new Configuration(); + dfsCluster = new MiniDFSCluster(conf, 4, true, null); + mrCluster = new MiniMRCluster(taskTrackers, dfsCluster.getFileSystem() + .getUri().toString(), 1); + jt = mrCluster.getJobTrackerRunner().getJobTracker(); + } + + public void testPendingMapTaskCount() throws Exception { + launchTask(FailMapTaskJob.class, IdentityReducer.class); + checkTaskCounts(); + } + + public void testPendingReduceTaskCount() throws Exception { + launchTask(IdentityMapper.class, FailReduceTaskJob.class); + checkTaskCounts(); + } + + @Override + protected void tearDown() throws Exception { + mrCluster.shutdown(); + dfsCluster.shutdown(); + super.tearDown(); + } + + + @SuppressWarnings("unchecked") + void launchTask(Class MapClass,Class ReduceClass) throws Exception{ + JobConf jobConf = mrCluster.createJobConf(); + + JobClient jc = new JobClient(jobConf); + final Path inDir = new Path("./failjob/input"); + final Path outDir = new Path("./failjob/output"); + String input = "Test failing job.\n One more line"; + FileSystem inFs = inDir.getFileSystem(jobConf); + FileSystem outFs = outDir.getFileSystem(jobConf); + outFs.delete(outDir, true); + if (!inFs.mkdirs(inDir)) { + throw new IOException("create directory failed" + inDir.toString()); + } 
+ + DataOutputStream file = inFs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); + jobConf.setJobName("failmaptask"); + jobConf.setInputFormat(TextInputFormat.class); + jobConf.setOutputKeyClass(Text.class); + jobConf.setOutputValueClass(Text.class); + jobConf.setMapperClass(MapClass); + jobConf.setCombinerClass(ReduceClass); + jobConf.setReducerClass(ReduceClass); + FileInputFormat.setInputPaths(jobConf, inDir); + FileOutputFormat.setOutputPath(jobConf, outDir); + jobConf.setNumMapTasks(10); + jobConf.setNumReduceTasks(5); + RunningJob job = null; + try { + job = JobClient.runJob(jobConf); + } catch (IOException e) { + } + + } + + void checkTaskCounts() { + JobStatus[] status = jt.getAllJobs(); + for (JobStatus js : status) { + JobInProgress jip = jt.getJob(js.getJobID()); + Counters counter = jip.getJobCounters(); + long totalTaskCount = counter + .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS) + + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES); + while (jip.getNumTaskCompletionEvents() < totalTaskCount) { + assertEquals(true, (jip.runningMaps() >= 0)); + assertEquals(true, (jip.pendingMaps() >= 0)); + assertEquals(true, (jip.runningReduces() >= 0)); + assertEquals(true, (jip.pendingReduces() >= 0)); + } + } + } + +}