Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 40426 invoked from network); 11 Sep 2009 17:39:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Sep 2009 17:39:59 -0000 Received: (qmail 94536 invoked by uid 500); 11 Sep 2009 17:39:59 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 94490 invoked by uid 500); 11 Sep 2009 17:39:59 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 94480 invoked by uid 99); 11 Sep 2009 17:39:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2009 17:39:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2009 17:39:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 43A4F23888D7; Fri, 11 Sep 2009 17:39:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r813944 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/examples/org/apache/hadoop/examples/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/ src/t... Date: Fri, 11 Sep 2009 17:39:30 -0000 To: mapreduce-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090911173930.43A4F23888D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Sep 11 17:39:29 2009 New Revision: 813944 URL: http://svn.apache.org/viewvc?rev=813944&view=rev Log: MAPREDUCE-973. Move FailJob and SleepJob from examples to test. (cdouglas via omalley) Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java Removed: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/FailJob.java hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 11 17:39:29 2009 @@ -328,6 +328,9 @@ to support regulating tasks for a job based on resources currently in use by that job. (dhruba) + MAPREDUCE-973. Move FailJob and SleepJob from examples to test. (cdouglas + via omalley) + BUG FIXES MAPREDUCE-878. Rename fair scheduler design doc to Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java (original) +++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java Fri Sep 11 17:39:29 2009 @@ -20,8 +20,8 @@ import java.util.Properties; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.SleepJob; public class TestCapacitySchedulerWithJobTracker extends ClusterWithCapacityScheduler { Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original) +++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Fri Sep 11 17:39:29 2009 @@ -59,14 +59,12 @@ pgd.addClass("secondarysort", SecondarySort.class, "An example defining a secondary sort to the reduce."); pgd.addClass("sudoku", Sudoku.class, "A sudoku solver."); - pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task."); pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets"); pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from several files."); pgd.addClass("dbcount", DBCountPageView.class, "An example job that count the pageview counts from a database."); pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort"); pgd.addClass("terasort", TeraSort.class, "Run the terasort"); pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort"); - pgd.addClass("fail", FailJob.class, "a job that always fails"); exitCode = pgd.driver(argv); } catch(Throwable e){ Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java Fri Sep 11 17:39:29 2009 @@ -122,7 +122,7 @@ "-r", Integer.toString(maxReduces), "-mt", Integer.toString(mapSleepTime), "-rt", Integer.toString(reduceSleepTime)}; - runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs, + runTest(jc, conf, "org.apache.hadoop.mapreduce.SleepJob", sleepJobArgs, new KillTaskThread(jc, 2, 0.2f, false, 2), new KillTrackerThread(jc, 2, 0.4f, false, 1)); LOG.info("SleepJob done"); @@ -492,4 +492,4 @@ int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args); System.exit(res); } -} \ No newline at end of file +} Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Fri Sep 11 17:39:29 2009 @@ -25,9 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ToolRunner; public class TestJobDirCleanup extends TestCase { Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Fri Sep 11 17:39:29 2009 @@ -20,9 +20,9 @@ import java.io.IOException; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ToolRunner; /** Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java Fri Sep 11 17:39:29 2009 @@ -28,13 +28,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; import junit.framework.TestCase; Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Fri Sep 11 17:39:29 2009 @@ -33,11 +33,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.UserGroupInformation; Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Fri Sep 11 17:39:29 2009 @@ -19,8 +19,8 @@ import java.io.IOException; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ToolRunner; import junit.framework.TestCase; Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Fri Sep 11 17:39:29 2009 @@ -22,7 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin; import org.apache.hadoop.util.MemoryCalculatorPlugin; import org.apache.hadoop.util.ToolRunner; Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Fri Sep 11 17:39:29 2009 @@ -28,10 +28,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree; import org.apache.hadoop.util.ProcfsBasedProcessTree; import org.apache.hadoop.util.StringUtils; Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java?rev=813944&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java Fri Sep 11 17:39:29 2009 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Dummy class for testing failed mappers and/or reducers. + * + * Mappers emit a token amount of data. + */ +public class FailJob extends Configured implements Tool { + public static class FailMapper + extends Mapper { + public void map(LongWritable key, Text value, Context context + ) throws IOException, InterruptedException { + if (context.getConfiguration().getBoolean("fail.job.map.fail", true)) { + throw new RuntimeException("Intentional map failure"); + } + context.write(key, NullWritable.get()); + } + } + + public static class FailReducer + extends Reducer { + + public void reduce(LongWritable key, Iterable values, + Context context) throws IOException { + if (context.getConfiguration().getBoolean("fail.job.reduce.fail", false)) { + throw new RuntimeException("Intentional reduce failure"); + } + context.setStatus("No worries"); + } + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new FailJob(), args); + System.exit(res); + } + + public Job createJob(boolean failMappers, boolean failReducers, Path inputFile) + throws IOException { + Configuration conf = getConf(); + conf.setBoolean("fail.job.map.fail", failMappers); + conf.setBoolean("fail.job.reduce.fail", failReducers); + Job job = new Job(conf, "fail"); + job.setJarByClass(FailJob.class); + job.setMapperClass(FailMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setReducerClass(FailReducer.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setInputFormatClass(TextInputFormat.class); + job.setSpeculativeExecution(false); + job.setJobName("Fail job"); + FileInputFormat.addInputPath(job, inputFile); + return job; + } + + public int run(String[] args) throws Exception { + if(args.length < 1) { + System.err.println("FailJob " + + " (-failMappers|-failReducers)"); + ToolRunner.printGenericCommandUsage(System.err); + return 2; + } + boolean failMappers = false, failReducers = false; + + for (int i = 0; i < args.length; i++ ) { + if (args[i].equals("-failMappers")) { + failMappers = true; + } + else if(args[i].equals("-failReducers")) { + failReducers = true; + } + } + if (!(failMappers ^ failReducers)) { + System.err.println("Exactly one of -failMappers or -failReducers must be specified."); + return 3; + } + + // Write a file with one line per mapper. + final FileSystem fs = FileSystem.get(getConf()); + Path inputDir = new Path(FailJob.class.getSimpleName() + "_in"); + fs.mkdirs(inputDir); + for (int i = 0; i < getConf().getInt("mapred.map.tasks", 1); ++i) { + BufferedWriter w = new BufferedWriter(new OutputStreamWriter( + fs.create(new Path(inputDir, Integer.toString(i))))); + w.write(Integer.toString(i) + "\n"); + w.close(); + } + + Job job = createJob(failMappers, failReducers, inputDir); + return job.waitForCompletion(true) ? 0 : 1; + } +} Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java?rev=813944&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java Fri Sep 11 17:39:29 2009 @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Dummy class for testing MR framefork. Sleeps for a defined period + * of time in mapper and reducer. Generates fake input for map / reduce + * jobs. Note that generated number of input pairs is in the order + * of numMappers * mapSleepTime / 100, so the job uses + * some disk space. + */ +public class SleepJob extends Configured implements Tool { + + public static class SleepJobPartitioner extends + Partitioner { + public int getPartition(IntWritable k, NullWritable v, int numPartitions) { + return k.get() % numPartitions; + } + } + + public static class EmptySplit extends InputSplit implements Writable { + public void write(DataOutput out) throws IOException { } + public void readFields(DataInput in) throws IOException { } + public long getLength() { return 0L; } + public String[] getLocations() { return new String[0]; } + } + + public static class SleepInputFormat + extends InputFormat { + + public List getSplits(JobContext jobContext) { + List ret = new ArrayList(); + int numSplits = jobContext.getConfiguration(). + getInt("mapred.map.tasks", 1); + for (int i = 0; i < numSplits; ++i) { + ret.add(new EmptySplit()); + } + return ret; + } + + public RecordReader createRecordReader( + InputSplit ignored, TaskAttemptContext taskContext) + throws IOException { + Configuration conf = taskContext.getConfiguration(); + final int count = conf.getInt("sleep.job.map.sleep.count", 1); + if (count < 0) throw new IOException("Invalid map count: " + count); + final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1); + if (redcount < 0) + throw new IOException("Invalid reduce count: " + redcount); + final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks()); + + return new RecordReader() { + private int records = 0; + private int emitCount = 0; + private IntWritable key = null; + private IntWritable value = null; + public void initialize(InputSplit split, TaskAttemptContext context) { + } + + public boolean nextKeyValue() + throws IOException { + key = new IntWritable(); + key.set(emitCount); + int emit = emitPerMapTask / count; + if ((emitPerMapTask) % count > records) { + ++emit; + } + emitCount += emit; + value = new IntWritable(); + value.set(emit); + return records++ < count; + } + public IntWritable getCurrentKey() { return key; } + public IntWritable getCurrentValue() { return value; } + public void close() throws IOException { } + public float getProgress() throws IOException { + return records / ((float)count); + } + }; + } + } + + public static class SleepMapper + extends Mapper { + private long mapSleepDuration = 100; + private int mapSleepCount = 1; + private int count = 0; + + protected void setup(Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + this.mapSleepCount = + conf.getInt("sleep.job.map.sleep.count", mapSleepCount); + this.mapSleepDuration = + conf.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount; + } + + public void map(IntWritable key, IntWritable value, Context context + ) throws IOException, InterruptedException { + //it is expected that every map processes mapSleepCount number of records. + try { + context.setStatus("Sleeping... (" + + (mapSleepDuration * (mapSleepCount - count)) + ") ms left"); + Thread.sleep(mapSleepDuration); + } + catch (InterruptedException ex) { + throw (IOException)new IOException( + "Interrupted while sleeping").initCause(ex); + } + ++count; + // output reduceSleepCount * numReduce number of random values, so that + // each reducer will get reduceSleepCount number of keys. + int k = key.get(); + for (int i = 0; i < value.get(); ++i) { + context.write(new IntWritable(k + i), NullWritable.get()); + } + } + } + + public static class SleepReducer + extends Reducer { + private long reduceSleepDuration = 100; + private int reduceSleepCount = 1; + private int count = 0; + + protected void setup(Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + this.reduceSleepCount = + conf.getInt("sleep.job.reduce.sleep.count", reduceSleepCount); + this.reduceSleepDuration = + conf.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount; + } + + public void reduce(IntWritable key, Iterable values, + Context context) + throws IOException { + try { + context.setStatus("Sleeping... (" + + (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left"); + Thread.sleep(reduceSleepDuration); + + } + catch (InterruptedException ex) { + throw (IOException)new IOException( + "Interrupted while sleeping").initCause(ex); + } + count++; + } + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new SleepJob(), args); + System.exit(res); + } + + public Job createJob(int numMapper, int numReducer, + long mapSleepTime, int mapSleepCount, + long reduceSleepTime, int reduceSleepCount) + throws IOException { + Configuration conf = getConf(); + conf.setLong("sleep.job.map.sleep.time", mapSleepTime); + conf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime); + conf.setInt("sleep.job.map.sleep.count", mapSleepCount); + conf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount); + conf.setInt("mapred.map.tasks", numMapper); + Job job = new Job(conf, "sleep"); + job.setNumReduceTasks(numReducer); + job.setJarByClass(SleepJob.class); + job.setNumReduceTasks(numReducer); + job.setMapperClass(SleepMapper.class); + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setReducerClass(SleepReducer.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setInputFormatClass(SleepInputFormat.class); + job.setPartitionerClass(SleepJobPartitioner.class); + job.setSpeculativeExecution(false); + job.setJobName("Sleep job"); + FileInputFormat.addInputPath(job, new Path("ignored")); + return job; + } + + public int run(String[] args) throws Exception { + + if(args.length < 1) { + System.err.println("SleepJob [-m numMapper] [-r numReducer]" + + " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" + + " [-recordt recordSleepTime (msec)]"); + ToolRunner.printGenericCommandUsage(System.err); + return 2; + } + + int numMapper = 1, numReducer = 1; + long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100; + int mapSleepCount = 1, reduceSleepCount = 1; + + for(int i=0; i < args.length; i++ ) { + if(args[i].equals("-m")) { + numMapper = Integer.parseInt(args[++i]); + } + else if(args[i].equals("-r")) { + numReducer = Integer.parseInt(args[++i]); + } + else if(args[i].equals("-mt")) { + mapSleepTime = Long.parseLong(args[++i]); + } + else if(args[i].equals("-rt")) { + reduceSleepTime = Long.parseLong(args[++i]); + } + else if (args[i].equals("-recordt")) { + recSleepTime = Long.parseLong(args[++i]); + } + } + + // sleep for *SleepTime duration in Task by recSleepTime per record + mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime)); + reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime)); + Job job = createJob(numMapper, numReducer, mapSleepTime, + mapSleepCount, reduceSleepTime, reduceSleepCount); + return job.waitForCompletion(true) ? 0 : 1; + } + +} Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java?rev=813944&r1=813943&r2=813944&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java Fri Sep 11 17:39:29 2009 @@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.TestSequenceFileInputFormat; import org.apache.hadoop.mapred.TestTextInputFormat; import org.apache.hadoop.mapred.ThreadedMapBenchmark; +import org.apache.hadoop.mapreduce.FailJob; +import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ProgramDriver; /** @@ -69,6 +71,8 @@ pgd.addClass("MRReliabilityTest", ReliabilityTest.class, "A program that tests the reliability of the MR framework by " + "injecting faults/failures"); + pgd.addClass("fail", FailJob.class, "a job that always fails"); + pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task."); } catch(Throwable e) { e.printStackTrace(); }