Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: core-dev@hadoop.apache.org
Subject: svn commit: r678579 - in /hadoop/core/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/SleepJob.java
Date: Mon, 21 Jul 2008 21:50:23 -0000
To: core-commits@hadoop.apache.org
From: acmurthy@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20080721215023.721A42388986@eris.apache.org>

Author: acmurthy
Date: Mon Jul 21 14:50:22 2008
New Revision: 678579

URL: http://svn.apache.org/viewvc?rev=678579&view=rev
Log:
HADOOP-3728. Fix SleepJob so that it doesn't depend on temporary files;
this ensures we can now run more than one instance of SleepJob
simultaneously. Contributed by Chris Douglas.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=678579&r1=678578&r2=678579&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jul 21 14:50:22 2008
@@ -146,6 +146,10 @@
     HADOOP-3771. Ensure that Lzo compressors/decompressors correctly handle the
     case where native libraries aren't available. (Chris Douglas via acmurthy)
 
+    HADOOP-3728. Fix SleepJob so that it doesn't depend on temporary files,
+    this ensures we can now run more than one instance of SleepJob
+    simultaneously. (Chris Douglas via acmurthy)
+
 Release 0.18.0 - Unreleased
 
   INCOMPATIBLE CHANGES
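For readers skimming the SleepJob.java diff below: the old implementation seeded every job from the same fixed SequenceFile under /tmp (visible in the removed run() lines at the bottom of this message), so two SleepJob instances running at once clobbered each other's input. The new SleepInputFormat fabricates its input in memory instead, handing each map task one EmptySplit. The only subtle piece is the RecordReader arithmetic that spreads emitPerMapTask key-emissions evenly over a map's count input records. A standalone sketch of just that arithmetic, with illustrative numbers (3 records per map, 2 reducers, reduce sleep count of 2; the class EmitDistributionDemo is invented for this note and is not part of the patch):

public class EmitDistributionDemo {
  public static void main(String[] args) {
    // Mirrors the loop in SleepInputFormat's anonymous RecordReader.
    final int count = 3;               // sleep.job.map.sleep.count
    final int emitPerMapTask = 2 * 2;  // reduceSleepCount * numReduceTasks
    int emitCount = 0;                 // first key covered by the next record
    for (int records = 0; records < count; ++records) {
      int emit = emitPerMapTask / count;       // base share per record
      if (emitPerMapTask % count > records) {  // the first (remainder) records
        ++emit;                                // each take one extra
      }
      System.out.println("record " + records + ": key=" + emitCount
          + " value=" + emit);
      emitCount += emit;
    }
  }
}

This prints (key=0, value=2), (key=2, value=1), (key=3, value=1); map() then emits value consecutive keys starting at key, so the four keys 0..3 land two per reducer via key % numPartitions, giving every reducer exactly its reduce sleep count of keys.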
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=678579&r1=678578&r2=678579&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Mon Jul 21 14:50:22 2008
@@ -18,6 +18,8 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -26,16 +28,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -48,71 +43,120 @@
  * some disk space.
  */
 public class SleepJob extends Configured implements Tool,
-    Mapper<IntWritable, IntWritable, IntWritable, IntWritable>,
-    Reducer<IntWritable, IntWritable, IntWritable, IntWritable>,
-    Partitioner<IntWritable, IntWritable>{
-
-  private long mapSleepTime = 100;
-  private long reduceSleepTime = 100;
-  private long mapSleepCount = 1;
-  private long reduceSleepCount = 1;
-  private int numReduce;
-
-  private boolean firstRecord = true;
-  private long count = 0;
-
-  public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
-    return key.get() % numPartitions;
+    Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
+    Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
+    Partitioner<IntWritable, NullWritable> {
+
+  private long mapSleepDuration = 100;
+  private long reduceSleepDuration = 100;
+  private int mapSleepCount = 1;
+  private int reduceSleepCount = 1;
+  private int count = 0;
+
+  public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+    return k.get() % numPartitions;
   }
 
+  public static class EmptySplit implements InputSplit {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class SleepInputFormat extends Configured
+      implements InputFormat<IntWritable,IntWritable> {
+    public void validateInput(JobConf conf) { }
+    public InputSplit[] getSplits(JobConf conf, int numSplits) {
+      InputSplit[] ret = new InputSplit[numSplits];
+      for (int i = 0; i < numSplits; ++i) {
+        ret[i] = new EmptySplit();
+      }
+      return ret;
+    }
+    public RecordReader<IntWritable,IntWritable> getRecordReader(
+        InputSplit ignored, JobConf conf, Reporter reporter)
+        throws IOException {
+      final int count = conf.getInt("sleep.job.map.sleep.count", 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+      final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
+      final int emitPerMapTask = (redcount * conf.getNumReduceTasks());
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+
+        public boolean next(IntWritable key, IntWritable value)
+            throws IOException {
+          key.set(emitCount);
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable createKey() { return new IntWritable(); }
+        public IntWritable createValue() { return new IntWritable(); }
+        public long getPos() throws IOException { return records; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return records / ((float)count);
+        }
+      };
+    }
+  }
+
   public void map(IntWritable key, IntWritable value,
-      OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
+      OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+      throws IOException {
 
     //it is expected that every map processes mapSleepCount number of records.
     try {
-      long left = mapSleepCount - count ;
-      if(left < 0) left = 0;
-      reporter.setStatus("Sleeping... (" + ( mapSleepTime / mapSleepCount * left) + ") ms left");
-      Thread.sleep(mapSleepTime / mapSleepCount);
+      reporter.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+      Thread.sleep(mapSleepDuration);
     }
     catch (InterruptedException ex) {
+      throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
     }
-    count++;
-    if(firstRecord) {
-
-      //output reduceSleepCount * numReduce number of random values, so that each reducer will get
-      //reduceSleepCount number of keys.
-      for(int i=0; i < reduceSleepCount * numReduce; i++) {
-        output.collect(new IntWritable(i), value);
-      }
+    ++count;
+    // output reduceSleepCount * numReduce number of random values, so that
+    // each reducer will get reduceSleepCount number of keys.
+    int k = key.get();
+    for (int i = 0; i < value.get(); ++i) {
+      output.collect(new IntWritable(k + i), NullWritable.get());
     }
-    firstRecord = false;
   }
 
-  public void reduce(IntWritable key, Iterator<IntWritable> values,
-      OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
-
+  public void reduce(IntWritable key, Iterator<NullWritable> values,
+      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+      throws IOException {
     try {
-      long left = reduceSleepCount - count ;
-      if(left < 0) left = 0;
-
-      reporter.setStatus("Sleeping... ("
-          +( reduceSleepTime / reduceSleepCount * left) + ") ms left");
-      Thread.sleep(reduceSleepTime / reduceSleepCount);
+      reporter.setStatus("Sleeping... (" +
+          (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+      Thread.sleep(reduceSleepDuration);
     }
     catch (InterruptedException ex) {
+      throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
     }
-    firstRecord = false;
     count++;
   }
 
   public void configure(JobConf job) {
-    this.mapSleepTime = job.getLong("sleep.job.map.sleep.time" , mapSleepTime);
-    this.reduceSleepTime = job.getLong("sleep.job.reduce.sleep.time" , reduceSleepTime);
-    this.mapSleepCount = job.getLong("sleep.job.map.sleep.count", mapSleepCount);
-    this.reduceSleepCount = job.getLong("sleep.job.reduce.sleep.count", reduceSleepCount);
-    numReduce = job.getNumReduceTasks();
+    this.mapSleepCount =
+      job.getInt("sleep.job.map.sleep.count", mapSleepCount);
+    this.reduceSleepCount =
+      job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+    this.mapSleepDuration =
+      job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
+    this.reduceSleepDuration =
+      job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
   }
 
   public void close() throws IOException {
@@ -123,41 +167,28 @@
     System.exit(res);
   }
 
-  public int run(int numMapper, int numReducer, long mapSleepTime
-      , long mapSleepCount, long reduceSleepTime
-      , long reduceSleepCount) throws Exception {
-    Random random = new Random();
-    FileSystem fs = FileSystem.get(getConf());
-    Path tempPath = new Path("/tmp/sleep.job.data");
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf()
-        , tempPath, IntWritable.class, IntWritable.class);
-    for(int i=0; i
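The removed run() method is truncated above, but the surviving lines already show the flaw: every instance wrote its seed records to the fixed path /tmp/sleep.job.data. As a hypothetical sketch of wiring a job around the new classes with the old mapred API (the class SleepJobDemo and every numeric value here are invented; the configuration keys and helper classes are the ones visible in the diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class SleepJobDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(new Configuration(), SleepJob.class);
    job.setJobName("sleep-demo");
    // No input paths: SleepInputFormat synthesizes one EmptySplit per map
    // task, so concurrent jobs share no on-disk state.
    job.setInputFormat(SleepJob.SleepInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);  // the job writes nothing
    job.setMapperClass(SleepJob.class);
    job.setReducerClass(SleepJob.class);
    job.setPartitionerClass(SleepJob.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setNumMapTasks(10);
    job.setNumReduceTasks(2);
    // Keys read back by configure() and SleepInputFormat:
    job.setLong("sleep.job.map.sleep.time", 10000);    // total ms per map
    job.setInt("sleep.job.map.sleep.count", 5);        // records per map
    job.setLong("sleep.job.reduce.sleep.time", 5000);  // total ms per reduce
    job.setInt("sleep.job.reduce.sleep.count", 2);     // keys per reducer
    JobClient.runJob(job);
  }
}

Because the input is synthesized per task, several such jobs can be submitted simultaneously, which is exactly what HADOOP-3728 enables.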