hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r790015 [1/2] - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/pi/ src/examples/org/apache/hadoop/examples/pi/math/ src/test/mapred/org/apache/hadoop/examples/pi/ src/test/map...
Date Wed, 01 Jul 2009 01:35:56 GMT
Author: szetszwo
Date: Wed Jul  1 01:35:55 2009
New Revision: 790015

URL: http://svn.apache.org/viewvc?rev=790015&view=rev
Log:
MAPREDUCE-637. Add an example, distbbp, which able to compute the n th bit of Pi for some large n.  (szetszwo)

Added:
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Combinable.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Container.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistBbp.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Parser.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/SummationWritable.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/TaskResult.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Util.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/ArithmeticProgression.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/LongLong.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Modular.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Montgomery.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Summation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/examples/pi/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/examples/pi/math/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/examples/pi/math/TestLongLong.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/examples/pi/math/TestModular.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/examples/pi/math/TestSummation.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=790015&r1=790014&r2=790015&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul  1 01:35:55 2009
@@ -25,6 +25,9 @@
     tasktracker nodes and blacklist nodes if they are unhealthy.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-637. Add an example, distbbp, which able to compute the n th bit
+    of Pi for some large n.  (szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=790015&r1=790014&r2=790015&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Wed Jul  1 01:35:55 2009
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.examples.dancing.DistributedPentomino;
 import org.apache.hadoop.examples.dancing.Sudoku;
+import org.apache.hadoop.examples.pi.DistBbp;
 import org.apache.hadoop.examples.terasort.TeraGen;
 import org.apache.hadoop.examples.terasort.TeraSort;
 import org.apache.hadoop.examples.terasort.TeraValidate;
@@ -51,6 +52,7 @@
       pgd.addClass("pi", PiEstimator.class,
                    "A map/reduce program that estimates Pi using quasi-Monte Carlo method.");
       pgd.addClass("bbp", BaileyBorweinPlouffe.class, BaileyBorweinPlouffe.DESCRIPTION);
+      pgd.addClass("distbbp", DistBbp.class, DistBbp.DESCRIPTION);
       pgd.addClass("pentomino", DistributedPentomino.class,
       "A map/reduce tile laying program to find solutions to pentomino problems.");
       pgd.addClass("secondarysort", SecondarySort.class,

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Combinable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Combinable.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Combinable.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Combinable.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+/**
+ * A class is Combinable if its object can be combined with other objects.
+ * @param <T> The generic type
+ */
+public interface Combinable<T> extends Comparable<T> {
+  /**
+   * Combine this with that. 
+   * @param that Another object.
+   * @return The combined object.
+   */
+  public T combine(T that); 
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Container.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Container.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Container.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Container.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+/**
+ * A class is a Container if it contains an element. 
+ * @param <T> The generic type
+ */
+public interface Container<T> {
+  /**
+   * @return The contained element.
+   */
+  public T getElement(); 
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistBbp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistBbp.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistBbp.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistBbp.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.examples.pi.DistSum.Computation;
+import org.apache.hadoop.examples.pi.DistSum.Parameters;
+import org.apache.hadoop.examples.pi.math.Bellard;
+import org.apache.hadoop.examples.pi.math.Summation;
+import org.apache.hadoop.examples.pi.math.Bellard.Parameter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * The main class for launching DistSum jobs to compute Pi.
+ * The steps are:
+ * 
+ * (1) Initialize parameters.
+ * (2) Create a list of sums.
+ * (3) Read computed values from the given local directory.
+ * (4) Remove the computed values from the sums.
+ * (5) Partition the remaining sums into computation jobs.
+ * (6) Summit the computation jobs to a cluster.
+ * (7) Write job outputs to the given local directory.
+ * (8) Combine the job outputs and print the Pi bits.
+ */
+/*
+ * The command line format is:
+ * > hadoop org.apache.hadoop.examples.pi.DistBbp \
+ *          <b> <nThreads> <nJobs> <type> <nPart> <remoteDir> <localDir>
+ * 
+ * And the parameters are:
+ *  <b>         The number of bits to skip, i.e. compute the (b+1)th position.
+ *  <nThreads>  The number of working threads.
+ *  <nJobs>     The number of jobs per sum.
+ *  <type>      'm' for map side job, 'r' for reduce side job, 'x' for mix type.
+ *  <nPart>     The number of parts per job.
+ *  <remoteDir> Remote directory for submitting jobs.
+ *  <localDir>  Local directory for storing output files.
+ *
+ * Note that it may take a long time to finish all the jobs when <b> is large.
+ * If the program is killed in the middle of the execution, the same command with
+ * a different <remoteDir> can be used to resume the execution.  For example, suppose
+ * we use the following command to compute the (10^15+57)th bit of Pi.
+ * 
+ * > hadoop org.apache.hadoop.examples.pi.DistBbp \
+ *          1,000,000,000,000,056 20 1000 x 500 remote/a local/output
+ *
+ * It uses 20 threads to summit jobs so that there are at most 20 concurrent jobs.
+ * Each sum (there are totally 14 sums) is partitioned into 1000 jobs.
+ * The jobs will be executed in map-side or reduce-side.  Each job has 500 parts.
+ * The remote directory for the jobs is remote/a and the local directory
+ * for storing output is local/output.  Depends on the cluster configuration,
+ * it may take many days to finish the entire execution.  If the execution is killed,
+ * we may resume it by
+ * 
+ * > hadoop org.apache.hadoop.examples.pi.DistBbp \
+ *          1,000,000,000,000,056 20 1000 x 500 remote/b local/output
+ */
+public final class DistBbp extends Configured implements Tool {
+  public static final String DESCRIPTION
+      = "A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.";
+
+  private final Util.Timer timer = new Util.Timer(true);
+
+  /** {@inheritDoc} */
+  public int run(String[] args) throws Exception {
+    //parse arguments
+    if (args.length != DistSum.Parameters.COUNT + 1)
+      return Util.printUsage(args,
+          getClass().getName() + " <b> " + Parameters.LIST
+          + "\n  <b> The number of bits to skip, i.e. compute the (b+1)th position."
+          + Parameters.DESCRIPTION);
+
+    int i = 0;
+    final long b = Util.string2long(args[i++]);
+    final DistSum.Parameters parameters = DistSum.Parameters.parse(args, i);
+
+    if (b < 0)
+      throw new IllegalArgumentException("b = " + b + " < 0");
+    Util.printBitSkipped(b);
+    Util.out.println(parameters);
+    Util.out.println();
+
+    //initialize sums
+    final DistSum distsum = new DistSum();
+    distsum.setConf(getConf());
+    distsum.setParameters(parameters);
+    final boolean isVerbose = getConf().getBoolean(Parser.VERBOSE_PROPERTY, false);
+    final Map<Parameter, List<TaskResult>> existings = new Parser(isVerbose).parse(parameters.localDir.getPath(), null);
+    Parser.combine(existings);
+    for(List<TaskResult> tr : existings.values())
+      Collections.sort(tr);
+    Util.out.println();
+    final Map<Bellard.Parameter, Bellard.Sum> sums = Bellard.getSums(b, parameters.nJobs, existings);
+    Util.out.println();
+
+    //execute the computations
+    execute(distsum, sums);
+
+    //compute Pi from the sums 
+    final double pi = Bellard.computePi(b, sums);
+    Util.printBitSkipped(b);
+    Util.out.println(Util.pi2string(pi, Bellard.bit2terms(b)));
+    return 0;
+  }
+  
+  /** Execute DistSum computations */
+  private void execute(DistSum distsum,
+      final Map<Bellard.Parameter, Bellard.Sum> sums) throws Exception {
+    final List<Computation> computations = new ArrayList<Computation>();
+    int i = 0;
+    for(Bellard.Parameter p : Bellard.Parameter.values())
+      for(Summation s : sums.get(p))
+        if (s.getValue() == null)
+          computations.add(distsum.new Computation(i++, p.toString(), s));
+
+    if (computations.isEmpty())
+      Util.out.println("No computation");
+    else {
+      timer.tick("execute " + computations.size() + " computation(s)");
+      Util.execute(distsum.getParameters().nThreads, computations);
+      timer.tick("done");
+    }
+  }
+
+  /** main */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(null, new DistBbp(), args));
+  }
+}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,604 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.examples.pi.math.Summation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * The main class for computing sums using map/reduce jobs.
+ * A sum is partitioned into jobs.
+ * A job may be executed on the map-side or on the reduce-side.
+ * A map-side job has multiple maps and zero reducer.
+ * A reduce-side job has one map and multiple reducers.
+ * Depending on the clusters status in runtime,
+ * a mix-type job may be executed on either side.
+ */
+public final class DistSum extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(DistSum.class);
+
+  private static final String NAME = DistSum.class.getSimpleName();
+  private static final String N_PARTS = NAME + ".nParts";
+  /////////////////////////////////////////////////////////////////////////////
+  /** DistSum job parameters */
+  static class Parameters {
+    static final int COUNT = 6;
+    static final String LIST = "<nThreads> <nJobs> <type> <nPart> <remoteDir> <localDir>";
+    static final String DESCRIPTION =
+        "\n  <nThreads> The number of working threads."
+      + "\n  <nJobs> The number of jobs per sum."
+      + "\n  <type> 'm' for map side job, 'r' for reduce side job, 'x' for mix type."
+      + "\n  <nPart> The number of parts per job."
+      + "\n  <remoteDir> Remote directory for submitting jobs."
+      + "\n  <localDir> Local directory for storing output files.";
+
+    /** Number of worker threads */
+    final int nThreads;
+    /** Number of jobs */
+    final int nJobs;
+    /** Number of parts per job */
+    final int nParts;
+    /** The machine used in the computation */
+    final Machine machine;
+    /** The remote job directory */
+    final String remoteDir;
+    /** The local output directory */
+    final File localDir;
+  
+    private Parameters(Machine machine, int nThreads, int nJobs, int nParts,
+        String remoteDir, File localDir) {
+      this.machine = machine;
+      this.nThreads = nThreads;
+      this.nJobs = nJobs;
+      this.nParts = nParts;
+      this.remoteDir = remoteDir;
+      this.localDir = localDir;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return "\nnThreads  = " + nThreads
+           + "\nnJobs     = " + nJobs
+           + "\nnParts    = " + nParts + " (" + machine + ")"
+           + "\nremoteDir = " + remoteDir
+           + "\nlocalDir  = " + localDir;
+    }
+
+    /** Parse parameters */
+    static Parameters parse(String[] args, int i) {
+      if (args.length - i < COUNT)
+        throw new IllegalArgumentException("args.length - i < COUNT = "
+            + COUNT + ", args.length="
+            + args.length + ", i=" + i + ", args=" + Arrays.asList(args));
+      
+      final int nThreads = Integer.parseInt(args[i++]);
+      final int nJobs = Integer.parseInt(args[i++]);
+      final String type = args[i++];
+      final int nParts = Integer.parseInt(args[i++]);
+      final String remoteDir = args[i++];
+      final File localDir = new File(args[i++]);
+
+      if (!"m".equals(type) && !"r".equals(type) && !"x".equals(type)) { 
+        throw new IllegalArgumentException("type=" + type + " is not equal to m, r or x");
+      } else if (nParts <= 0) {
+        throw new IllegalArgumentException("nParts = " + nParts + " <= 0");
+      } else if (nJobs <= 0) {
+        throw new IllegalArgumentException("nJobs = " + nJobs + " <= 0");
+      } else if (nThreads <= 0) {
+        throw new IllegalArgumentException("nThreads = " + nThreads + " <= 0");
+      }
+      Util.checkDirectory(localDir);
+
+      return new Parameters("m".equals(type)? MapSide.INSTANCE
+          : "r".equals(type)? ReduceSide.INSTANCE: MixMachine.INSTANCE,
+          nThreads, nJobs, nParts, remoteDir, localDir);
+    }
+  }
+  /////////////////////////////////////////////////////////////////////////////
+  /** Abstract machine for job execution. */
+  public static abstract class Machine {
+    /** Initialize a job */
+    abstract void init(Job job) throws IOException;
+    
+    /** {@inheritDoc} */
+    public String toString() {return getClass().getSimpleName();}
+
+    /** Compute sigma */
+    static void compute(Summation sigma,
+        TaskInputOutputContext<?, ?, NullWritable, TaskResult> context
+        ) throws IOException, InterruptedException {
+      String s;
+      LOG.info(s = "sigma=" + sigma);
+      context.setStatus(s);
+
+      final long start = System.currentTimeMillis();
+      sigma.compute();
+      final long duration = System.currentTimeMillis() - start;
+      final TaskResult result = new TaskResult(sigma, duration);
+
+      LOG.info(s = "result=" + result);
+      context.setStatus(s);
+      context.write(NullWritable.get(), result);
+    }
+
+    /** Split for the summations */
+    public static final class SummationSplit extends InputSplit implements Writable, Container<Summation> {
+      private final static String[] EMPTY = {};
+  
+      private Summation sigma;
+  
+      public SummationSplit() {}
+      private SummationSplit(Summation sigma) {this.sigma = sigma;}
+      /** {@inheritDoc} */
+      @Override
+      public Summation getElement() {return sigma;}
+      /** {@inheritDoc} */
+      @Override
+      public long getLength() {return 1;}
+      /** {@inheritDoc} */
+      @Override
+      public String[] getLocations() {return EMPTY;}
+  
+      /** {@inheritDoc} */
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        sigma = SummationWritable.read(in);
+      }
+      /** {@inheritDoc} */
+      @Override
+      public void write(DataOutput out) throws IOException {
+        new SummationWritable(sigma).write(out);
+      }
+    }
+
+    /** An abstract InputFormat for the jobs */
+    public static abstract class AbstractInputFormat extends InputFormat<NullWritable, SummationWritable> {
+      /** Specify how to read the records */
+      @Override
+      public final RecordReader<NullWritable, SummationWritable> createRecordReader(
+          InputSplit generic, TaskAttemptContext context) {
+        final SummationSplit split = (SummationSplit)generic;
+  
+        //return a record reader
+        return new RecordReader<NullWritable, SummationWritable>() {
+          boolean done = false;
+  
+          /** {@inheritDoc} */
+          @Override
+          public void initialize(InputSplit split, TaskAttemptContext context) {}
+          /** {@inheritDoc} */
+          @Override
+          public boolean nextKeyValue() {return !done ? done = true : false;}
+          /** {@inheritDoc} */
+          @Override
+          public NullWritable getCurrentKey() {return NullWritable.get();}
+          /** {@inheritDoc} */
+          @Override
+          public SummationWritable getCurrentValue() {return new SummationWritable(split.getElement());}
+          /** {@inheritDoc} */
+          @Override
+          public float getProgress() {return done? 1f: 0f;}
+          /** {@inheritDoc} */
+          @Override
+          public void close() {}
+        };
+      }
+    }
+  }
+  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * A machine which does computation on the map side.
+   */
+  public static class MapSide extends Machine {
+    private static final MapSide INSTANCE = new MapSide();
+
+    /** {@inheritDoc} */
+    @Override
+    public void init(Job job) {
+      // setup mapper
+      job.setMapperClass(SummingMapper.class);
+      job.setMapOutputKeyClass(NullWritable.class);
+      job.setMapOutputValueClass(TaskResult.class);
+
+      // zero reducer
+      job.setNumReduceTasks(0);
+
+      // setup input
+      job.setInputFormatClass(PartitionInputFormat.class);
+    }
+
+    /** An InputFormat which partitions a summation */
+    public static class PartitionInputFormat extends AbstractInputFormat {
+      /** Partitions the summation into parts and then return them as splits */
+      @Override
+      public List<InputSplit> getSplits(JobContext context) {
+        //read sigma from conf
+        final Configuration conf = context.getConfiguration();
+        final Summation sigma = SummationWritable.read(DistSum.class, conf); 
+        final int nParts = conf.getInt(N_PARTS, 0);
+  
+        //create splits
+        final List<InputSplit> splits = new ArrayList<InputSplit>(nParts);
+        final Summation[] parts = sigma.partition(nParts);
+        for(int i = 0; i < parts.length; ++i) {
+          splits.add(new SummationSplit(parts[i]));
+          //LOG.info("parts[" + i + "] = " + parts[i]);
+        }
+        return splits;
+      }
+    }
+  
+    /** A mapper which computes sums */
+    public static class SummingMapper extends
+        Mapper<NullWritable, SummationWritable, NullWritable, TaskResult> {
+      @Override
+      protected void map(NullWritable nw, SummationWritable sigma, final Context context
+          ) throws IOException, InterruptedException {
+        compute(sigma.getElement(), context);
+      }
+    }
+  }
+  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * A machine which does computation on the reduce side.
+   */
+  public static class ReduceSide extends Machine {
+    private static final ReduceSide INSTANCE = new ReduceSide();
+
+    /** {@inheritDoc} */
+    @Override
+    public void init(Job job) {
+      // setup mapper
+      job.setMapperClass(PartitionMapper.class);
+      job.setMapOutputKeyClass(IntWritable.class);
+      job.setMapOutputValueClass(SummationWritable.class);
+
+      // setup partitioner
+      job.setPartitionerClass(IndexPartitioner.class);
+
+      // setup reducer
+      job.setReducerClass(SummingReducer.class);
+      job.setOutputKeyClass(NullWritable.class);
+      job.setOutputValueClass(TaskResult.class);
+      final Configuration conf = job.getConfiguration();
+      final int nParts = conf.getInt(N_PARTS, 1);
+      job.setNumReduceTasks(nParts);
+
+      // setup input
+      job.setInputFormatClass(SummationInputFormat.class);
+    }
+
+    /** An InputFormat which returns a single summation. */
+    public static class SummationInputFormat extends AbstractInputFormat {
+      /** @return a list containing a single split of summation */
+      @Override
+      public List<InputSplit> getSplits(JobContext context) {
+        //read sigma from conf
+        final Configuration conf = context.getConfiguration();
+        final Summation sigma = SummationWritable.read(DistSum.class, conf); 
+  
+        //create splits
+        final List<InputSplit> splits = new ArrayList<InputSplit>(1);
+        splits.add(new SummationSplit(sigma));
+        return splits;
+      }
+    }
+
+    /** A Mapper which partitions a summation */
+    public static class PartitionMapper extends
+        Mapper<NullWritable, SummationWritable, IntWritable, SummationWritable> {
+      /** Partitions sigma into parts */
+      @Override
+      protected void map(NullWritable nw, SummationWritable sigma, final Context context
+          ) throws IOException, InterruptedException {
+        final Configuration conf = context.getConfiguration();
+        final int nParts = conf.getInt(N_PARTS, 0);
+        final Summation[] parts = sigma.getElement().partition(nParts);
+        for(int i = 0; i < parts.length; ++i) {
+          context.write(new IntWritable(i), new SummationWritable(parts[i]));
+          LOG.info("parts[" + i + "] = " + parts[i]);
+        }
+      }
+    }
+
+    /** Use the index for partitioning. */
+    public static class IndexPartitioner extends Partitioner<IntWritable, SummationWritable> {
+      /** Return the index as the partition. */
+      @Override
+      public int getPartition(IntWritable index, SummationWritable value, int numPartitions) {
+        return index.get();
+      }
+    }    
+
+    /** A Reducer which computes sums */
+    public static class SummingReducer extends
+        Reducer<IntWritable, SummationWritable, NullWritable, TaskResult> {
+      @Override
+      protected void reduce(IntWritable index, Iterable<SummationWritable> sums,
+          Context context) throws IOException, InterruptedException {
+        LOG.info("index=" + index);
+        for(SummationWritable sigma : sums)
+          compute(sigma.getElement(), context);
+      }
+    }
+  }
+  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * A machine which chooses Machine in runtime according to the cluster status
+   */
+  public static class MixMachine extends Machine {
+    private static final MixMachine INSTANCE = new MixMachine();
+    
+    private JobClient jobclient;
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized void init(Job job) throws IOException {
+      final Configuration conf = job.getConfiguration();
+      if (jobclient == null)
+        jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+      chooseMachine(conf).init(job);
+    }
+
+    /**
+     * Choose a Machine in runtime according to the cluster status.
+     */
+    private Machine chooseMachine(Configuration conf) throws IOException {
+      final int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE);
+      try {
+        for(;; Thread.sleep(2000)) {
+          //get cluster status
+          final ClusterStatus status = jobclient.getClusterStatus();
+          final int m = status.getMaxMapTasks() - status.getMapTasks();
+          final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+          if (m >= parts || r >= parts) {
+            //favor ReduceSide machine
+            final Machine value = r >= parts?
+                ReduceSide.INSTANCE: MapSide.INSTANCE;
+            Util.out.println("  " + this + " is " + value + " (m=" + m + ", r=" + r + ")");
+            return value;
+          }
+        }
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }    
+    }
+
+  }
+  /////////////////////////////////////////////////////////////////////////////
+  private final Util.Timer timer = new Util.Timer(true);  
+  private Parameters parameters;
+
+  /** Get Parameters */
+  Parameters getParameters() {return parameters;}
+  /** Set Parameters */
+  void setParameters(Parameters p) {parameters = p;}
+
+  /** Create a job */
+  private Job createJob(String name, Summation sigma) throws IOException {
+    final Job job = new Job(getConf(), parameters.remoteDir + "/" + name);
+    final Configuration jobconf = job.getConfiguration();
+    job.setJarByClass(DistSum.class);
+    jobconf.setInt(N_PARTS, parameters.nParts);
+    SummationWritable.write(sigma, DistSum.class, jobconf);
+
+    // disable task timeout
+    jobconf.setLong("mapred.task.timeout", 0);
+    // do not use speculative execution
+    jobconf.setBoolean("mapred.map.tasks.speculative.execution", false);
+    jobconf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+
+    return job; 
+  }
+
+  /** Start a job to compute sigma */
+  private void compute(final String name, Summation sigma) throws IOException {
+    if (sigma.getValue() != null)
+      throw new IOException("sigma.getValue() != null, sigma=" + sigma);
+
+    //setup remote directory
+    final FileSystem fs = FileSystem.get(getConf());
+    final Path dir = fs.makeQualified(new Path(parameters.remoteDir, name));
+    if (!Util.createNonexistingDirectory(fs, dir))
+      return;
+
+    //setup a job
+    final Job job = createJob(name, sigma);
+    final Path outdir = new Path(dir, "out");
+    FileOutputFormat.setOutputPath(job, outdir);
+
+    //start a map/reduce job
+    final String startmessage = "steps/parts = "
+      + sigma.E.getSteps() + "/" + parameters.nParts
+      + " = " + Util.long2string(sigma.E.getSteps()/parameters.nParts);
+    Util.runJob(name, job, parameters.machine, startmessage, timer);
+    final List<TaskResult> results = Util.readJobOutputs(fs, outdir);
+    Util.writeResults(name, results, fs, parameters.remoteDir);
+    fs.delete(dir, true);
+
+    //combine results
+    final List<TaskResult> combined = Util.combine(results);
+    final PrintWriter out = Util.createWriter(parameters.localDir, name);
+    try {
+      for(TaskResult r : combined) {
+        final String s = taskResult2string(name, r);
+        out.println(s);
+        out.flush();
+        Util.out.println(s);
+      }
+    } finally {
+      out.close();
+    }
+    if (combined.size() == 1) {
+      final Summation s = combined.get(0).getElement();
+      if (sigma.contains(s) && s.contains(sigma))
+        sigma.setValue(s.getValue());
+    }
+  }
+
+  /** Convert a TaskResult to a String */
+  public static String taskResult2string(String name, TaskResult result) {
+    return NAME + " " + name + "> " + result;
+  }
+
+  /** Convert a String to a (String, TaskResult) pair */
+  public static Map.Entry<String, TaskResult> string2TaskResult(final String s) {
+    //  LOG.info("line = " + line);
+    int j = s.indexOf(NAME);
+    if (j == 0) {
+      int i = j + NAME.length() + 1;
+      j = s.indexOf("> ", i);
+      final String key = s.substring(i, j);
+      final TaskResult value = TaskResult.valueOf(s.substring(j + 2));
+      return new Map.Entry<String, TaskResult>(){
+        @Override
+        public String getKey() {return key;}
+        @Override
+        public TaskResult getValue() {return value;}
+        @Override
+        public TaskResult setValue(TaskResult value) {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+    return null;
+  }
+
+  /** Callable computation */
+  class Computation implements Callable<Computation> {
+    private final int index;
+    private final String name;
+    private final Summation sigma;
+
+    Computation(int index, String name, Summation sigma) {
+      this.index = index;
+      this.name = name;
+      this.sigma = sigma;
+    }
+
+    /** @return The job name */
+    String getJobName() {return String.format("%s.job%03d", name, index);}
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {return getJobName() + sigma;}
+
+    /** Start the computation */
+    @Override
+    public Computation call() {
+      if (sigma.getValue() == null)
+        try {
+          compute(getJobName(), sigma);
+        } catch(Exception e) {
+          Util.out.println("ERROR: Got an exception from " + getJobName());
+          e.printStackTrace(Util.out);
+        }
+      return this;
+    }
+  }
+
+  /** Partition sigma and execute the computations. */
+  private Summation execute(String name, Summation sigma) {
+    final Summation[] summations = sigma.partition(parameters.nJobs);
+    final List<Computation> computations = new ArrayList<Computation>(); 
+    for(int i = 0; i < summations.length; i++)
+      computations.add(new Computation(i, name, summations[i]));
+    try {
+      Util.execute(parameters.nThreads, computations);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    final List<Summation> combined = Util.combine(Arrays.asList(summations));
+    return combined.size() == 1? combined.get(0): null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int run(String[] args) throws Exception {
+    //parse arguments
+    if (args.length != Parameters.COUNT + 2)
+      return Util.printUsage(args, getClass().getName()
+          + " <name> <sigma> " + Parameters.LIST
+          + "\n  <name> The name."
+          + "\n  <sigma> The summation."
+          + Parameters.DESCRIPTION);
+
+    int i = 0;
+    final String name = args[i++];
+    final Summation sigma = Summation.valueOf(args[i++]);
+    setParameters(DistSum.Parameters.parse(args, i));
+
+    Util.out.println();
+    Util.out.println("name  = " + name);
+    Util.out.println("sigma = " + sigma);
+    Util.out.println(parameters);
+    Util.out.println();
+
+    //run jobs
+    final Summation result = execute(name, sigma);
+    if (result.equals(sigma)) {
+      sigma.setValue(result.getValue());
+      timer.tick("\n\nDONE\n\nsigma=" + sigma);
+      return 0;
+    } else {
+      timer.tick("\n\nDONE WITH ERROR\n\nresult=" + result);
+      return 1;
+    }
+  }
+
+  /** main */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(null, new DistSum(), args));
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Parser.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Parser.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Parser.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,187 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.examples.pi.math.Bellard;
+import org.apache.hadoop.examples.pi.math.Bellard.Parameter;
+
+/** A class for parsing outputs */
+public final class Parser {
+  static final String VERBOSE_PROPERTY = "pi.parser.verbose";
+
+  final boolean isVerbose;
+  
+  public Parser(boolean isVerbose) {
+    this.isVerbose = isVerbose;
+  }
+  
+  private void println(String s) {
+    if (isVerbose)
+      Util.out.println(s);
+  }
+
+  /** Parse a line */
+  private static void parseLine(final String line, Map<Parameter, List<TaskResult>> m) {
+//      LOG.info("line = " + line);
+    final Map.Entry<String, TaskResult> e = DistSum.string2TaskResult(line);
+    if (e != null) {
+      final List<TaskResult> sums = m.get(Parameter.get(e.getKey()));
+      if (sums == null)
+        throw new IllegalArgumentException("sums == null, line=" + line + ", e=" + e);
+      sums.add(e.getValue());
+    }
+  }
+
+  /** Parse a file or a directory tree */
+  private void parse(File f, Map<Parameter, List<TaskResult>> sums) throws IOException {
+    if (f.isDirectory()) {
+      println("Process directory " + f);
+      for(File child : f.listFiles())
+        parse(child, sums);
+    } else if (f.getName().endsWith(".txt")) {
+      println("Parse file " + f);
+      final Map<Parameter, List<TaskResult>> m = new TreeMap<Parameter, List<TaskResult>>();    
+      for(Parameter p : Parameter.values())
+        m.put(p, new ArrayList<TaskResult>());
+
+      final BufferedReader in = new BufferedReader(new FileReader(f)); 
+      try {
+        for(String line; (line = in.readLine()) != null; )
+          try {
+            parseLine(line, m);
+          } catch(RuntimeException e) {
+            Util.err.println("line = " + line);
+            throw e;
+          }
+      } finally {
+        in.close();
+      }
+
+      for(Parameter p : Parameter.values()) {
+        final List<TaskResult> combined = Util.combine(m.get(p));
+        if (!combined.isEmpty()) {
+          println(p + " (size=" + combined.size() + "):");
+          for(TaskResult r : combined)
+            println("  " + r);
+        }
+        sums.get(p).addAll(m.get(p));
+      }
+    }
+  }
+
+  /** Parse a path */
+  private Map<Parameter, List<TaskResult>> parse(String f) throws IOException {
+    final Map<Parameter, List<TaskResult>> m = new TreeMap<Parameter, List<TaskResult>>();
+    for(Parameter p : Parameter.values())
+      m.put(p, new ArrayList<TaskResult>());
+    parse(new File(f), m);
+
+    //LOG.info("m=" + m.toString().replace(", ", ",\n  "));
+    for(Parameter p : Parameter.values())
+      m.put(p, m.get(p));
+    return m;
+  }
+
+  /** Parse input and re-write results. */
+  Map<Parameter, List<TaskResult>> parse(String inputpath, String outputdir
+      ) throws IOException {
+    //parse input
+    Util.out.print("\nParsing " + inputpath + " ... ");
+    Util.out.flush();
+    final Map<Parameter, List<TaskResult>> parsed = parse(inputpath);
+    Util.out.println("DONE");
+
+    //re-write the results
+    if (outputdir != null) {
+      Util.out.print("\nWriting to " + outputdir + " ...");
+      Util.out.flush();
+      for(Parameter p : Parameter.values()) {
+        final List<TaskResult> results = parsed.get(p);
+        Collections.sort(results);
+
+        final PrintWriter out = new PrintWriter(
+            new FileWriter(new File(outputdir, p + ".txt")), true);
+        try {
+          for(int i = 0; i < results.size(); i++)
+            out.println(DistSum.taskResult2string(p + "." + i, results.get(i)));
+        }
+        finally {
+          out.close();
+        }
+      }
+      Util.out.println("DONE");
+    }
+    return parsed;
+  }
+
+  /** Combine results */
+  static <T extends Combinable<T>> Map<Parameter, T> combine(Map<Parameter, List<T>> m) {
+    final Map<Parameter, T> combined = new TreeMap<Parameter, T>();
+    for(Parameter p : Parameter.values()) {
+      final List<T> results = Util.combine(m.get(p));
+      Util.out.format("%-6s => ", p); 
+      if (results == null)
+        Util.out.println("null");
+      else if (results.size() != 1) 
+        Util.out.println(results.toString().replace(", ", ",\n           "));
+      else {
+        final T r = results.get(0);
+        combined.put(p, r); 
+        Util.out.println(r);
+      }
+    }
+    return combined;
+  }
+
+  /** main */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2 || args.length > 3)
+      Util.printUsage(args, Parser.class.getName()
+          + " <b> <inputpath> [<outputdir>]");
+
+    int i = 0;
+    final long b = Util.string2long(args[i++]);
+    final String inputpath = args[i++];
+    final String outputdir = args.length >= 3? args[i++]: null;
+
+    //read input
+    final Map<Parameter, List<TaskResult>> parsed = new Parser(true).parse(inputpath, outputdir);
+    final Map<Parameter, TaskResult> combined = combine(parsed);
+    long duration = 0;
+    for(TaskResult r : combined.values())
+      duration += r.getDuration();
+
+    //print pi
+    final double pi = Bellard.computePi(b, combined);
+    Util.printBitSkipped(b);
+    Util.out.println(Util.pi2string(pi, Bellard.bit2terms(b)));
+    Util.out.println("cpu time = " + Util.millis2String(duration));
+  }
+}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/SummationWritable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/SummationWritable.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/SummationWritable.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/SummationWritable.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.pi.math.ArithmeticProgression;
+import org.apache.hadoop.examples.pi.math.Summation;
+import org.apache.hadoop.io.WritableComparable;
+
+/** A Writable class for Summation */
+public final class SummationWritable implements WritableComparable<SummationWritable>, Container<Summation> {
+  private Summation sigma;
+
+  public SummationWritable() {}
+  
+  SummationWritable(Summation sigma) {this.sigma = sigma;}
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {return getClass().getSimpleName() + sigma;}
+
+  /** {@inheritDoc} */
+  @Override
+  public Summation getElement() {return sigma;}
+
+  /** Read sigma from conf */
+  public static Summation read(Class<?> clazz, Configuration conf) {
+    return Summation.valueOf(conf.get(clazz.getSimpleName() + ".sigma")); 
+  }
+
+  /** Write sigma to conf */
+  public static void write(Summation sigma, Class<?> clazz, Configuration conf) {
+    conf.set(clazz.getSimpleName() + ".sigma", sigma.toString());
+  }
+
+  /** Read Summation from DataInput */
+  static Summation read(DataInput in) throws IOException {
+    final SummationWritable s = new SummationWritable();
+    s.readFields(in);
+    return s.getElement();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final ArithmeticProgression N = ArithmeticProgressionWritable.read(in);
+    final ArithmeticProgression E = ArithmeticProgressionWritable.read(in);
+    sigma = new Summation(N, E); 
+
+    if (in.readBoolean()) {
+      sigma.setValue(in.readDouble());
+    }
+  }
+
+  /** Write sigma to DataOutput */
+  public static void write(Summation sigma, DataOutput out) throws IOException {
+    ArithmeticProgressionWritable.write(sigma.N, out);
+    ArithmeticProgressionWritable.write(sigma.E, out);
+
+    final Double v = sigma.getValue();
+    if (v == null)
+      out.writeBoolean(false);
+    else {
+      out.writeBoolean(true);
+      out.writeDouble(v);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    write(sigma, out);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int compareTo(SummationWritable that) {
+    return this.sigma.compareTo(that.sigma);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    else if (obj != null && obj instanceof SummationWritable) {
+      final SummationWritable that = (SummationWritable)obj;
+      return this.compareTo(that) == 0;
+    }
+    throw new IllegalArgumentException(obj == null? "obj == null":
+      "obj.getClass()=" + obj.getClass());
+  }
+
+  /** Not supported */
+  @Override
+  public int hashCode() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** A writable class for ArithmeticProgression */
+  private static class ArithmeticProgressionWritable {
+    /** Read ArithmeticProgression from DataInput */
+    private static ArithmeticProgression read(DataInput in) throws IOException {
+      return new ArithmeticProgression(in.readChar(),
+          in.readLong(), in.readLong(), in.readLong());
+    }
+
+    /** Write ArithmeticProgression to DataOutput */
+    private static void write(ArithmeticProgression ap, DataOutput out
+        ) throws IOException {
+      out.writeChar(ap.symbol);
+      out.writeLong(ap.value);
+      out.writeLong(ap.delta);
+      out.writeLong(ap.limit);
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/TaskResult.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/TaskResult.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/TaskResult.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/TaskResult.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.examples.pi.math.Summation;
+import org.apache.hadoop.io.Writable;
+
+/** A class for map task results or reduce task results. */
+public class TaskResult implements Container<Summation>, Combinable<TaskResult>, Writable {
+  private Summation sigma;
+  private long duration;
+
+  public TaskResult() {}
+
+  TaskResult(Summation sigma, long duration) {
+    this.sigma = sigma;
+    this.duration = duration;      
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Summation getElement() {return sigma;}
+
+  /** @return The time duration used */
+  long getDuration() {return duration;}
+
+  /** {@inheritDoc} */
+  @Override
+  public int compareTo(TaskResult that) {
+    return this.sigma.compareTo(that.sigma);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    else if (obj != null && obj instanceof TaskResult) {
+      final TaskResult that = (TaskResult)obj;
+      return this.compareTo(that) == 0;
+    }
+    throw new IllegalArgumentException(obj == null? "obj == null":
+      "obj.getClass()=" + obj.getClass());
+  }
+
+  /** Not supported */
+  @Override
+  public int hashCode() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public TaskResult combine(TaskResult that) {
+    final Summation s = sigma.combine(that.sigma);
+    return s == null? null: new TaskResult(s, this.duration + that.duration);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    sigma = SummationWritable.read(in);
+    duration = in.readLong();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    SummationWritable.write(sigma, out);
+    out.writeLong(duration);
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return "sigma=" + sigma + ", duration=" + duration + "(" + Util.millis2String(duration) + ")";
+  }
+
+  /** Covert a String to a TaskResult */
+  public static TaskResult valueOf(String s) {
+    int i = 0;
+    int j = s.indexOf(", duration=");
+    if (j < 0)
+      throw new IllegalArgumentException("i=" + i + ", j=" + j + " < 0, s=" + s);
+    final Summation sigma = Summation.valueOf(Util.parseStringVariable("sigma", s.substring(i, j)));
+
+    i = j + 2;
+    j = s.indexOf("(", i);
+    if (j < 0)
+      throw new IllegalArgumentException("i=" + i + ", j=" + j + " < 0, s=" + s);
+    final long duration = Util.parseLongVariable("duration", s.substring(i, j));
+
+    return new TaskResult(sigma, duration);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Util.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Util.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Util.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/Util.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.examples.pi.DistSum.Machine;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+
+/** Utility methods */
+public class Util {
+  /** Output stream */
+  public static final PrintStream out = System.out; 
+  /** Error stream */
+  public static final PrintStream err = System.out; 
+
+  /** Timer */
+  public static class Timer {
+    private final boolean isAccumulative;
+    private final long start = System.currentTimeMillis();
+    private long previous = start;
+  
+    /** Timer constructor
+     * @param isAccumulative  Is accumulating the time duration?
+     */
+    public Timer(boolean isAccumulative) {
+      this.isAccumulative = isAccumulative;
+      final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+      final StackTraceElement e = stack[stack.length - 1];
+      out.println(e + " started at " + new Date(start));
+    }
+  
+    /** Same as tick(null). */
+    public long tick() {return tick(null);}
+
+    /**
+     * Tick
+     * @param s Output message.  No output if it is null.
+     * @return delta
+     */
+    public synchronized long tick(String s) {
+      final long t = System.currentTimeMillis();
+      final long delta = t - (isAccumulative? start: previous);
+      if (s != null) {
+        out.format("%15dms (=%-15s: %s\n", delta, millis2String(delta) + ")", s);
+        out.flush();
+      }
+      previous = t;
+      return delta;
+    }
+  }
+    
+  /** Covert milliseconds to a String. */
+  public static String millis2String(long n) {
+    if (n < 0)
+      return "-" + millis2String(-n);
+    else if (n < 1000)
+      return n + "ms";
+
+    final StringBuilder b = new StringBuilder();
+    final int millis = (int)n % 1000;
+    if (millis != 0)
+      b.append(String.format(".%03d", millis)); 
+    if ((n /= 1000) < 60)
+      return b.insert(0, n).append("s").toString();
+
+    b.insert(0, String.format(":%02d", (int)(n % 60L)));
+    if ((n /= 60) < 60)
+      return b.insert(0, n).toString();
+
+    b.insert(0, String.format(":%02d", (int)(n % 60L)));
+    if ((n /= 60) < 24)
+      return b.insert(0, n).toString();
+
+    b.insert(0, n % 24L);
+    final int days = (int)((n /= 24) % 365L);
+    b.insert(0, days == 1? " day ": " days ").insert(0, days);
+    if ((n /= 365L) > 0)
+      b.insert(0, n == 1? " year ": " years ").insert(0, n);
+
+    return b.toString();
+  }
+
+  /** Covert a String to a long.  
+   * This support comma separated number format.
+   */
+  public static long string2long(String s) {
+    return Long.parseLong(s.trim().replace(",", ""));
+  }
+
+  /** Covert a long to a String in comma separated number format. */  
+  public static String long2string(long n) {
+    if (n < 0)
+      return "-" + long2string(-n);
+    
+    final StringBuilder b = new StringBuilder();
+    for(; n >= 1000; n = n/1000)
+      b.insert(0, String.format(",%03d", n % 1000));
+    return n + b.toString();    
+  }
+
+  /** Parse a variable. */  
+  public static long parseLongVariable(final String name, final String s) {
+    return string2long(parseStringVariable(name, s));
+  }
+
+  /** Parse a variable. */  
+  public static String parseStringVariable(final String name, final String s) {
+    if (!s.startsWith(name + '='))
+      throw new IllegalArgumentException("!s.startsWith(name + '='), name="
+          + name + ", s=" + s);
+    return s.substring(name.length() + 1);
+  }
+
+  /** Execute the callables by a number of threads */
+  public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
+      ) throws InterruptedException, ExecutionException {
+    final ExecutorService executor = Executors.newFixedThreadPool(nThreads); 
+    final List<Future<T>> futures = executor.invokeAll(callables);
+    for(Future<T> f : futures)
+      f.get();
+  }
+
+  /** Print usage messages */
+  public static int printUsage(String[] args, String usage) {
+    err.println("args = " + Arrays.asList(args));
+    err.println();
+    err.println("Usage: java " + usage);
+    err.println();
+    ToolRunner.printGenericCommandUsage(err);
+    return -1;
+  }
+
+  /** Combine a list of items. */
+  public static <T extends Combinable<T>> List<T> combine(Collection<T> items) {
+    final List<T> sorted = new ArrayList<T>(items);
+    if (sorted.size() <= 1)
+      return sorted;
+
+    Collections.sort(sorted);
+    final List<T> combined = new ArrayList<T>(items.size());
+    T prev = sorted.get(0);
+    for(int i = 1; i < sorted.size(); i++) {
+      final T curr = sorted.get(i);
+      final T c = curr.combine(prev);
+
+      if (c != null)
+        prev = c;
+      else {
+        combined.add(prev);
+        prev = curr;
+      }
+    }
+    combined.add(prev);
+    return combined;
+  }
+
+  /** Check local directory. */
+  public static void checkDirectory(File dir) {
+    if (!dir.exists())
+      if (!dir.mkdirs())
+        throw new IllegalArgumentException("!dir.mkdirs(), dir=" + dir);
+    if (!dir.isDirectory())
+      throw new IllegalArgumentException("dir (=" + dir + ") is not a directory.");
+  }
+
+  private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("-yyyyMMdd-HHmmssSSS");
+  /** Create a writer of a local file. */
+  public static PrintWriter createWriter(File dir, String prefix) throws IOException {
+    checkDirectory(dir);
+
+    for(;;) {
+      final File f = new File(dir,
+          prefix + DATE_FORMAT.format(new Date(System.currentTimeMillis())) + ".txt");
+      if (!f.exists())
+        return new PrintWriter(new FileWriter(f));
+
+      try {Thread.sleep(10);} catch (InterruptedException e) {}
+    }
+  }
+
+  /** Print a "bits skipped" message. */
+  public static void printBitSkipped(final long b) {
+    out.println();
+    out.println("b = " + long2string(b)
+        + " (" + (b < 2? "bit": "bits") + " skipped)");
+  }
+
+  /** Convert a pi value to a String. */
+  public static String pi2string(final double pi, final long terms) {
+    final long value = (long)(pi * (1L << DOUBLE_PRECISION));
+    final int acc_bit = accuracy(terms, false);
+    final int acc_hex = acc_bit/4;
+    final int shift = DOUBLE_PRECISION - acc_bit;
+    return String.format("%0" + acc_hex + "X %0" + (13-acc_hex) + "X (%d hex digits)",
+        value >> shift, value & ((1 << shift) - 1), acc_hex);
+  }
+
+  static final int DOUBLE_PRECISION = 52; //mantissa size
+  static final int MACHEPS_EXPONENT = DOUBLE_PRECISION + 1;
+  /** Estimate accuracy. */
+  public static int accuracy(final long terms, boolean print) {
+    final double error = terms <= 0? 2: (Math.log(terms) / Math.log(2)) / 2;
+    final int bits = MACHEPS_EXPONENT - (int)Math.ceil(error);
+    if (print)
+      out.println("accuracy: bits=" + bits + ", terms=" + long2string(terms) + ", error exponent=" + error);
+    return bits - bits%4;
+  }
+
+  private static final String JOB_SEPARATION_PROPERTY = "pi.job.separation.seconds";
+  private static final Semaphore JOB_SEMAPHORE = new Semaphore(1);
+
+  /** Run a job. */
+  static void runJob(String name, Job job, Machine machine, String startmessage, Util.Timer timer) {
+    JOB_SEMAPHORE.acquireUninterruptibly();
+    Long starttime = null;
+    try {
+      try {
+        starttime = timer.tick("starting " + name + " ...\n  " + startmessage);
+
+        //initialize and submit a job
+        machine.init(job);
+        job.submit();
+  
+        // Separate jobs
+        final long sleeptime = 1000L * job.getConfiguration().getInt(JOB_SEPARATION_PROPERTY, 10);
+        if (sleeptime > 0) {
+          Util.out.println(name + "> sleep(" + Util.millis2String(sleeptime) + ")");
+          Thread.sleep(sleeptime);
+        }
+      } finally {
+        JOB_SEMAPHORE.release();
+      }
+  
+      if (!job.waitForCompletion(false))
+        throw new RuntimeException(name + " failed.");
+    } catch(Exception e) {
+      throw e instanceof RuntimeException? (RuntimeException)e: new RuntimeException(e);
+    } finally {
+      if (starttime != null)
+        timer.tick(name + "> timetaken=" + Util.millis2String(timer.tick() - starttime));
+    }
+  }
+
+  /** Read job outputs */
+  static List<TaskResult> readJobOutputs(FileSystem fs, Path outdir) throws IOException {
+    final List<TaskResult> results = new ArrayList<TaskResult>();
+    for(FileStatus status : fs.listStatus(outdir)) {
+      if (status.getPath().getName().startsWith("part-")) {
+        final BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
+        try {
+          for(String line; (line = in.readLine()) != null; )
+            results.add(TaskResult.valueOf(line));
+        }
+        finally {
+          in.close();
+        }
+      }
+    }
+    if (results.isEmpty())
+      throw new IOException("Output not found");
+    return results;
+  }
+  
+  /** Write results */
+  static void writeResults(String name, List<TaskResult> results, FileSystem fs, String dir) throws IOException {
+    final Path outfile = new Path(dir, name + ".txt");
+    Util.out.println(name + "> writing results to " + outfile);
+    final PrintStream out = new PrintStream(fs.create(outfile), true);
+    try {
+      for(TaskResult r : results)
+        out.println(r);
+    }
+    finally {
+      out.close();
+    }
+  }
+
+  /** Create a directory. */
+  static boolean createNonexistingDirectory(FileSystem fs, Path dir) throws IOException {
+    if (fs.exists(dir)) {
+      Util.err.println("dir (= " + dir + ") already exists.");
+      return false;
+    } else if (!fs.mkdirs(dir)) {
+      throw new IOException("Cannot create working directory " + dir);
+    }
+    fs.setPermission(dir, new FsPermission((short)0777));
+    return true;
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/ArithmeticProgression.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/ArithmeticProgression.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/ArithmeticProgression.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/ArithmeticProgression.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi.math;
+
+import org.apache.hadoop.examples.pi.Util;
+
+/** An arithmetic progression */
+public class ArithmeticProgression implements Comparable<ArithmeticProgression> {
+  /** A symbol */
+  public final char symbol;
+  /** Starting value */
+  public final long value;
+  /** Difference between terms */
+  public final long delta;
+  /** Ending value */
+  public final long limit;
+
+  /** Constructor */
+  public ArithmeticProgression(char symbol, long value, long delta, long limit) {
+    if (delta == 0)
+      throw new IllegalArgumentException("delta == 0");
+
+    this.symbol = symbol;
+    this.value = value;
+    this.delta = delta;
+    this.limit = limit;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    else if (obj != null && obj instanceof ArithmeticProgression) {
+      final ArithmeticProgression that = (ArithmeticProgression)obj;
+      if (this.symbol != that.symbol)
+        throw new IllegalArgumentException("this.symbol != that.symbol, this="
+            + this + ", that=" + that);
+      return this.value == that.value
+          && this.delta == that.delta
+          && this.limit == that.limit;
+    }
+    throw new IllegalArgumentException(obj == null? "obj == null":
+      "obj.getClass()=" + obj.getClass());
+  }
+
+  /** Not supported */
+  public int hashCode() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int compareTo(ArithmeticProgression that) {
+    if (this.symbol != that.symbol)
+      throw new IllegalArgumentException("this.symbol != that.symbol, this="
+          + this + ", that=" + that);
+    if (this.delta != that.delta)
+      throw new IllegalArgumentException("this.delta != that.delta, this="
+          + this + ", that=" + that);
+    final long d = this.limit - that.limit;
+    return d > 0? 1: d == 0? 0: -1;
+  }
+
+  /** Does this contain that? */
+  boolean contains(ArithmeticProgression that) {
+    if (this.symbol != that.symbol)
+      throw new IllegalArgumentException("this.symbol != that.symbol, this="
+          + this + ", that=" + that);
+    if (this.delta == that.delta) {
+      if (this.value == that.value)
+        return this.getSteps() >= that.getSteps();
+      else if (this.delta < 0)
+        return this.value > that.value && this.limit <= that.limit;
+      else if (this.delta > 0)
+        return this.value < that.value && this.limit >= that.limit;
+    }
+    return false;    
+  }
+
+  /** Skip some steps */
+  long skip(long steps) {
+    if (steps < 0)
+      throw new IllegalArgumentException("steps < 0, steps=" + steps);
+    return value + steps*delta; 
+  }
+
+  /** Get the number of steps */
+  public long getSteps() {
+    return (limit - value)/delta;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return symbol + ":value=" + value + ",delta=" + delta + ",limit=" + limit;
+  }
+
+  /** Convert a String to an ArithmeticProgression. */
+  static ArithmeticProgression valueOf(final String s) {
+    int i = 2;
+    int j = s.indexOf(",delta=");
+    final long value = Util.parseLongVariable("value", s.substring(2, j));
+    i = j + 1;
+    j = s.indexOf(",limit=");
+    final long delta = Util.parseLongVariable("delta", s.substring(i, j));
+    i = j + 1;
+    final long limit = Util.parseLongVariable("limit", s.substring(i));
+    return new ArithmeticProgression(s.charAt(0), value, delta, limit);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,336 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi.math;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.examples.pi.Container;
+import org.apache.hadoop.examples.pi.Util;
+
+/**
+ * Bellard's BBP-type Pi formula
+ * 1/2^6 \sum_{n=0}^\infty (-1)^n/2^{10n}
+ * (-2^5/(4n+1) -1/(4n+3) +2^8/(10n+1) -2^6/(10n+3) -2^2/(10n+5)
+ *  -2^2/(10n+7) +1/(10n+9))
+ *  
+ * References:
+ *
+ * [1] David H. Bailey, Peter B. Borwein and Simon Plouffe.  On the Rapid
+ *     Computation of Various Polylogarithmic Constants.
+ *     Math. Comp., 66:903-913, 1996.
+ *     
+ * [2] Fabrice Bellard.  A new formula to compute the n'th binary digit of pi,
+ *     1997.  Available at http://fabrice.bellard.free.fr/pi .
+ */
+public final class Bellard {
+  /** Parameters for the sums */
+  public enum Parameter {
+    // \sum_{k=0}^\infty (-1)^{k+1}( 2^{d-10k-1}/(4k+1) + 2^{d-10k-6}/(4k+3) )
+    P8_1(false, 1, 8, -1),
+    P8_3(false, 3, 8, -6),
+    P8_5(P8_1),
+    P8_7(P8_3),
+
+    /*
+     *   2^d\sum_{k=0}^\infty (-1)^k( 2^{ 2-10k} / (10k + 1)
+     *                               -2^{  -10k} / (10k + 3)
+     *                               -2^{-4-10k} / (10k + 5)
+     *                               -2^{-4-10k} / (10k + 7)
+     *                               +2^{-6-10k} / (10k + 9) )
+     */
+    P20_21(true , 1, 20,  2),
+    P20_3(false, 3, 20,  0),
+    P20_5(false, 5, 20, -4),
+    P20_7(false, 7, 20, -4),
+    P20_9(true , 9, 20, -6),
+    P20_11(P20_21),
+    P20_13(P20_3),
+    P20_15(P20_5),
+    P20_17(P20_7),
+    P20_19(P20_9);
+    
+    final boolean isplus;
+    final long j;
+    final int deltaN;
+    final int deltaE;
+    final int offsetE;      
+
+    private Parameter(boolean isplus, long j, int deltaN, int offsetE) {
+      this.isplus = isplus;
+      this.j = j;
+      this.deltaN = deltaN;
+      this.deltaE = -20;
+      this.offsetE = offsetE;        
+    }
+
+    private Parameter(Parameter p) {
+      this.isplus = !p.isplus;
+      this.j = p.j + (p.deltaN >> 1);
+      this.deltaN = p.deltaN;
+      this.deltaE = p.deltaE;
+      this.offsetE = p.offsetE + (p.deltaE >> 1);
+    }
+
+    /** Get the Parameter represented by the String */
+    public static Parameter get(String s) {
+      s = s.trim();
+      if (s.charAt(0) == 'P')
+        s = s.substring(1);
+      final String[] parts = s.split("\\D+");
+      if (parts.length >= 2) {
+        final String name = "P" + parts[0] + "_" + parts[1];  
+        for(Parameter p : values())
+          if (p.name().equals(name))
+            return p;
+      }
+      throw new IllegalArgumentException("s=" + s
+          + ", parts=" + Arrays.asList(parts));
+    }
+  }
+
+  /** The sums in the Bellard's formula */
+  public static class Sum implements Container<Summation>, Iterable<Summation> {
+    private static final long ACCURACY_BIT = 50;
+
+    private final Parameter parameter;
+    private final Summation sigma;
+    private final Summation[] parts;
+    private final Tail tail;
+
+    /** Constructor */
+    private <T extends Container<Summation>> Sum(long b, Parameter p, int nParts, List<T> existing) {
+      if (b < 0)
+        throw new IllegalArgumentException("b = " + b + " < 0");
+      if (nParts < 1)
+        throw new IllegalArgumentException("nParts = " + nParts + " < 1");
+      final long i = p.j == 1 && p.offsetE >= 0? 1 : 0;
+      final long e = b + i*p.deltaE + p.offsetE;
+      final long n = i*p.deltaN + p.j;
+
+      this.parameter = p;
+      this.sigma = new Summation(n, p.deltaN, e, p.deltaE, 0);
+      this.parts = partition(sigma, nParts, existing);
+      this.tail = new Tail(n, e);
+    }
+
+    private static <T extends Container<Summation>> Summation[] partition(
+        Summation sigma, int nParts, List<T> existing) {
+      final List<Summation> parts = new ArrayList<Summation>();
+      if (existing == null || existing.isEmpty())
+        parts.addAll(Arrays.asList(sigma.partition(nParts)));
+      else {
+        final long stepsPerPart = sigma.getSteps()/nParts;
+        final List<Summation> remaining = sigma.remainingTerms(existing);
+
+        for(Summation s : remaining) {
+          final int n = (int)((s.getSteps() - 1)/stepsPerPart) + 1;
+          parts.addAll(Arrays.asList(s.partition(n)));
+        }
+        
+        for(Container<Summation> c : existing)
+          parts.add(c.getElement());
+        Collections.sort(parts);
+      }
+      return parts.toArray(new Summation[parts.size()]);
+    }
+    
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+      int n = 0;
+      for(Summation s : parts)
+        if (s.getValue() == null)
+          n++;
+      return getClass().getSimpleName() + "{" + parameter + ": " + sigma
+          + ", remaining=" + n + "}";
+    }
+
+    /** Set the value of sigma */
+    public void setValue(Summation s) {
+      if (s.getValue() == null)
+        throw new IllegalArgumentException("s.getValue()"
+            + "\n  sigma=" + sigma
+            + "\n  s    =" + s);
+      if (!s.contains(sigma) || !sigma.contains(s))
+        throw new IllegalArgumentException("!s.contains(sigma) || !sigma.contains(s)"
+            + "\n  sigma=" + sigma
+            + "\n  s    =" + s);
+      sigma.setValue(s.getValue());      
+    }
+
+    /** get the value of sigma */
+    public double getValue() {
+      if (sigma.getValue() == null) {
+        double d = 0;
+        for(int i = 0; i < parts.length; i++)
+          d = Modular.addMod(d, parts[i].compute());
+        sigma.setValue(d);
+      }
+
+      final double s = Modular.addMod(sigma.getValue(), tail.compute()); 
+      return parameter.isplus? s: -s;
+    }
+    
+    /** {@inheritDoc} */
+    @Override
+    public Summation getElement() {
+      if (sigma.getValue() == null) {
+        int i = 0;
+        double d = 0;
+        for(; i < parts.length && parts[i].getValue() != null; i++)
+          d = Modular.addMod(d, parts[i].getValue());
+        if (i == parts.length)
+          sigma.setValue(d);
+      }
+      return sigma;
+    }
+
+    /** The sum tail */
+    private class Tail {
+      private long n;
+      private long e;
+      
+      private Tail(long n, long e) {
+        this.n = n;
+        this.e = e;
+      }
+
+      private double compute() {
+        if (e > 0) {
+          final long edelta = -sigma.E.delta;
+          long q = e / edelta;
+          long r = e % edelta;
+          if (r == 0) {
+            e = 0;
+            n += q * sigma.N.delta;
+          } else {
+            e = edelta - r;
+            n += (q + 1)*sigma.N.delta;
+          }
+        } else if (e < 0)
+          e = -e; 
+    
+        double s = 0;
+        for(;; e -= sigma.E.delta) {
+          if (e > ACCURACY_BIT || (1L << (ACCURACY_BIT - e)) < n)
+            return s;
+    
+          s += 1.0 / (n << e);
+          if (s >= 1) s--;
+          n += sigma.N.delta;
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Iterator<Summation> iterator() {
+      return new Iterator<Summation>() {
+        private int i = 0;
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean hasNext() {return i < parts.length;}
+        /** {@inheritDoc} */
+        @Override
+        public Summation next() {return parts[i++];}
+        /** Unsupported */
+        @Override
+        public void remove() {throw new UnsupportedOperationException();}
+      };
+    }
+  }
+
+  /** Get the sums for the Bellard formula. */
+  public static <T extends Container<Summation>> Map<Parameter, Sum> getSums(
+      long b, int partsPerSum, Map<Parameter, List<T>> existing) {
+    final Map<Parameter, Sum> sums = new TreeMap<Parameter, Sum>();
+    for(Parameter p : Parameter.values()) {
+      final Sum s = new Sum(b, p, partsPerSum, existing.get(p));
+      Util.out.println("put " + s);
+      sums.put(p, s);
+    }
+    return sums;
+  }
+
+  /** Compute bits of Pi from the results. */
+  public static <T extends Container<Summation>> double computePi(
+      final long b, Map<Parameter, T> results) {
+    if (results.size() != Parameter.values().length)
+      throw new IllegalArgumentException("m.size() != Parameter.values().length"
+          + ", m.size()=" + results.size()
+          + "\n  m=" + results);
+
+    double pi = 0;
+    for(Parameter p : Parameter.values()) {
+      final Summation sigma = results.get(p).getElement();
+      final Sum s = new Sum(b, p, 1, null);
+      s.setValue(sigma);
+      pi = Modular.addMod(pi, s.getValue());
+    }
+    return pi;
+  }
+
+  /** Compute bits of Pi in the local machine. */
+  public static double computePi(final long b) {
+    double pi = 0;
+    for(Parameter p : Parameter.values())
+      pi = Modular.addMod(pi, new Sum(b, p, 1, null).getValue());
+    return pi;
+  }
+
+  /** Estimate the number of terms. */
+  public static long bit2terms(long b) {
+    return 7*(b/10);
+  }
+
+  private static void computePi(Util.Timer t, long b) {
+    t.tick(Util.pi2string(computePi(b), bit2terms(b)));
+  }
+
+  /** main */
+  public static void main(String[] args) throws IOException {
+    final Util.Timer t = new Util.Timer(false);
+
+    computePi(t, 0);
+    computePi(t, 1);
+    computePi(t, 2);
+    computePi(t, 3);
+    computePi(t, 4);
+
+    Util.printBitSkipped(1008);
+    computePi(t, 1008);
+    computePi(t, 1012);
+
+    long b = 10;
+    for(int i = 0; i < 7; i++) {
+      Util.printBitSkipped(b);
+      computePi(t, b - 4);
+      computePi(t, b);
+      computePi(t, b + 4);
+      b *= 10;
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/LongLong.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/LongLong.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/LongLong.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/LongLong.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi.math;
+
+import java.math.BigInteger;
+
+/** Support 124-bit integer arithmetic. */
+class LongLong {
+  static final int BITS_PER_LONG = 62;
+  static final int MID = BITS_PER_LONG >> 1;
+  static final int SIZE = BITS_PER_LONG << 1;
+
+  static final long FULL_MASK = (1L << BITS_PER_LONG) - 1;
+  static final long LOWER_MASK = FULL_MASK >>> MID;
+  static final long UPPER_MASK = LOWER_MASK << MID;
+
+  private long d0;
+  private long d1;
+
+  /** Set the values. */
+  LongLong set(long d0, long d1) {
+    this.d0 = d0;
+    this.d1 = d1;
+    return this;
+  }
+
+  /** And operation (&). */
+  long and(long mask) {
+    return d0 & mask;
+  }
+
+  /** Shift right operation (<<). */
+  long shiftRight(int n) {
+    return (d1 << (BITS_PER_LONG - n)) + (d0 >>> n);
+  }
+
+  /** Plus equal operation (+=). */
+  LongLong plusEqual(LongLong that) {
+    this.d0 += that.d0;
+    this.d1 += that.d1;
+    return this;
+  }
+
+  /** Convert this to a BigInteger. */
+  BigInteger toBigInteger() {
+    return BigInteger.valueOf(d1).shiftLeft(BITS_PER_LONG).add(BigInteger.valueOf(d0));
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final int remainder = BITS_PER_LONG % 4;
+    return String.format("%x*2^%d + %016x", d1<<remainder, BITS_PER_LONG-remainder, d0);
+  }
+
+  /** Compute a*b and store the result to r.
+   * @return r
+   */
+  static LongLong multiplication(final LongLong r, final long a, final long b) {
+    /*
+    final long x0 = a & LOWER_MASK;
+    final long x1 = (a & UPPER_MASK) >> MID;
+
+    final long y0 = b & LOWER_MASK;
+    final long y1 = (b & UPPER_MASK) >> MID;
+
+    final long t = (x0 + x1)*(y0 + y1);
+    final long u = (x0 - x1)*(y0 - y1);
+    final long v = x1*y1;
+
+    final long tmp = (t - u)>>>1;
+    result.d0 = ((t + u)>>>1) - v + ((tmp << MID) & FULL_MASK);;
+    result.d1 = v + (tmp >> MID);
+    return result;
+    */
+    final long a_lower = a & LOWER_MASK;
+    final long a_upper = (a & UPPER_MASK) >> MID;
+
+    final long b_lower = b & LOWER_MASK;
+    final long b_upper = (b & UPPER_MASK) >> MID;
+
+    final long tmp = a_lower*b_upper + a_upper*b_lower;
+    r.d0 = a_lower*b_lower + ((tmp << MID) & FULL_MASK);
+    r.d1 = a_upper*b_upper + (tmp >> MID);
+    return r;
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Modular.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Modular.java?rev=790015&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Modular.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/math/Modular.java Wed Jul  1 01:35:55 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.pi.math;
+
+/** Modular arithmetics */
+public class Modular {
+  static final long MAX_SQRT_LONG = (long)Math.sqrt(Long.MAX_VALUE);
+
+  /** Compute 2^e mod n */
+  public static long mod(long e, long n) {
+    final int HALF = (63 - Long.numberOfLeadingZeros(n)) >> 1;
+    final int FULL = HALF << 1;
+    final long ONES = (1 << HALF) - 1; 
+
+    long r = 2;
+    for (long mask = Long.highestOneBit(e) >> 1; mask > 0; mask >>= 1) {
+      if (r <= MAX_SQRT_LONG) {
+        r *= r;
+        if (r >= n) r %= n;
+      } else {
+        // r^2 will overflow
+        final long high = r >>> HALF;
+        final long low  = r &= ONES;
+        
+        r *= r;
+        if (r >= n) r %= n;
+
+        if (high != 0) {
+          long s = high * high;
+          if (s >= n) s %= n;
+          for(int i = 0; i < FULL; i++)
+            if ((s <<= 1) >= n) s -= n;
+          
+          if (low == 0)
+            r = s;
+          else {
+            long t = high * low;
+            if (t >= n) t %= n;
+            for(int i = -1; i < HALF; i++)
+              if ((t <<= 1) >= n) t -= n;
+            
+            r += s;
+            if (r >= n) r -= n;
+            r += t;
+            if (r >= n) r -= n;
+          }
+        }
+      }
+
+      if ((e & mask) != 0) {
+        r <<= 1;
+        if (r >= n) r -= n;
+      }
+    }
+    return r;
+  }
+
+  /** Given x in [0,1) and a in (-1,1),
+   * return (x, a) mod 1.0. 
+   */
+  public static double addMod(double x, final double a) {
+    x += a;
+    return x >= 1? x - 1: x < 0? x + 1: x;
+  }
+
+  /** Given 0 < x < y,
+   * return x^(-1) mod y.
+   */
+  public static long modInverse(final long x, final long y) {
+    if (x == 1) return 1;
+
+    long a = 1;
+    long b = 0;
+    long c = x;
+
+    long u = 0;
+    long v = 1;
+    long w = y;
+    
+    for(;;) {
+      {
+        final long q = w/c;
+        w -= q*c;
+        u -= q*a;
+        if (w == 1) return u > 0? u: u + y;
+        v -= q*b;
+      }
+      {
+        final long q = c/w;
+        c -= q*w;
+        a -= q*u;
+        if (c == 1) return a > 0? a: a + y;
+        b -= q*v;
+      }
+    }
+  }
+}



Mime
View raw message