hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r678579 - in /hadoop/core/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/SleepJob.java
Date: Mon, 21 Jul 2008 21:50:23 GMT
Author: acmurthy
Date: Mon Jul 21 14:50:22 2008
New Revision: 678579

URL: http://svn.apache.org/viewvc?rev=678579&view=rev
Log:
HADOOP-3728. Fix SleepJob so that it doesn't depend on temporary files; this ensures we can
now run more than one instance of SleepJob simultaneously. Contributed by Chris Douglas.
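
As a quick illustration of what the fix buys: before this change, every SleepJob run wrote its
input to the shared path /tmp/sleep.job.data, so two concurrent runs could clobber or delete
each other's data. A minimal sketch of submitting two instances at once (the class name
ConcurrentSleepJobs and the argument values are illustrative assumptions, not part of the
commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.examples.SleepJob;
  import org.apache.hadoop.util.ToolRunner;

  public class ConcurrentSleepJobs {
    public static void main(String[] args) throws Exception {
      // Illustrative arguments: 2 maps, 2 reduces, 1s of map and reduce sleep.
      final String[] jobArgs = {"-m", "2", "-r", "2", "-mt", "1000", "-rt", "1000"};
      Runnable submit = new Runnable() {
        public void run() {
          try {
            // Each submission builds its own JobConf and uses the synthetic
            // SleepInputFormat, so there is no shared temp file to race on.
            ToolRunner.run(new Configuration(), new SleepJob(), jobArgs);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      };
      Thread first = new Thread(submit);
      Thread second = new Thread(submit);
      first.start();
      second.start();
      first.join();
      second.join();
    }
  }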

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=678579&r1=678578&r2=678579&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jul 21 14:50:22 2008
@@ -146,6 +146,10 @@
     HADOOP-3771. Ensure that Lzo compressors/decompressors correctly handle the
     case where native libraries aren't available. (Chris Douglas via acmurthy) 
 
+    HADOOP-3728. Fix SleepJob so that it doesn't depend on temporary files;
+    this ensures we can now run more than one instance of SleepJob
+    simultaneously. (Chris Douglas via acmurthy) 
+
 Release 0.18.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=678579&r1=678578&r2=678579&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Mon Jul 21 14:50:22 2008
@@ -18,6 +18,8 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -26,16 +28,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -48,71 +43,120 @@
  * some disk space.
  */
 public class SleepJob extends Configured implements Tool,  
-             Mapper<IntWritable, IntWritable, IntWritable, IntWritable>, 
-             Reducer<IntWritable, IntWritable, IntWritable, IntWritable>, 
-             Partitioner<IntWritable, IntWritable>{
-
-  private long mapSleepTime = 100;
-  private long reduceSleepTime = 100;
-  private long mapSleepCount = 1;
-  private long reduceSleepCount = 1;
-  private int  numReduce;
-  
-  private boolean firstRecord = true;
-  private long count = 0;
-  
-  public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
-    return key.get() % numPartitions;
+             Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
+             Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
+             Partitioner<IntWritable,NullWritable> {
+
+  private long mapSleepDuration = 100;
+  private long reduceSleepDuration = 100;
+  private int mapSleepCount = 1;
+  private int reduceSleepCount = 1;
+  private int count = 0;
+
+  public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+    return k.get() % numPartitions;
   }
   
+  public static class EmptySplit implements InputSplit {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class SleepInputFormat extends Configured
+      implements InputFormat<IntWritable,IntWritable> {
+    public void validateInput(JobConf conf) { }
+    public InputSplit[] getSplits(JobConf conf, int numSplits) {
+      InputSplit[] ret = new InputSplit[numSplits];
+      for (int i = 0; i < numSplits; ++i) {
+        ret[i] = new EmptySplit();
+      }
+      return ret;
+    }
+    public RecordReader<IntWritable,IntWritable> getRecordReader(
+        InputSplit ignored, JobConf conf, Reporter reporter)
+        throws IOException {
+      final int count = conf.getInt("sleep.job.map.sleep.count", 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+      final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
+      final int emitPerMapTask = (redcount * conf.getNumReduceTasks());
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+
+        public boolean next(IntWritable key, IntWritable value)
+            throws IOException {
+          key.set(emitCount);
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable createKey() { return new IntWritable(); }
+        public IntWritable createValue() { return new IntWritable(); }
+        public long getPos() throws IOException { return records; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return records / ((float)count);
+        }
+      };
+    }
+  }
+
   public void map(IntWritable key, IntWritable value,
-      OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
+      OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+      throws IOException {
 
     //it is expected that every map processes mapSleepCount number of records. 
     try {
-      long left = mapSleepCount - count ;
-      if(left < 0) left = 0;
-      reporter.setStatus("Sleeping... (" + ( mapSleepTime / mapSleepCount * left) + ") ms left");
-      Thread.sleep(mapSleepTime / mapSleepCount);
+      reporter.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+      Thread.sleep(mapSleepDuration);
     }
     catch (InterruptedException ex) {
+      throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
     }
-    count++;
-    if(firstRecord) {
-      
-      //output reduceSleepCount * numReduce number of random values, so that each reducer will get 
-      //reduceSleepCount number of keys. 
-      for(int i=0; i < reduceSleepCount * numReduce; i++) {
-        output.collect(new IntWritable(i), value);
-      }
+    ++count;
+    // output reduceSleepCount * numReduce number of random values, so that
+    // each reducer will get reduceSleepCount number of keys.
+    int k = key.get();
+    for (int i = 0; i < value.get(); ++i) {
+      output.collect(new IntWritable(k + i), NullWritable.get());
     }
-    firstRecord = false;
   }
 
-  public void reduce(IntWritable key, Iterator<IntWritable> values,
-      OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
-
+  public void reduce(IntWritable key, Iterator<NullWritable> values,
+      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+      throws IOException {
     try {
-      long left = reduceSleepCount - count ;
-      if(left < 0) left = 0;
-      
-      reporter.setStatus("Sleeping... (" 
-            +( reduceSleepTime / reduceSleepCount * left) + ") ms left");
-        Thread.sleep(reduceSleepTime / reduceSleepCount);
+      reporter.setStatus("Sleeping... (" +
+          (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        Thread.sleep(reduceSleepDuration);
       
     }
     catch (InterruptedException ex) {
+      throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
     }
-    firstRecord = false;
     count++;
   }
 
   public void configure(JobConf job) {
-    this.mapSleepTime = job.getLong("sleep.job.map.sleep.time" , mapSleepTime);
-    this.reduceSleepTime = job.getLong("sleep.job.reduce.sleep.time" , reduceSleepTime);
-    this.mapSleepCount = job.getLong("sleep.job.map.sleep.count", mapSleepCount);
-    this.reduceSleepCount = job.getLong("sleep.job.reduce.sleep.count", reduceSleepCount);
-    numReduce = job.getNumReduceTasks();
+    this.mapSleepCount =
+      job.getInt("sleep.job.map.sleep.count", mapSleepCount);
+    this.reduceSleepCount =
+      job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+    this.mapSleepDuration =
+      job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
+    this.reduceSleepDuration =
+      job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
   }
 
   public void close() throws IOException {
@@ -123,41 +167,28 @@
     System.exit(res);
   }
 
-  public int run(int numMapper, int numReducer, long mapSleepTime
-      , long mapSleepCount, long reduceSleepTime
-      , long reduceSleepCount) throws Exception {
-    Random random = new Random();
-    FileSystem fs = FileSystem.get(getConf());
-    Path tempPath = new Path("/tmp/sleep.job.data");
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf()
-        , tempPath, IntWritable.class, IntWritable.class);
-    for(int i=0; i<numMapper * mapSleepCount ;i++) {
-      writer.append(new IntWritable(random.nextInt()), new IntWritable(random.nextInt()));
-    }
-    writer.close();
-    try {
-      JobConf job = new JobConf(getConf(), SleepJob.class);
-      job.setNumMapTasks(numMapper);
-      job.setNumReduceTasks(numReducer);
-      job.setMapperClass(SleepJob.class);
-      job.setMapOutputKeyClass(IntWritable.class);
-      job.setMapOutputValueClass(IntWritable.class);
-      job.setReducerClass(SleepJob.class);
-      job.setOutputFormat(NullOutputFormat.class);
-      job.setInputFormat(SequenceFileInputFormat.class);
-      job.setSpeculativeExecution(false);
-      job.setJobName("Sleep job");
-      FileInputFormat.addInputPath(job, tempPath);
-      job.setLong("sleep.job.map.sleep.time", mapSleepTime);
-      job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
-      job.setLong("sleep.job.map.sleep.count", mapSleepCount);
-      job.setLong("sleep.job.reduce.sleep.count", reduceSleepCount);
-
-      JobClient.runJob(job);
-    } 
-    finally {
-      fs.delete(tempPath, true);
-    }
+  public int run(int numMapper, int numReducer, long mapSleepTime,
+      int mapSleepCount, long reduceSleepTime,
+      int reduceSleepCount) throws IOException {
+    JobConf job = new JobConf(getConf(), SleepJob.class);
+    job.setNumMapTasks(numMapper);
+    job.setNumReduceTasks(numReducer);
+    job.setMapperClass(SleepJob.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(SleepJob.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setInputFormat(SleepInputFormat.class);
+    job.setPartitionerClass(SleepJob.class);
+    job.setSpeculativeExecution(false);
+    job.setJobName("Sleep job");
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    job.setLong("sleep.job.map.sleep.time", mapSleepTime);
+    job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+    job.setInt("sleep.job.map.sleep.count", mapSleepCount);
+    job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+
+    JobClient.runJob(job);
     return 0;
   }
 
@@ -165,14 +196,15 @@
 
     if(args.length < 1) {
       System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
-          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)] ");
+          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+          " [-recordt recordSleepTime (msec)]");
       ToolRunner.printGenericCommandUsage(System.err);
       return -1;
     }
 
     int numMapper = 1, numReducer = 1;
-    long mapSleepTime = 100, reduceSleepTime = 100;
-    long mapSleepCount = 1, reduceSleepCount = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+    int mapSleepCount = 1, reduceSleepCount = 1;
 
     for(int i=0; i < args.length; i++ ) {
       if(args[i].equals("-m")) {
@@ -187,13 +219,17 @@
       else if(args[i].equals("-rt")) {
         reduceSleepTime = Long.parseLong(args[++i]);
       }
+      else if (args[i].equals("-recordt")) {
+        recSleepTime = Long.parseLong(args[++i]);
+      }
     }
     
-    mapSleepCount = (long)Math.ceil(mapSleepTime / 100.0d);
-    reduceSleepCount = (long)Math.ceil(reduceSleepTime / 100.0d);
+    // sleep for a total of *SleepTime ms per task, at recSleepTime ms per record
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
     
-    return run(numMapper, numReducer, mapSleepTime, mapSleepCount
-        , reduceSleepTime, reduceSleepCount);
+    return run(numMapper, numReducer, mapSleepTime, mapSleepCount,
+        reduceSleepTime, reduceSleepCount);
   }
 
 }
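
For reference, the key-distribution arithmetic introduced in SleepInputFormat above can be
checked in isolation: each map task's record reader hands out emitPerMapTask = reduceSleepCount
* numReduceTasks keys across its count records, map() expands each (key, emit) pair into the
consecutive keys key..key+emit-1, and the partitioner (key % numPartitions) then gives every
reducer exactly reduceSleepCount keys. A minimal plain-Java sketch of that arithmetic (the
values count=3, redcount=2, numReduce=4 are illustrative assumptions):

  import java.util.Arrays;

  public class SleepJobMath {
    public static void main(String[] args) {
      int count = 3;      // records per map task (sleep.job.map.sleep.count)
      int redcount = 2;   // keys each reducer should receive
      int numReduce = 4;  // number of reduce tasks
      int emitPerMapTask = redcount * numReduce;  // 8 keys per map task

      int emitCount = 0;
      int[] perReducer = new int[numReduce];
      for (int records = 0; records < count; ++records) {
        int emit = emitPerMapTask / count;        // 8 / 3 = 2
        if (emitPerMapTask % count > records) {   // first (8 % 3) = 2 records emit one extra
          ++emit;
        }
        // map() turns (key=emitCount, value=emit) into keys emitCount..emitCount+emit-1
        for (int k = emitCount; k < emitCount + emit; ++k) {
          perReducer[k % numReduce]++;            // getPartition: key % numPartitions
        }
        emitCount += emit;
      }
      System.out.println(Arrays.toString(perReducer));  // prints [2, 2, 2, 2]
    }
  }

The timing works the same way: configure() divides the requested total (e.g. -mt 1000) by the
record count to get a per-record mapSleepDuration, and main() derives the record count as
ceil(mapSleepTime / recSleepTime), so each record sleeps roughly recSleepTime ms and the whole
task sleeps roughly mapSleepTime ms in total.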


