Subject: svn commit: r769339 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/io/ src/examples/org/apache/hadoop/examples/ src/test/org/apache/hadoop/mapred/
Date: Tue, 28 Apr 2009 11:27:38 -0000
From: sharad@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Message-Id: <20090428112738.B1F15238889E@eris.apache.org>

Author: sharad
Date: Tue Apr 28 11:27:37 2009
New Revision: 769339

URL: http://svn.apache.org/viewvc?rev=769339&view=rev
Log:
HADOOP-5699. Change PiEstimator to use new mapreduce api. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 28 11:27:37 2009
@@ -34,6 +34,12 @@
     HADOOP-5681. Change examples RandomWriter and RandomTextWriter to
     use new mapreduce API. (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new
+    mapreduce api. (Amareshwari Sriramadasu via sharad)
+
+    HADOOP-5699. Change org.apache.hadoop.examples.PiEstimator to use
+    new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the
@@ -256,9 +262,6 @@
     HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed
     by TupleWritable encoding. (Jingkei Ly via cdouglas)
 
-    HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new
-    mapreduce api. (Amareshwari Sriramadasu via sharad)
-
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java Tue Apr 28 11:27:37 2009
@@ -100,9 +100,7 @@
     public int compare(byte[] b1, int s1, int l1,
                        byte[] b2, int s2, int l2) {
-      boolean a = (readInt(b1, s1) == 1) ? true : false;
-      boolean b = (readInt(b2, s2) == 1) ? true : false;
-      return ((a == b) ? 0 : (a == false) ? -1 : 1);
+      return compareBytes(b1, s1, l1, b2, s2, l2);
     }
   }
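The one-line comparator above works because BooleanWritable serializes to a single byte (0 for false, 1 for true), so WritableComparator.compareBytes, an unsigned lexicographic byte comparison, yields the same false-before-true ordering the removed code computed by deserializing; note the removed code also read four bytes via readInt from a one-byte record. A minimal standalone sketch of the same idea, assuming that one-byte encoding (hypothetical class, not the committed code, which simply delegates to compareBytes):

public class BooleanBytesComparator {

  /** Compare two serialized booleans without deserializing them. */
  public static int compare(byte[] b1, int s1, int l1,
                            byte[] b2, int s2, int l2) {
    // Each serialized boolean occupies exactly one byte (0 or 1), so an
    // unsigned comparison of that byte orders false before true.
    int a = b1[s1] & 0xff;
    int b = b2[s2] & 0xff;
    return (a < b) ? -1 : ((a == b) ? 0 : 1);
  }

  public static void main(String[] args) {
    byte[] f = { 0 };  // serialized false
    byte[] t = { 1 };  // serialized true
    System.out.println(compare(f, 0, 1, t, 0, 1) < 0);  // prints true
  }
}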
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Tue Apr 28 11:27:37 2009
@@ -20,8 +20,8 @@
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,17 +31,11 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -137,19 +131,18 @@
    * Generate points in a unit square
    * and then count points inside/outside of the inscribed circle of the square.
    */
-  public static class PiMapper extends MapReduceBase
-    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
+  public static class PiMapper extends
+      Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
 
     /** Map method.
     * @param offset samples starting from the (offset+1)th sample.
     * @param size the number of samples for this map
-     * @param out output {true->numInside, false->numOutside}
-     * @param reporter
+     * @param context output {true->numInside, false->numOutside}
     */
     public void map(LongWritable offset,
                     LongWritable size,
-                    OutputCollector<BooleanWritable, LongWritable> out,
-                    Reporter reporter) throws IOException {
+                    Context context)
+        throws IOException, InterruptedException {
       final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
       long numInside = 0L;
@@ -171,13 +164,13 @@
         //report status
         i++;
         if (i % 1000 == 0) {
-          reporter.setStatus("Generated " + i + " samples.");
+          context.setStatus("Generated " + i + " samples.");
         }
       }
 
       //output map results
-      out.collect(new BooleanWritable(true), new LongWritable(numInside));
-      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
+      context.write(new BooleanWritable(true), new LongWritable(numInside));
+      context.write(new BooleanWritable(false), new LongWritable(numOutside));
     }
   }
 
@@ -185,34 +178,29 @@
    * Reducer class for Pi estimation.
    * Accumulate points inside/outside results from the mappers.
    */
-  public static class PiReducer extends MapReduceBase
-    implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
+  public static class PiReducer extends
+      Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
     private long numInside = 0;
     private long numOutside = 0;
-    private JobConf conf; //configuration for accessing the file system
 
-    /** Store job configuration. */
-    @Override
-    public void configure(JobConf job) {
-      conf = job;
-    }
-
     /**
      * Accumulate number of points inside/outside results from the mappers.
      * @param isInside Is the point inside?
     * @param values An iterator to a list of point counts
-     * @param output dummy, not used here.
-     * @param reporter
+     * @param context dummy, not used here.
     */
     public void reduce(BooleanWritable isInside,
-                       Iterator<LongWritable> values,
-                       OutputCollector<WritableComparable<?>, Writable> output,
-                       Reporter reporter) throws IOException {
+                       Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
       if (isInside.get()) {
-        for(; values.hasNext(); numInside += values.next().get());
+        for (LongWritable val : values) {
+          numInside += val.get();
+        }
       } else {
-        for(; values.hasNext(); numOutside += values.next().get());
+        for (LongWritable val : values) {
+          numOutside += val.get();
+        }
       }
     }
@@ -220,10 +208,11 @@
      * Reduce task done, write output to a file.
     */
     @Override
-    public void close() throws IOException {
+    public void cleanup(Context context) throws IOException {
       //write output to a file
       Path outDir = new Path(TMP_DIR, "out");
       Path outFile = new Path(outDir, "reduce-out");
+      Configuration conf = context.getConfiguration();
       FileSystem fileSys = FileSystem.get(conf);
       SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
           outFile, LongWritable.class, LongWritable.class,
@@ -238,34 +227,35 @@
    *
    * @return the estimated value of Pi
    */
-  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
-      ) throws IOException {
+  public static BigDecimal estimate(int numMaps, long numPoints, Configuration conf
+      ) throws IOException, ClassNotFoundException, InterruptedException {
+    Job job = new Job(conf);
     //setup job conf
-    jobConf.setJobName(PiEstimator.class.getSimpleName());
+    job.setJobName(PiEstimator.class.getSimpleName());
+    job.setJarByClass(PiEstimator.class);
 
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
-    jobConf.setOutputKeyClass(BooleanWritable.class);
-    jobConf.setOutputValueClass(LongWritable.class);
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(BooleanWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
-    jobConf.setMapperClass(PiMapper.class);
-    jobConf.setNumMapTasks(numMaps);
+    job.setMapperClass(PiMapper.class);
 
-    jobConf.setReducerClass(PiReducer.class);
-    jobConf.setNumReduceTasks(1);
+    job.setReducerClass(PiReducer.class);
+    job.setNumReduceTasks(1);
 
     // turn off speculative execution, because DFS doesn't handle
     // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
+    job.setSpeculativeExecution(false);
 
     //setup input/output directories
     final Path inDir = new Path(TMP_DIR, "in");
     final Path outDir = new Path(TMP_DIR, "out");
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    FileOutputFormat.setOutputPath(jobConf, outDir);
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
 
-    final FileSystem fs = FileSystem.get(jobConf);
+    final FileSystem fs = FileSystem.get(conf);
     if (fs.exists(TMP_DIR)) {
       throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
          + " already exists.  Please remove it first.");
@@ -281,7 +271,7 @@
       final LongWritable offset = new LongWritable(i * numPoints);
       final LongWritable size = new LongWritable(numPoints);
       final SequenceFile.Writer writer = SequenceFile.createWriter(
-          fs, jobConf, file,
+          fs, conf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
       try {
         writer.append(offset, size);
@@ -294,7 +284,7 @@
     //start a map/reduce job
     System.out.println("Starting Job");
     final long startTime = System.currentTimeMillis();
-    JobClient.runJob(jobConf);
+    job.waitForCompletion(true);
     final double duration = (System.currentTimeMillis() - startTime)/1000.0;
     System.out.println("Job Finished in " + duration + " seconds");
 
@@ -302,7 +292,7 @@
     Path inFile = new Path(outDir, "reduce-out");
     LongWritable numInside = new LongWritable();
     LongWritable numOutside = new LongWritable();
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
     try {
       reader.next(numInside, numOutside);
     } finally {
@@ -329,7 +319,7 @@
     if (args.length != 2) {
       System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
       ToolRunner.printGenericCommandUsage(System.err);
-      return -1;
+      return 2;
     }
 
     final int nMaps = Integer.parseInt(args[0]);
@@ -338,9 +328,8 @@
     System.out.println("Number of Maps = " + nMaps);
     System.out.println("Samples per Map = " + nSamples);
 
-    final JobConf jobConf = new JobConf(getConf(), getClass());
     System.out.println("Estimated value of Pi is "
-        + estimate(nMaps, nSamples, jobConf));
+        + estimate(nMaps, nSamples, getConf()));
 
     return 0;
   }
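The PiEstimator changes follow the standard old-to-new API migration: MapReduceBase plus the Mapper/Reducer interfaces from org.apache.hadoop.mapred become the abstract Mapper/Reducer classes from org.apache.hadoop.mapreduce, OutputCollector and Reporter collapse into a single Context, configure()/close() become setup()/cleanup(), and JobConf with JobClient.runJob is replaced by Job with waitForCompletion. A condensed sketch of the same pattern on a toy map-only identity job (hypothetical class names IdentityJob/IdMapper, trimmed from the real change above):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IdentityJob {

  /** New-api mapper: one Context replaces OutputCollector + Reporter. */
  public static class IdMapper
      extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.setStatus("copying " + key);  // was: reporter.setStatus(...)
      context.write(key, value);            // was: out.collect(...)
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);          // was: new JobConf(conf, ...)
    job.setJarByClass(IdentityJob.class);
    job.setMapperClass(IdMapper.class);
    job.setNumReduceTasks(0);         // map-only identity job
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // was: JobClient.runJob(jobConf); waitForCompletion(true) also prints progress.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}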
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java Tue Apr 28 11:27:37 2009
@@ -19,7 +19,6 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.BooleanWritable.Comparator;
 import junit.framework.TestCase;
 import java.io.*;
 import java.util.*;
@@ -237,7 +236,8 @@
       return -super.compare(b1, s1, l1, b2, s2, l2);
     }
     static {  // register this comparator
-      WritableComparator.define(DecreasingIntComparator.class, new Comparator());
+      WritableComparator.define(DecreasingIntComparator.class,
+                                new IntWritable.Comparator());
     }
   }
 
@@ -268,7 +268,8 @@
     }
 
     static {
-      WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
+      WritableComparator.define(CompositeIntGroupFn.class,
+                                new IntWritable.Comparator());
     }
   }
 
@@ -284,7 +285,8 @@
     }
 
     static {
-      WritableComparator.define(CompositeIntReverseGroupFn.class, new Comparator());
+      WritableComparator.define(CompositeIntReverseGroupFn.class,
+                                new IntWritable.Comparator());
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Apr 28 11:27:37 2009
@@ -45,7 +45,8 @@
     new File(System.getProperty("test.build.data","/tmp"))
     .toURI().toString().replace(' ', '+');
 
-  public void testWithLocal() throws IOException {
+  public void testWithLocal()
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Apr 28 11:27:37 2009
@@ -169,7 +169,8 @@
     }
   }
 
-  public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+  public static void runPI(MiniMRCluster mr, JobConf jobconf)
+      throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("runPI");
     double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
@@ -232,7 +233,8 @@
     }
   }
 
-  public void testWithDFS() throws IOException {
+  public void testWithDFS()
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Tue Apr 28 11:27:37 2009
@@ -55,7 +55,8 @@
     }
   }
 
-  static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+  static void runPI(MiniMRCluster mr, JobConf jobconf)
+      throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("runPI");
     double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
@@ -66,7 +67,8 @@
   /**
   * Run the pi test with a specific value of
   * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
   */
-  private boolean runOneTest(int maxTasks) throws IOException {
+  private boolean runOneTest(int maxTasks)
+      throws IOException, InterruptedException, ClassNotFoundException {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
@@ -96,7 +98,8 @@
     return success;
   }
 
-  public void testTaskLimits() throws IOException {
+  public void testTaskLimits()
+      throws IOException, InterruptedException, ClassNotFoundException {
     System.out.println("Job 1 running with max set to 2");
     boolean status = runOneTest(2);
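All four test changes exist for the same reason: the new api's Job.waitForCompletion is declared to throw InterruptedException and ClassNotFoundException in addition to IOException, so every caller up the chain, from PiEstimator.estimate to the test methods, must widen its throws clause. A sketch of that propagation, using a hypothetical SubmitSketch class with a runJob helper (not part of this commit):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitSketch {

  /** The three checked exceptions all originate in waitForCompletion. */
  static boolean runJob(Configuration conf)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf);
    job.setJarByClass(SubmitSketch.class);
    // IOException: cluster or filesystem errors; InterruptedException: the
    // submitting/polling thread was interrupted; ClassNotFoundException: the
    // job's mapper/reducer classes cannot be resolved at submission time.
    return job.waitForCompletion(true);
  }

  /** Each caller either declares the same exceptions or handles them. */
  public static void main(String[] args) throws Exception {
    System.exit(runJob(new Configuration()) ? 0 : 1);
  }
}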