hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r813944 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/examples/org/apache/hadoop/examples/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/ src/t...
Date Fri, 11 Sep 2009 17:39:30 GMT
Author: omalley
Date: Fri Sep 11 17:39:29 2009
New Revision: 813944

URL: http://svn.apache.org/viewvc?rev=813944&view=rev
Log:
MAPREDUCE-973. Move FailJob and SleepJob from examples to test. (cdouglas 
via omalley)

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java
Removed:
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/FailJob.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 11 17:39:29 2009
@@ -328,6 +328,9 @@
     to support regulating tasks for a job based on resources currently in use
     by that job. (dhruba)
 
+    MAPREDUCE-973. Move FailJob and SleepJob from examples to test. (cdouglas 
+    via omalley)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
Fri Sep 11 17:39:29 2009
@@ -20,8 +20,8 @@
 
 import java.util.Properties;
 
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
 
 public class TestCapacitySchedulerWithJobTracker extends
     ClusterWithCapacityScheduler {

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Fri
Sep 11 17:39:29 2009
@@ -59,14 +59,12 @@
       pgd.addClass("secondarysort", SecondarySort.class,
                    "An example defining a secondary sort to the reduce.");
       pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
-      pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
       pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned
datasets");
       pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from
several files.");
       pgd.addClass("dbcount", DBCountPageView.class, "An example job that count the pageview
counts from a database.");
       pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
       pgd.addClass("terasort", TeraSort.class, "Run the terasort");
       pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");
-      pgd.addClass("fail", FailJob.class, "a job that always fails");
       exitCode = pgd.driver(argv);
     }
     catch(Throwable e){

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java Fri
Sep 11 17:39:29 2009
@@ -122,7 +122,7 @@
         "-r", Integer.toString(maxReduces),
         "-mt", Integer.toString(mapSleepTime),
         "-rt", Integer.toString(reduceSleepTime)};
-    runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs, 
+    runTest(jc, conf, "org.apache.hadoop.mapreduce.SleepJob", sleepJobArgs, 
         new KillTaskThread(jc, 2, 0.2f, false, 2),
         new KillTrackerThread(jc, 2, 0.4f, false, 1));
     LOG.info("SleepJob done");
@@ -492,4 +492,4 @@
     int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
     System.exit(res);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
Fri Sep 11 17:39:29 2009
@@ -25,9 +25,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
 public class TestJobDirCleanup extends TestCase {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
Fri Sep 11 17:39:29 2009
@@ -20,9 +20,9 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
 /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
Fri Sep 11 17:39:29 2009
@@ -28,13 +28,13 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 import junit.framework.TestCase;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
Fri Sep 11 17:39:29 2009
@@ -33,11 +33,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Fri
Sep 11 17:39:29 2009
@@ -19,8 +19,8 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
Fri Sep 11 17:39:29 2009
@@ -22,7 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ToolRunner;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
Fri Sep 11 17:39:29 2009
@@ -28,10 +28,10 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java?rev=813944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/FailJob.java Fri Sep
11 17:39:29 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing failed mappers and/or reducers.
+ * 
+ * Mappers emit a token amount of data.
+ */
+public class FailJob extends Configured implements Tool {
+  public static class FailMapper 
+      extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
+    public void map(LongWritable key, Text value, Context context
+               ) throws IOException, InterruptedException {
+      if (context.getConfiguration().getBoolean("fail.job.map.fail", true)) {
+        throw new RuntimeException("Intentional map failure");
+      }
+      context.write(key, NullWritable.get());
+    }
+  }
+
+  public static class FailReducer  
+      extends Reducer<LongWritable, NullWritable, NullWritable, NullWritable> {
+
+    public void reduce(LongWritable key, Iterable<NullWritable> values,
+                       Context context) throws IOException {
+      if (context.getConfiguration().getBoolean("fail.job.reduce.fail", false)) {
+      	throw new RuntimeException("Intentional reduce failure");
+      }
+      context.setStatus("No worries");
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new FailJob(), args);
+    System.exit(res);
+  }
+
+  public Job createJob(boolean failMappers, boolean failReducers, Path inputFile) 
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setBoolean("fail.job.map.fail", failMappers);
+    conf.setBoolean("fail.job.reduce.fail", failReducers);
+    Job job = new Job(conf, "fail");
+    job.setJarByClass(FailJob.class);
+    job.setMapperClass(FailMapper.class);
+    job.setMapOutputKeyClass(LongWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(FailReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setSpeculativeExecution(false);
+    job.setJobName("Fail job");
+    FileInputFormat.addInputPath(job, inputFile);
+    return job;
+  }
+
+  public int run(String[] args) throws Exception {
+    if(args.length < 1) {
+      System.err.println("FailJob " +
+          " (-failMappers|-failReducers)");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+    boolean failMappers = false, failReducers = false;
+
+    for (int i = 0; i < args.length; i++ ) {
+      if (args[i].equals("-failMappers")) {
+        failMappers = true;
+      }
+      else if(args[i].equals("-failReducers")) {
+        failReducers = true;
+      }
+    }
+    if (!(failMappers ^ failReducers)) {
+      System.err.println("Exactly one of -failMappers or -failReducers must be specified.");
+      return 3;
+    }
+
+    // Write a file with one line per mapper.
+    final FileSystem fs = FileSystem.get(getConf());
+    Path inputDir = new Path(FailJob.class.getSimpleName() + "_in");
+    fs.mkdirs(inputDir);
+    for (int i = 0; i < getConf().getInt("mapred.map.tasks", 1); ++i) {
+      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(
+          fs.create(new Path(inputDir, Integer.toString(i)))));
+      w.write(Integer.toString(i) + "\n");
+      w.close();
+    }
+
+    Job job = createJob(failMappers, failReducers, inputDir);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java?rev=813944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/SleepJob.java Fri Sep
11 17:39:29 2009
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing MR framefork. Sleeps for a defined period 
+ * of time in mapper and reducer. Generates fake input for map / reduce 
+ * jobs. Note that generated number of input pairs is in the order 
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class SleepJob extends Configured implements Tool {
+
+  public static class SleepJobPartitioner extends 
+      Partitioner<IntWritable, NullWritable> {
+    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+      return k.get() % numPartitions;
+    }
+  }
+  
+  public static class EmptySplit extends InputSplit implements Writable {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class SleepInputFormat 
+      extends InputFormat<IntWritable,IntWritable> {
+    
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt("mapred.map.tasks", 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+    
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+      final int count = conf.getInt("sleep.job.map.sleep.count", 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+      final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+      
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        public boolean nextKeyValue()
+            throws IOException {
+          key = new IntWritable();
+          key.set(emitCount);
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return records / ((float)count);
+        }
+      };
+    }
+  }
+
+  public static class SleepMapper 
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt("sleep.job.map.sleep.count", mapSleepCount);
+      this.mapSleepDuration =
+        conf.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
+    }
+
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      //it is expected that every map processes mapSleepCount number of records. 
+      try {
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        Thread.sleep(mapSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // output reduceSleepCount * numReduce number of random values, so that
+      // each reducer will get reduceSleepCount number of keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
+    }
+  }
+  
+  public static class SleepReducer  
+      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+      this.reduceSleepDuration =
+        conf.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
+    }
+
+    public void reduce(IntWritable key, Iterable<NullWritable> values,
+                       Context context)
+      throws IOException {
+      try {
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        Thread.sleep(reduceSleepDuration);
+      
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
+    System.exit(res);
+  }
+
+  public Job createJob(int numMapper, int numReducer, 
+                       long mapSleepTime, int mapSleepCount, 
+                       long reduceSleepTime, int reduceSleepCount) 
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setLong("sleep.job.map.sleep.time", mapSleepTime);
+    conf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+    conf.setInt("sleep.job.map.sleep.count", mapSleepCount);
+    conf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+    conf.setInt("mapred.map.tasks", numMapper);
+    Job job = new Job(conf, "sleep");
+    job.setNumReduceTasks(numReducer);
+    job.setJarByClass(SleepJob.class);
+    job.setNumReduceTasks(numReducer);
+    job.setMapperClass(SleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(SleepJobPartitioner.class);
+    job.setSpeculativeExecution(false);
+    job.setJobName("Sleep job");
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    return job;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 1) {
+      System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
+          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+          " [-recordt recordSleepTime (msec)]");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+
+    int numMapper = 1, numReducer = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+    int mapSleepCount = 1, reduceSleepCount = 1;
+
+    for(int i=0; i < args.length; i++ ) {
+      if(args[i].equals("-m")) {
+        numMapper = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-r")) {
+        numReducer = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-mt")) {
+        mapSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-rt")) {
+        reduceSleepTime = Long.parseLong(args[++i]);
+      }
+      else if (args[i].equals("-recordt")) {
+        recSleepTime = Long.parseLong(args[++i]);
+      }
+    }
+    
+    // sleep for *SleepTime duration in Task by recSleepTime per record
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+    Job job = createJob(numMapper, numReducer, mapSleepTime,
+                mapSleepCount, reduceSleepTime, reduceSleepCount);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java?rev=813944&r1=813943&r2=813944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java Fri
Sep 11 17:39:29 2009
@@ -28,6 +28,8 @@
 import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.ThreadedMapBenchmark;
+import org.apache.hadoop.mapreduce.FailJob;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ProgramDriver;
 
 /**
@@ -69,6 +71,8 @@
       pgd.addClass("MRReliabilityTest", ReliabilityTest.class,
           "A program that tests the reliability of the MR framework by " +
           "injecting faults/failures");
+      pgd.addClass("fail", FailJob.class, "a job that always fails");
+      pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
     } catch(Throwable e) {
       e.printStackTrace();
     }



Mime
View raw message