hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r713612 - in /hadoop/core/trunk: ./ src/examples/org/apache/hadoop/examples/ src/test/org/apache/hadoop/mapred/
Date Thu, 13 Nov 2008 01:41:39 GMT
Author: szetszwo
Date: Wed Nov 12 17:41:39 2008
New Revision: 713612

URL: http://svn.apache.org/viewvc?rev=713612&view=rev
Log:
HADOOP-4589. Correct PiEstimator output messages and improve the code readability. (szetszwo)

Removed:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Nov 12 17:41:39 2008
@@ -142,6 +142,9 @@
     HADOOP-4571. Add chukwa conf files to svn:ignore list. (Eric Yang via
     szetszwo)
 
+    HADOOP-4589. Correct PiEstimator output messages and improve the code
+    readability. (szetszwo)
+
 Release 0.19.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Wed Nov 12 17:41:39 2008
@@ -22,10 +22,10 @@
 import java.math.BigDecimal;
 import java.util.Iterator;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -48,11 +48,29 @@
 /**
  * A Map-reduce program to estimate the value of Pi
  * using quasi-Monte Carlo method.
+ *
+ * Mapper:
+ *   Generate points in a unit square
+ *   and then count points inside/outside of the inscribed circle of the square.
+ *
+ * Reducer:
+ *   Accumulate points inside/outside results from the mappers.
+ *
+ * Let numTotal = numInside + numOutside.
+ * The fraction numInside/numTotal is a rational approximation of
+ * the value (Area of the circle)/(Area of the square),
+ * where the area of the inscribed circle is Pi/4
+ * and the area of unit square is 1.
+ * Then, the estimated value of Pi is 4(numInside/numTotal).
  */
 public class PiEstimator extends Configured implements Tool {
+  /** tmp directory for input/output */
+  static private final Path TMP_DIR = new Path(
+      PiEstimator.class.getSimpleName() + "_TMP_3_141592654");
   
   /** 2-dimensional Halton sequence {H(i)},
-   * where H(i) is a 2-dimensional point and i >= 1 is the index. 
+   * where H(i) is a 2-dimensional point and i >= 1 is the index.
+   * Halton sequence is used to generate sample points for Pi estimation. 
    */
   private static class HaltonSequence {
     /** Bases */
@@ -94,6 +112,8 @@
     /** Compute next point.
      * Assume the current point is H(index).
      * Compute H(index+1).
+     * 
+     * @return a 2-dimensional point with coordinates in [0,1)^2
      */
     double[] nextPoint() {
       index++;
@@ -113,37 +133,33 @@
   }
 
   /**
-   * Mappper class for Pi estimation.
+   * Mapper class for Pi estimation.
+   * Generate points in a unit square
+   * and then count points inside/outside of the inscribed circle of the square.
    */
-  
   public static class PiMapper extends MapReduceBase
-    implements Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {
-    
-    /** Mapper configuration.
-     *
-     */
-    @Override
-    public void configure(JobConf job) {
-    }
-    
-    long numInside = 0L;
-    long numOutside = 0L;
-    
+    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
+
     /** Map method.
-     * @param key
-     * @param val not-used
-     * @param out
+     * @param offset samples starting from the (offset+1)th sample.
+     * @param size the number of samples for this map
+     * @param out output {true->numInside, false->numOutside}
      * @param reporter
      */
-    public void map(LongWritable key,
-                    LongWritable val,
-                    OutputCollector<LongWritable, LongWritable> out,
+    public void map(LongWritable offset,
+                    LongWritable size,
+                    OutputCollector<BooleanWritable, LongWritable> out,
                     Reporter reporter) throws IOException {
-      final HaltonSequence haltonsequence = new HaltonSequence(key.get());
-      final long nSamples = val.get();
 
-      for(long idx = 0; idx < nSamples; idx++) {
+      final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
+      long numInside = 0L;
+      long numOutside = 0L;
+
+      for(long i = 0; i < size.get(); ) {
+        //generate points in a unit square
         final double[] point = haltonsequence.nextPoint();
+
+        //count points inside/outside of the inscribed circle of the square
         final double x = point[0] - 0.5;
         final double y = point[1] - 0.5;
         if (x*x + y*y > 0.25) {
@@ -151,170 +167,187 @@
         } else {
           numInside++;
         }
-        if (idx%1000 == 1) {
-          reporter.setStatus("Generated "+idx+" samples.");
+
+        //report status
+        i++;
+        if (i % 1000 == 0) {
+          reporter.setStatus("Generated " + i + " samples.");
         }
       }
-      out.collect(new LongWritable(0), new LongWritable(numOutside));
-      out.collect(new LongWritable(1), new LongWritable(numInside));
-    }
-    
-    @Override
-    public void close() {
-      // nothing
+
+      //output map results
+      out.collect(new BooleanWritable(true), new LongWritable(numInside));
+      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
     }
   }
-  
+
+  /**
+   * Reducer class for Pi estimation.
+   * Accumulate points inside/outside results from the mappers.
+   */
   public static class PiReducer extends MapReduceBase
-    implements Reducer<LongWritable, LongWritable, WritableComparable<?>, Writable> {
+    implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
     
-    long numInside = 0;
-    long numOutside = 0;
-    JobConf conf;
+    private long numInside = 0;
+    private long numOutside = 0;
+    private JobConf conf; //configuration for accessing the file system
       
-    /** Reducer configuration.
-     *
-     */
+    /** Store job configuration. */
     @Override
     public void configure(JobConf job) {
       conf = job;
     }
-    /** Reduce method.
-     * @param key
-     * @param values
-     * @param output
+
+    /**
+     * Accumulate number of points inside/outside results from the mappers.
+     * @param isInside Is the points inside? 
+     * @param values An iterator to a list of point counts
+     * @param output dummy, not used here.
      * @param reporter
      */
-    public void reduce(LongWritable key,
+    public void reduce(BooleanWritable isInside,
                        Iterator<LongWritable> values,
                        OutputCollector<WritableComparable<?>, Writable> output,
                        Reporter reporter) throws IOException {
-      if (key.get() == 1) {
-        while (values.hasNext()) {
-          long num = values.next().get();
-          numInside += num;
-        }
+      if (isInside.get()) {
+        for(; values.hasNext(); numInside += values.next().get());
       } else {
-        while (values.hasNext()) {
-          long num = values.next().get();
-          numOutside += num;
-        }
+        for(; values.hasNext(); numOutside += values.next().get());
       }
     }
-      
+
+    /**
+     * Reduce task done, write output to a file.
+     */
     @Override
     public void close() throws IOException {
-      Path tmpDir = new Path("test-mini-mr");
-      Path outDir = new Path(tmpDir, "out");
+      //write output to a file
+      Path outDir = new Path(TMP_DIR, "out");
       Path outFile = new Path(outDir, "reduce-out");
       FileSystem fileSys = FileSystem.get(conf);
-      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, 
-                                                             outFile, LongWritable.class, LongWritable.class, 
-                                                             CompressionType.NONE);
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          outFile, LongWritable.class, LongWritable.class, 
+          CompressionType.NONE);
       writer.append(new LongWritable(numInside), new LongWritable(numOutside));
       writer.close();
     }
   }
 
   /**
-   * This is the main driver for computing the value of Pi using
-   * monte-carlo method.
+   * Run a map/reduce job for estimating Pi.
+   *
+   * @return the estimated value of Pi
    */
-  BigDecimal launch(int numMaps, long numPoints, String jt, String dfs)
-    throws IOException {
+  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
+      ) throws IOException {
+    //setup job conf
+    jobConf.setJobName(PiEstimator.class.getSimpleName());
 
-    JobConf jobConf = new JobConf(getConf(), PiEstimator.class);
-    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
-    if (dfs != null) { FileSystem.setDefaultUri(jobConf, dfs); }
-    jobConf.setJobName("test-mini-mr");
-    
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
     jobConf.setInputFormat(SequenceFileInputFormat.class);
-        
-    jobConf.setOutputKeyClass(LongWritable.class);
+
+    jobConf.setOutputKeyClass(BooleanWritable.class);
     jobConf.setOutputValueClass(LongWritable.class);
     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    
+
     jobConf.setMapperClass(PiMapper.class);
+    jobConf.setNumMapTasks(numMaps);
+
     jobConf.setReducerClass(PiReducer.class);
-    
     jobConf.setNumReduceTasks(1);
 
-    Path tmpDir = new Path("test-mini-mr");
-    Path inDir = new Path(tmpDir, "in");
-    Path outDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    fileSys.delete(tmpDir, true);
-    if (!fileSys.mkdirs(inDir)) {
-      throw new IOException("Mkdirs failed to create " + inDir.toString());
-    }
-    
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+
+    //setup input/output directories
+    final Path inDir = new Path(TMP_DIR, "in");
+    final Path outDir = new Path(TMP_DIR, "out");
     FileInputFormat.setInputPaths(jobConf, inDir);
     FileOutputFormat.setOutputPath(jobConf, outDir);
-    
-    jobConf.setNumMapTasks(numMaps);
-    
-    for(int idx=0; idx < numMaps; ++idx) {
-      Path file = new Path(inDir, "part"+idx);
-      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf, 
-                                                             file, LongWritable.class, LongWritable.class, CompressionType.NONE);
-      writer.append(new LongWritable(idx * numPoints), new LongWritable(numPoints));
-      writer.close();
-      System.out.println("Wrote input for Map #"+idx);
+
+    final FileSystem fs = FileSystem.get(jobConf);
+    if (fs.exists(TMP_DIR)) {
+      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+          + " already exists.  Please remove it first.");
     }
-    
-    BigDecimal estimate = BigDecimal.ZERO;
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Cannot create input directory " + inDir);
+    }
+
     try {
+      //generate an input file for each map task
+      for(int i=0; i < numMaps; ++i) {
+        final Path file = new Path(inDir, "part"+i);
+        final LongWritable offset = new LongWritable(i * numPoints);
+        final LongWritable size = new LongWritable(numPoints);
+        final SequenceFile.Writer writer = SequenceFile.createWriter(
+            fs, jobConf, file,
+            LongWritable.class, LongWritable.class, CompressionType.NONE);
+        try {
+          writer.append(offset, size);
+        } finally {
+          writer.close();
+        }
+        System.out.println("Wrote input for Map #"+i);
+      }
+  
+      //start a map/reduce job
       System.out.println("Starting Job");
-      long startTime = System.currentTimeMillis();
+      final long startTime = System.currentTimeMillis();
       JobClient.runJob(jobConf);
-      System.out.println("Job Finished in "+
-                         (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+      final double duration = (System.currentTimeMillis() - startTime)/1000.0;
+      System.out.println("Job Finished in " + duration + " seconds");
+
+      //read outputs
       Path inFile = new Path(outDir, "reduce-out");
-      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
-                                                           jobConf);
       LongWritable numInside = new LongWritable();
       LongWritable numOutside = new LongWritable();
-      reader.next(numInside, numOutside);
-      reader.close();
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+      try {
+        reader.next(numInside, numOutside);
+      } finally {
+        reader.close();
+      }
 
-      estimate = BigDecimal.valueOf(4).setScale(20)
+      //compute estimated value
+      return BigDecimal.valueOf(4).setScale(20)
           .multiply(BigDecimal.valueOf(numInside.get()))
           .divide(BigDecimal.valueOf(numMaps))
           .divide(BigDecimal.valueOf(numPoints));
     } finally {
-      fileSys.delete(tmpDir, true);
+      fs.delete(TMP_DIR, true);
     }
-    
-    return estimate;
   }
-  
+
   /**
-   * Launches all the tasks in order.
+   * Parse arguments and then runs a map/reduce job.
+   * Print output in standard out.
+   * 
+   * @return a non-zero if there is an error.  Otherwise, return 0.  
    */
   public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
+    if (args.length != 2) {
+      System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
       ToolRunner.printGenericCommandUsage(System.err);
       return -1;
     }
     
-    int nMaps = Integer.parseInt(args[0]);
-    long nSamples = Long.parseLong(args[1]);
+    final int nMaps = Integer.parseInt(args[0]);
+    final long nSamples = Long.parseLong(args[1]);
         
-    System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
+    System.out.println("Number of Maps  = " + nMaps);
+    System.out.println("Samples per Map = " + nSamples);
         
-    System.out.println("Estimated value of PI is "+
-                       launch(nMaps, nSamples, null, null));
-    
+    final JobConf jobConf = new JobConf(getConf(), getClass());
+    System.out.println("Estimated value of Pi is "
+        + estimate(nMaps, nSamples, jobConf));
     return 0;
   }
-  
+
+  /**
+   * main method for running it as a stand alone command. 
+   */
   public static void main(String[] argv) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new PiEstimator(), argv);
-    System.exit(res);
+    System.exit(ToolRunner.run(null, new PiEstimator(), argv));
   }
-
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Nov 12 17:41:39 2008
@@ -41,9 +41,6 @@
  * A JUnit test to test min map-reduce cluster with local file system.
  */
 public class TestMiniMRLocalFS extends TestCase {
-  
-  static final int NUM_MAPS = 10;
-  static final int NUM_SAMPLES = 100000;
   private static String TEST_ROOT_DIR =
     new File(System.getProperty("test.build.data","/tmp"))
     .toURI().toString().replace(' ', '+');
@@ -52,10 +49,8 @@
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
-      double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
-                                           mr.createJobConf());
-      double error = Math.abs(Math.PI - estimate);
-      assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+      TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
+
       // run the wordcount example with caching
       JobConf job = mr.createJobConf();
       TestResult ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Wed Nov 12 17:41:39 2008
@@ -171,7 +171,8 @@
 
   static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
     LOG.info("runPI");
-    double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+    double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
+        NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
     assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
     checkTaskDirectories(mr, new String[]{}, new String[]{});

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Wed Nov 12 17:41:39 2008
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.examples.PiEstimator;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -56,7 +57,7 @@
   
   static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
     LOG.info("runPI");
-    double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+    double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
     System.out.println("PI estimation " + error);
   }



Mime
View raw message