hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r529410 [26/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...
Date: Mon, 16 Apr 2007 21:44:46 GMT
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java Mon Apr 16 14:44:35 2007
@@ -57,7 +57,7 @@
 
   static void printUsage() {
     System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
-    "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
+                       "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
     System.exit(1);
   }
 
@@ -73,7 +73,7 @@
     
     // value == one for sort-input; value == two for sort-output
     return (inputFile.startsWith(inputPaths[0].toString()+"/")) ? 
-            sortInput : sortOutput;
+      sortInput : sortOutput;
   }
   
   static private byte[] pair(BytesWritable a, BytesWritable b) {
@@ -145,8 +145,8 @@
             URI inputURI = new URI(job.get("map.input.file"));
             String inputFile = inputURI.getPath();
             partition = Integer.valueOf(
-                          inputFile.substring(inputFile.lastIndexOf("part")+5)
-                          ).intValue();
+                                        inputFile.substring(inputFile.lastIndexOf("part")+5)
+                                        ).intValue();
             noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
           } catch (Exception e) {
             System.err.println("Caught: " + e);
@@ -156,9 +156,9 @@
       }
       
       public void map(WritableComparable key, 
-              Writable value,
-              OutputCollector output, 
-              Reporter reporter) throws IOException {
+                      Writable value,
+                      OutputCollector output, 
+                      Reporter reporter) throws IOException {
         BytesWritable bwKey = (BytesWritable)key;
         BytesWritable bwValue = (BytesWritable)value;
         
@@ -170,7 +170,7 @@
           } else {
             if (prevKey.compareTo(bwKey) > 0) {
               throw new IOException("The 'map-reduce' framework wrongly classifed"
-                      + "(" + prevKey + ") > (" + bwKey + ")"); 
+                                    + "(" + prevKey + ") > (" + bwKey + ")"); 
             }
             prevKey = bwKey;
           }
@@ -180,24 +180,24 @@
             partitioner.getPartition(bwKey, bwValue, noSortReducers);
           if (partition != keyPartition) {
             throw new IOException("Paritions do not match! - '" + partition + 
-                    "' v/s '" + keyPartition + "'");
+                                  "' v/s '" + keyPartition + "'");
           }
         }
 
         int keyValueChecksum = 
-            (WritableComparator.hashBytes(bwKey.get(), bwKey.getSize()) ^
-             WritableComparator.hashBytes(bwValue.get(), bwValue.getSize()));
+          (WritableComparator.hashBytes(bwKey.get(), bwKey.getSize()) ^
+           WritableComparator.hashBytes(bwValue.get(), bwValue.getSize()));
 
         // output (this.key, record-stats)
         output.collect(this.key, new RecordStatsWritable(
-                (bwKey.getSize()+bwValue.getSize()), 1, keyValueChecksum));
+                                                         (bwKey.getSize()+bwValue.getSize()), 1, keyValueChecksum));
       }
     }
     
     public static class Reduce extends MapReduceBase implements Reducer {
       public void reduce(WritableComparable key, Iterator values,
-              OutputCollector output, 
-              Reporter reporter) throws IOException {
+                         OutputCollector output, 
+                         Reporter reporter) throws IOException {
         long bytes = 0;
         long records = 0;
         int xor = 0;
@@ -213,14 +213,14 @@
     }
     
     public static class NonSplitableSequenceFileInputFormat 
-    extends SequenceFileInputFormat {
+      extends SequenceFileInputFormat {
       protected boolean isSplitable(FileSystem fs, Path filename) {
         return false;
       }
     }
     
     static void checkRecords(Configuration defaults, 
-            Path sortInput, Path sortOutput) throws IOException {
+                             Path sortInput, Path sortOutput) throws IOException {
       FileSystem fs = FileSystem.get(defaults);
       JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
       jobConf.setJobName("sortvalidate-recordstats-checker");
@@ -253,21 +253,21 @@
       //job_conf.set("mapred.job.tracker", "local");
       
       System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
-              "from " + jobConf.getInputPaths()[0] + ", " + 
-              jobConf.getInputPaths()[1] + " into " + jobConf.getOutputPath() + 
-              " with 1 reducer.");
+                         "from " + jobConf.getInputPaths()[0] + ", " + 
+                         jobConf.getInputPaths()[1] + " into " + jobConf.getOutputPath() + 
+                         " with 1 reducer.");
       Date startTime = new Date();
       System.out.println("Job started: " + startTime);
       JobClient.runJob(jobConf);
       Date end_time = new Date();
       System.out.println("Job ended: " + end_time);
       System.out.println("The job took " + 
-              (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
       
       // Check to ensure that the statistics of the 
       // framework's sort-input and sort-output match
       SequenceFile.Reader stats = new SequenceFile.Reader(fs, 
-              new Path(outputPath, "part-00000"), defaults);
+                                                          new Path(outputPath, "part-00000"), defaults);
       IntWritable k1 = new IntWritable();
       IntWritable k2 = new IntWritable();
       RecordStatsWritable v1 = new RecordStatsWritable();
@@ -280,10 +280,10 @@
       }
 
       if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || 
-              v1.getChecksum() != v2.getChecksum()) {
+          v1.getChecksum() != v2.getChecksum()) {
         throw new IOException("(" + 
-                v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
-                v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
+                              v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
+                              v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
       }
     }
 
@@ -307,9 +307,9 @@
       }
       
       public void map(WritableComparable key, 
-              Writable value,
-              OutputCollector output, 
-              Reporter reporter) throws IOException {
+                      Writable value,
+                      OutputCollector output, 
+                      Reporter reporter) throws IOException {
         // newKey = (key, value)
         BytesWritable keyValue = 
           new BytesWritable(pair((BytesWritable)key, (BytesWritable)value));
@@ -321,8 +321,8 @@
     
     public static class Reduce extends MapReduceBase implements Reducer {
       public void reduce(WritableComparable key, Iterator values,
-              OutputCollector output, 
-              Reporter reporter) throws IOException {
+                         OutputCollector output, 
+                         Reporter reporter) throws IOException {
         int ones = 0;
         int twos = 0;
         while (values.hasNext()) {
@@ -333,20 +333,20 @@
             ++twos;
           } else {
             throw new IOException("Invalid 'value' of " + count.get() + 
-                    " for (key,value): " + key.toString());
+                                  " for (key,value): " + key.toString());
           }
         }
         
         // Check to ensure there are equal no. of ones and twos
         if (ones != twos) {
           throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
-                  ") for (key, value): " + key.toString());
+                                ") for (key, value): " + key.toString());
         }
       }
     }
     
     static void checkRecords(Configuration defaults, int noMaps, int noReduces,
-            Path sortInput, Path sortOutput) throws IOException {
+                             Path sortInput, Path sortOutput) throws IOException {
       JobConf jobConf = new JobConf(defaults, RecordChecker.class);
       jobConf.setJobName("sortvalidate-record-checker");
       
@@ -363,12 +363,12 @@
       ClusterStatus cluster = client.getClusterStatus();
       if (noMaps == -1) {
         noMaps = cluster.getTaskTrackers() * 
-                   jobConf.getInt("test.sortvalidate.maps_per_host", 10);
+          jobConf.getInt("test.sortvalidate.maps_per_host", 10);
       }
       if (noReduces == -1) {
         noReduces = cluster.getTaskTrackers() * 
-                      jobConf.getInt("test.sortvalidate.reduces_per_host", 
-                              cluster.getMaxTasks());
+          jobConf.getInt("test.sortvalidate.reduces_per_host", 
+                         cluster.getMaxTasks());
       }
       jobConf.setNumMapTasks(noMaps);
       jobConf.setNumReduceTasks(noReduces);
@@ -386,17 +386,17 @@
       //job_conf.set("mapred.job.tracker", "local");
       
       System.out.println("\nSortValidator.RecordChecker: Running on " +
-              cluster.getTaskTrackers() +
-              " nodes to validate sort from " + jobConf.getInputPaths()[0] + ", " + 
-              jobConf.getInputPaths()[1] + " into " + jobConf.getOutputPath() + 
-              " with " + noReduces + " reduces.");
+                         cluster.getTaskTrackers() +
+                         " nodes to validate sort from " + jobConf.getInputPaths()[0] + ", " + 
+                         jobConf.getInputPaths()[1] + " into " + jobConf.getOutputPath() + 
+                         " with " + noReduces + " reduces.");
       Date startTime = new Date();
       System.out.println("Job started: " + startTime);
       JobClient.runJob(jobConf);
       Date end_time = new Date();
       System.out.println("Job ended: " + end_time);
       System.out.println("The job took " + 
-              (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
     }
   }
 
@@ -433,7 +433,7 @@
         printUsage();
       } catch (ArrayIndexOutOfBoundsException except) {
         System.err.println("ERROR: Required parameter missing from " +
-                args[i-1]);
+                           args[i-1]);
         printUsage(); // exits
       }
     }
@@ -449,11 +449,11 @@
     // Check if the same records are present in sort's inputs & outputs
     if (deepTest) {
       RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput, 
-            sortOutput);
+                                 sortOutput);
     }
     
     System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" +
-            " successfully.");
+                       " successfully.");
   }
   
 }
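
[Editor's note] The RecordStatsChecker reindented above validates the sort by comparing aggregate statistics of the sort input and output: total bytes, record count, and an XOR of per-record checksums, which is order-independent and therefore identical for any permutation of the same records. A minimal standalone sketch of that idea follows; the class and helper names are hypothetical and it is not part of this commit.

    import java.util.Arrays;
    import java.util.List;

    /** Sketch of the order-independent record statistics used by
     *  SortValidator.RecordStatsChecker: equal multisets of records yield
     *  equal (bytes, records, checksum) triples regardless of ordering. */
    class RecordStats {
      long bytes;
      long records;
      int checksum;

      void add(byte[] key, byte[] value) {
        bytes += key.length + value.length;
        records += 1;
        // XOR of per-record hashes commutes, so sort order does not matter.
        checksum ^= Arrays.hashCode(key) ^ Arrays.hashCode(value);
      }

      boolean matches(RecordStats other) {
        return bytes == other.bytes
            && records == other.records
            && checksum == other.checksum;
      }

      public static void main(String[] args) {
        RecordStats input = new RecordStats();
        RecordStats output = new RecordStats();
        List<String[]> recs = Arrays.asList(
            new String[]{"k1", "v1"}, new String[]{"k2", "v2"});
        for (String[] r : recs) {
          input.add(r[0].getBytes(), r[1].getBytes());
        }
        // Same records fed in reverse order: the stats still match.
        for (int i = recs.size() - 1; i >= 0; i--) {
          output.add(recs.get(i)[0].getBytes(), recs.get(i)[1].getBytes());
        }
        System.out.println("stats match: " + input.matches(output));
      }
    }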

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Mon Apr 16 14:44:35 2007
@@ -52,58 +52,58 @@
    * @throws IOException
    */
   public static boolean launchEmptyJob(String fileSys,
-                                      String jobTracker,
-                                      JobConf conf,
-                                      int numMaps,
-                                      int numReduces) throws IOException {
-      // create an empty input dir
-      final Path inDir = new Path("/testing/empty/input");
-      final Path outDir = new Path("/testing/empty/output");
-      FileSystem fs = FileSystem.getNamed(fileSys, conf);
-      fs.delete(outDir);
-      if (!fs.mkdirs(inDir)) {
-          LOG.warn("Can't create " + inDir);
-          return false;
-      }
+                                       String jobTracker,
+                                       JobConf conf,
+                                       int numMaps,
+                                       int numReduces) throws IOException {
+    // create an empty input dir
+    final Path inDir = new Path("/testing/empty/input");
+    final Path outDir = new Path("/testing/empty/output");
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    if (!fs.mkdirs(inDir)) {
+      LOG.warn("Can't create " + inDir);
+      return false;
+    }
 
-      // use WordCount example
-      conf.set("fs.default.name", fileSys);
-      conf.set("mapred.job.tracker", jobTracker);
-      conf.setJobName("empty");
-      // use an InputFormat which returns no split
-      conf.setInputFormat(EmptyInputFormat.class);
-      conf.setOutputKeyClass(Text.class);
-      conf.setOutputValueClass(IntWritable.class);
-      conf.setMapperClass(IdentityMapper.class);        
-      conf.setReducerClass(IdentityReducer.class);
-      conf.setInputPath(inDir);
-      conf.setOutputPath(outDir);
-      conf.setNumMapTasks(numMaps);
-      conf.setNumReduceTasks(numReduces);
-      
-      // run job and wait for completion
-      JobClient jc = new JobClient(conf);
-      RunningJob runningJob = jc.submitJob(conf);
-      while (true) {
-          try {
-              Thread.sleep(1000);
-          } catch (InterruptedException e) {}
-          if (runningJob.isComplete()) {
-              break;
-          }
-      }
+    // use WordCount example
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("empty");
+    // use an InputFormat which returns no split
+    conf.setInputFormat(EmptyInputFormat.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+    conf.setMapperClass(IdentityMapper.class);        
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
       
+    // run job and wait for completion
+    JobClient jc = new JobClient(conf);
+    RunningJob runningJob = jc.submitJob(conf);
+    while (true) {
       try {
-          assertTrue(runningJob.isComplete());
-          assertTrue(runningJob.isSuccessful());
-      } catch (NullPointerException npe) {
-          // This NPE should no more happens
-          fail("A NPE should not have happened.");
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+      if (runningJob.isComplete()) {
+        break;
       }
+    }
+      
+    try {
+      assertTrue(runningJob.isComplete());
+      assertTrue(runningJob.isSuccessful());
+    } catch (NullPointerException npe) {
+      // This NPE should no more happens
+      fail("A NPE should not have happened.");
+    }
           
-      // return job result
-      LOG.info("job is complete: " + runningJob.isSuccessful());
-      return (runningJob.isSuccessful());
+    // return job result
+    LOG.info("job is complete: " + runningJob.isSuccessful());
+    return (runningJob.isSuccessful());
   }
   
   /**
@@ -112,30 +112,30 @@
    * @throws IOException
    */
   public void testEmptyJobWithDFS() throws IOException {
-      String namenode = null;
-      MiniDFSCluster dfs = null;
-      MiniMRCluster mr = null;
-      FileSystem fileSys = null;
-      try {
-          final int taskTrackers = 4;
-          final int jobTrackerPort = 60050;
-          Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(conf, 1, true, null);
-          fileSys = dfs.getFileSystem();
-          namenode = fileSys.getName();
-          mr = new MiniMRCluster(taskTrackers, namenode, 2);
-          final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
-          JobConf jobConf = new JobConf();
-          boolean result;
-          result = launchEmptyJob(namenode, jobTrackerName, jobConf, 
-                                   3, 1);
-          assertTrue(result);
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
+      final int jobTrackerPort = 60050;
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      fileSys = dfs.getFileSystem();
+      namenode = fileSys.getName();
+      mr = new MiniMRCluster(taskTrackers, namenode, 2);
+      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+      JobConf jobConf = new JobConf();
+      boolean result;
+      result = launchEmptyJob(namenode, jobTrackerName, jobConf, 
+                              3, 1);
+      assertTrue(result);
           
-      } finally {
-          if (fileSys != null) { fileSys.close(); }
-          if (dfs != null) { dfs.shutdown(); }
-          if (mr != null) { mr.shutdown(); }
-      }
+    } finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
   }
   
 }
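
[Editor's note] The reindented launchEmptyJob() above submits the job with JobClient.submitJob() and then polls RunningJob.isComplete() once a second, instead of using the blocking JobClient.runJob(). A short sketch of that submit-and-poll pattern against the classic org.apache.hadoop.mapred API of this era is below; the wrapper class and method names are hypothetical and it is not part of this commit.

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;

    class SubmitAndPoll {
      /** Submit a job and poll it to completion, returning success.
       *  Assumes 'conf' is already fully configured, as in launchEmptyJob(). */
      static boolean run(JobConf conf) throws IOException {
        JobClient jc = new JobClient(conf);
        RunningJob runningJob = jc.submitJob(conf);
        while (!runningJob.isComplete()) {
          try {
            Thread.sleep(1000);   // poll once a second
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
          }
        }
        return runningJob.isSuccessful();
      }
    }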

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java Mon Apr 16 14:44:35 2007
@@ -98,7 +98,7 @@
     }
     Path inFile = new Path(inDir, "part0");
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
-        Text.class, Text.class);
+                                                           Text.class, Text.class);
     writer.append(new Text("rec: 1"), new Text("Hello"));
     writer.close();
     
@@ -108,7 +108,7 @@
   public void testKeyMismatch() throws Exception {
     configure();
     
-//  Set bad MapOutputKeyClass and MapOutputValueClass
+    //  Set bad MapOutputKeyClass and MapOutputValueClass
     conf.setMapOutputKeyClass(IntWritable.class);
     conf.setMapOutputValueClass(IntWritable.class);
     
@@ -125,7 +125,7 @@
   public void testValueMismatch() throws Exception {
     configure();
   
-// Set good MapOutputKeyClass, bad MapOutputValueClass    
+    // Set good MapOutputKeyClass, bad MapOutputValueClass    
     conf.setMapOutputKeyClass(Text.class);
     conf.setMapOutputValueClass(IntWritable.class);
     
@@ -142,17 +142,17 @@
   public void testNoMismatch() throws Exception{ 
     configure();
     
-//  Set good MapOutputKeyClass and MapOutputValueClass    
-     conf.setMapOutputKeyClass(Text.class);
-     conf.setMapOutputValueClass(Text.class);
+    //  Set good MapOutputKeyClass and MapOutputValueClass    
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(Text.class);
      
-     RunningJob r_job = jc.submitJob(conf);
-     while (!r_job.isComplete()) {
-       Thread.sleep(1000);
-     }
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
      
-     if (!r_job.isSuccessful()) {
-       fail("Oops! The job broke due to an unexpected error");
-     }
-   }
+    if (!r_job.isSuccessful()) {
+      fail("Oops! The job broke due to an unexpected error");
+    }
+  }
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Mon Apr 16 14:44:35 2007
@@ -59,558 +59,558 @@
  *
  **********************************************************/
 public class TestMapRed extends TestCase {
-    /**
-     * Modified to make it a junit test.
-     * The RandomGen Job does the actual work of creating
-     * a huge file of assorted numbers.  It receives instructions
-     * as to how many times each number should be counted.  Then
-     * it emits those numbers in a crazy order.
-     *
-     * The map() function takes a key/val pair that describes
-     * a value-to-be-emitted (the key) and how many times it 
-     * should be emitted (the value), aka "numtimes".  map() then
-     * emits a series of intermediate key/val pairs.  It emits
-     * 'numtimes' of these.  The key is a random number and the
-     * value is the 'value-to-be-emitted'.
-     *
-     * The system collates and merges these pairs according to
-     * the random number.  reduce() function takes in a key/value
-     * pair that consists of a crazy random number and a series
-     * of values that should be emitted.  The random number key
-     * is now dropped, and reduce() emits a pair for every intermediate value.
-     * The emitted key is an intermediate value.  The emitted value
-     * is just a blank string.  Thus, we've created a huge file
-     * of numbers in random order, but where each number appears
-     * as many times as we were instructed.
-     */
-    static class RandomGenMapper implements Mapper {
-        public void configure(JobConf job) {
-        }
-
-        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-            int randomVal = ((IntWritable) key).get();
-            int randomCount = ((IntWritable) val).get();
-
-            for (int i = 0; i < randomCount; i++) {
-                out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
-            }
-        }
-        public void close() {
-        }
-    }
-    /**
-     */
-    static class RandomGenReducer implements Reducer {
-        public void configure(JobConf job) {
-        }
-
-        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
-            while (it.hasNext()) {
-                int val = ((IntWritable) it.next()).get();
-                out.collect(new Text("" + val), new Text(""));
-            }
-        }
-        public void close() {
-        }
+  /**
+   * Modified to make it a junit test.
+   * The RandomGen Job does the actual work of creating
+   * a huge file of assorted numbers.  It receives instructions
+   * as to how many times each number should be counted.  Then
+   * it emits those numbers in a crazy order.
+   *
+   * The map() function takes a key/val pair that describes
+   * a value-to-be-emitted (the key) and how many times it 
+   * should be emitted (the value), aka "numtimes".  map() then
+   * emits a series of intermediate key/val pairs.  It emits
+   * 'numtimes' of these.  The key is a random number and the
+   * value is the 'value-to-be-emitted'.
+   *
+   * The system collates and merges these pairs according to
+   * the random number.  reduce() function takes in a key/value
+   * pair that consists of a crazy random number and a series
+   * of values that should be emitted.  The random number key
+   * is now dropped, and reduce() emits a pair for every intermediate value.
+   * The emitted key is an intermediate value.  The emitted value
+   * is just a blank string.  Thus, we've created a huge file
+   * of numbers in random order, but where each number appears
+   * as many times as we were instructed.
+   */
+  static class RandomGenMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+
+    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+      int randomVal = ((IntWritable) key).get();
+      int randomCount = ((IntWritable) val).get();
+
+      for (int i = 0; i < randomCount; i++) {
+        out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
+      }
+    }
+    public void close() {
+    }
+  }
+  /**
+   */
+  static class RandomGenReducer implements Reducer {
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+      while (it.hasNext()) {
+        int val = ((IntWritable) it.next()).get();
+        out.collect(new Text("" + val), new Text(""));
+      }
+    }
+    public void close() {
+    }
+  }
+
+  /**
+   * The RandomCheck Job does a lot of our work.  It takes
+   * in a num/string keyspace, and transforms it into a
+   * key/count(int) keyspace.
+   *
+   * The map() function just emits a num/1 pair for every
+   * num/string input pair.
+   *
+   * The reduce() function sums up all the 1s that were
+   * emitted for a single key.  It then emits the key/total
+   * pair.
+   *
+   * This is used to regenerate the random number "answer key".
+   * Each key here is a random number, and the count is the
+   * number of times the number was emitted.
+   */
+  static class RandomCheckMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+
+    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+      Text str = (Text) val;
+
+      out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
+    }
+    public void close() {
+    }
+  }
+  /**
+   */
+  static class RandomCheckReducer implements Reducer {
+    public void configure(JobConf job) {
     }
-
-    /**
-     * The RandomCheck Job does a lot of our work.  It takes
-     * in a num/string keyspace, and transforms it into a
-     * key/count(int) keyspace.
-     *
-     * The map() function just emits a num/1 pair for every
-     * num/string input pair.
-     *
-     * The reduce() function sums up all the 1s that were
-     * emitted for a single key.  It then emits the key/total
-     * pair.
-     *
-     * This is used to regenerate the random number "answer key".
-     * Each key here is a random number, and the count is the
-     * number of times the number was emitted.
-     */
-    static class RandomCheckMapper implements Mapper {
-        public void configure(JobConf job) {
-        }
-
-        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-            Text str = (Text) val;
-
-            out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
-        }
-        public void close() {
-        }
-    }
-    /**
-     */
-    static class RandomCheckReducer implements Reducer {
-        public void configure(JobConf job) {
-        }
         
-        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
-            int keyint = ((IntWritable) key).get();
-            int count = 0;
-            while (it.hasNext()) {
-                it.next();
-                count++;
-            }
-            out.collect(new IntWritable(keyint), new IntWritable(count));
-        }
-        public void close() {
-        }
+    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+      int keyint = ((IntWritable) key).get();
+      int count = 0;
+      while (it.hasNext()) {
+        it.next();
+        count++;
+      }
+      out.collect(new IntWritable(keyint), new IntWritable(count));
+    }
+    public void close() {
+    }
+  }
+
+  /**
+   * The Merge Job is a really simple one.  It takes in
+   * an int/int key-value set, and emits the same set.
+   * But it merges identical keys by adding their values.
+   *
+   * Thus, the map() function is just the identity function
+   * and reduce() just sums.  Nothing to see here!
+   */
+  static class MergeMapper implements Mapper {
+    public void configure(JobConf job) {
+    }
+
+    public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+      int keyint = ((IntWritable) key).get();
+      int valint = ((IntWritable) val).get();
+
+      out.collect(new IntWritable(keyint), new IntWritable(valint));
+    }
+    public void close() {
+    }
+  }
+  static class MergeReducer implements Reducer {
+    public void configure(JobConf job) {
     }
-
-    /**
-     * The Merge Job is a really simple one.  It takes in
-     * an int/int key-value set, and emits the same set.
-     * But it merges identical keys by adding their values.
-     *
-     * Thus, the map() function is just the identity function
-     * and reduce() just sums.  Nothing to see here!
-     */
-    static class MergeMapper implements Mapper {
-        public void configure(JobConf job) {
-        }
-
-        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
-            int keyint = ((IntWritable) key).get();
-            int valint = ((IntWritable) val).get();
-
-            out.collect(new IntWritable(keyint), new IntWritable(valint));
-        }
-        public void close() {
-        }
-    }
-    static class MergeReducer implements Reducer {
-        public void configure(JobConf job) {
-        }
         
-        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
-            int keyint = ((IntWritable) key).get();
-            int total = 0;
-            while (it.hasNext()) {
-                total += ((IntWritable) it.next()).get();
-            }
-            out.collect(new IntWritable(keyint), new IntWritable(total));
-        }
-        public void close() {
-        }
-    }
-
-    private static int range = 10;
-    private static int counts = 100;
-    private static Random r = new Random();
-
-    /**
-       public TestMapRed(int range, int counts, Configuration conf) throws IOException {
-       this.range = range;
-       this.counts = counts;
-       this.conf = conf;
-       }
-    **/
-
-    public void testMapred() throws Exception {
-      launch();
-    }
-
-    private static class MyMap implements Mapper {
-      private JobConf conf;
-      private boolean compress;
-      private String taskId;
+    public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+      int keyint = ((IntWritable) key).get();
+      int total = 0;
+      while (it.hasNext()) {
+        total += ((IntWritable) it.next()).get();
+      }
+      out.collect(new IntWritable(keyint), new IntWritable(total));
+    }
+    public void close() {
+    }
+  }
+
+  private static int range = 10;
+  private static int counts = 100;
+  private static Random r = new Random();
+
+  /**
+     public TestMapRed(int range, int counts, Configuration conf) throws IOException {
+     this.range = range;
+     this.counts = counts;
+     this.conf = conf;
+     }
+  **/
+
+  public void testMapred() throws Exception {
+    launch();
+  }
+
+  private static class MyMap implements Mapper {
+    private JobConf conf;
+    private boolean compress;
+    private String taskId;
       
-      public void configure(JobConf conf) {
-        this.conf = conf;
-        compress = conf.getBoolean("mapred.compress.map.output", false);
-        taskId = conf.get("mapred.task.id");
-      }
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      compress = conf.getBoolean("mapred.compress.map.output", false);
+      taskId = conf.get("mapred.task.id");
+    }
       
-      public void map(WritableComparable key, Writable value,
-                      OutputCollector output, Reporter reporter
-                      ) throws IOException {
-        String str = ((Text) value).toString().toLowerCase();
-        output.collect(new Text(str), value);
-      }
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector output, Reporter reporter
+                    ) throws IOException {
+      String str = ((Text) value).toString().toLowerCase();
+      output.collect(new Text(str), value);
+    }
 
-      public void close() throws IOException {
-      }
+    public void close() throws IOException {
     }
+  }
     
-    private static class MyReduce extends IdentityReducer {
-      private JobConf conf;
-      private boolean compressInput;
-      private String taskId;
-      private boolean first = true;
+  private static class MyReduce extends IdentityReducer {
+    private JobConf conf;
+    private boolean compressInput;
+    private String taskId;
+    private boolean first = true;
       
-      public void configure(JobConf conf) {
-        this.conf = conf;
-        compressInput = conf.getBoolean("mapred.compress.map.output", 
-                                        false);
-        taskId = conf.get("mapred.task.id");
-      }
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      compressInput = conf.getBoolean("mapred.compress.map.output", 
+                                      false);
+      taskId = conf.get("mapred.task.id");
+    }
       
-      public void reduce(WritableComparable key, Iterator values,
-                         OutputCollector output, Reporter reporter
-                        ) throws IOException {
-        if (first) {
-          first = false;
-          Path input = conf.getLocalPath(taskId+"/map_0.out");
-          FileSystem fs = FileSystem.get(conf);
-          assertTrue("reduce input exists " + input, fs.exists(input));
-          SequenceFile.Reader rdr = 
-            new SequenceFile.Reader(fs, input, conf);
-          assertEquals("is reduce input compressed " + input, 
-                       compressInput, 
-                       rdr.isCompressed());
-          rdr.close();          
-        }
+    public void reduce(WritableComparable key, Iterator values,
+                       OutputCollector output, Reporter reporter
+                       ) throws IOException {
+      if (first) {
+        first = false;
+        Path input = conf.getLocalPath(taskId+"/map_0.out");
+        FileSystem fs = FileSystem.get(conf);
+        assertTrue("reduce input exists " + input, fs.exists(input));
+        SequenceFile.Reader rdr = 
+          new SequenceFile.Reader(fs, input, conf);
+        assertEquals("is reduce input compressed " + input, 
+                     compressInput, 
+                     rdr.isCompressed());
+        rdr.close();          
       }
-      
     }
+      
+  }
     
-    private void checkCompression(boolean compressMapOutput,
-                                  boolean compressReduceOutput,
-                                  boolean includeCombine
-                                  ) throws Exception {
-      JobConf conf = new JobConf(TestMapRed.class);
-      Path testdir = new Path("build/test/test.mapred.compress");
-      Path inDir = new Path(testdir, "in");
-      Path outDir = new Path(testdir, "out");
-      FileSystem fs = FileSystem.get(conf);
+  private void checkCompression(boolean compressMapOutput,
+                                boolean compressReduceOutput,
+                                boolean includeCombine
+                                ) throws Exception {
+    JobConf conf = new JobConf(TestMapRed.class);
+    Path testdir = new Path("build/test/test.mapred.compress");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setMapperClass(MyMap.class);
+    conf.setReducerClass(MyReduce.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (includeCombine) {
+      conf.setCombinerClass(IdentityReducer.class);
+    }
+    if (compressMapOutput) {
+      conf.setCompressMapOutput(true);
+    }
+    if (compressReduceOutput) {
+      SequenceFileOutputFormat.setCompressOutput(conf, true);
+    }
+    try {
+      if (!fs.mkdirs(testdir)) {
+        throw new IOException("Mkdirs failed to create " + testdir.toString());
+      }
+      if (!fs.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path inFile = new Path(inDir, "part0");
+      DataOutputStream f = fs.create(inFile);
+      f.writeBytes("Owen was here\n");
+      f.writeBytes("Hadoop is fun\n");
+      f.writeBytes("Is this done, yet?\n");
+      f.close();
+      JobClient.runJob(conf);
+      Path output = new Path(outDir,
+                             ReduceTask.getOutputName(0));
+      assertTrue("reduce output exists " + output, fs.exists(output));
+      SequenceFile.Reader rdr = 
+        new SequenceFile.Reader(fs, output, conf);
+      assertEquals("is reduce output compressed " + output, 
+                   compressReduceOutput, 
+                   rdr.isCompressed());
+      rdr.close();
+    } finally {
       fs.delete(testdir);
-      conf.setInputPath(inDir);
-      conf.setOutputPath(outDir);
-      conf.setMapperClass(MyMap.class);
-      conf.setReducerClass(MyReduce.class);
-      conf.setOutputKeyClass(Text.class);
-      conf.setOutputValueClass(Text.class);
-      conf.setOutputFormat(SequenceFileOutputFormat.class);
-      if (includeCombine) {
-        conf.setCombinerClass(IdentityReducer.class);
-      }
-      if (compressMapOutput) {
-        conf.setCompressMapOutput(true);
-      }
-      if (compressReduceOutput) {
-        SequenceFileOutputFormat.setCompressOutput(conf, true);
-      }
-      try {
-        if (!fs.mkdirs(testdir)) {
-          throw new IOException("Mkdirs failed to create " + testdir.toString());
-        }
-        if (!fs.mkdirs(inDir)) {
-          throw new IOException("Mkdirs failed to create " + inDir.toString());
-        }
-        Path inFile = new Path(inDir, "part0");
-        DataOutputStream f = fs.create(inFile);
-        f.writeBytes("Owen was here\n");
-        f.writeBytes("Hadoop is fun\n");
-        f.writeBytes("Is this done, yet?\n");
-        f.close();
-        JobClient.runJob(conf);
-        Path output = new Path(outDir,
-                               ReduceTask.getOutputName(0));
-        assertTrue("reduce output exists " + output, fs.exists(output));
-        SequenceFile.Reader rdr = 
-            new SequenceFile.Reader(fs, output, conf);
-        assertEquals("is reduce output compressed " + output, 
-                     compressReduceOutput, 
-                     rdr.isCompressed());
-        rdr.close();
-      } finally {
-        fs.delete(testdir);
-      }
     }
+  }
     
-    public void testCompression() throws Exception {
-      for(int compressMap=0; compressMap < 2; ++compressMap) {
-        for(int compressOut=0; compressOut < 2; ++compressOut) {
-          for(int combine=0; combine < 2; ++combine) {
-            checkCompression(compressMap == 1, compressOut == 1,
-                             combine == 1);
-          }
+  public void testCompression() throws Exception {
+    for(int compressMap=0; compressMap < 2; ++compressMap) {
+      for(int compressOut=0; compressOut < 2; ++compressOut) {
+        for(int combine=0; combine < 2; ++combine) {
+          checkCompression(compressMap == 1, compressOut == 1,
+                           combine == 1);
         }
       }
     }
+  }
     
     
-    /**
-     * 
-     */
-    public static void launch() throws Exception {
-        //
-        // Generate distribution of ints.  This is the answer key.
-        //
-        JobConf conf = new JobConf(TestMapRed.class);
-        int countsToGo = counts;
-        int dist[] = new int[range];
-        for (int i = 0; i < range; i++) {
-            double avgInts = (1.0 * countsToGo) / (range - i);
-            dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
-            countsToGo -= dist[i];
-        }
-        if (countsToGo > 0) {
-            dist[dist.length-1] += countsToGo;
-        }
-
-        //
-        // Write the answer key to a file.  
-        //
-        FileSystem fs = FileSystem.get(conf);
-        Path testdir = new Path("mapred.loadtest");
-        if (!fs.mkdirs(testdir)) {
-            throw new IOException("Mkdirs failed to create " + testdir.toString());
-        }
-
-        Path randomIns = new Path(testdir, "genins");
-        if (!fs.mkdirs(randomIns)) {
-            throw new IOException("Mkdirs failed to create " + randomIns.toString());
-        }
-
-        Path answerkey = new Path(randomIns, "answer.key");
-        SequenceFile.Writer out = 
-          SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
-                                    IntWritable.class, 
-                                    SequenceFile.CompressionType.NONE);
-        try {
-            for (int i = 0; i < range; i++) {
-                out.append(new IntWritable(i), new IntWritable(dist[i]));
-            }
-        } finally {
-            out.close();
-        }
-
-        //
-        // Now we need to generate the random numbers according to
-        // the above distribution.
-        //
-        // We create a lot of map tasks, each of which takes at least
-        // one "line" of the distribution.  (That is, a certain number
-        // X is to be generated Y number of times.)
-        //
-        // A map task emits Y key/val pairs.  The val is X.  The key
-        // is a randomly-generated number.
-        //
-        // The reduce task gets its input sorted by key.  That is, sorted
-        // in random order.  It then emits a single line of text that
-        // for the given values.  It does not emit the key.
-        //
-        // Because there's just one reduce task, we emit a single big
-        // file of random numbers.
-        //
-        Path randomOuts = new Path(testdir, "genouts");
-        fs.delete(randomOuts);
-
-
-        JobConf genJob = new JobConf(conf,TestMapRed.class);
-        genJob.setInputPath(randomIns);
-        genJob.setInputFormat(SequenceFileInputFormat.class);
-        genJob.setMapperClass(RandomGenMapper.class);
-
-        genJob.setOutputPath(randomOuts);
-        genJob.setOutputKeyClass(IntWritable.class);
-        genJob.setOutputValueClass(IntWritable.class);
-        genJob.setOutputFormat(TextOutputFormat.class);
-        genJob.setReducerClass(RandomGenReducer.class);
-        genJob.setNumReduceTasks(1);
-
-        JobClient.runJob(genJob);
-
-        //
-        // Next, we read the big file in and regenerate the 
-        // original map.  It's split into a number of parts.
-        // (That number is 'intermediateReduces'.)
-        //
-        // We have many map tasks, each of which read at least one
-        // of the output numbers.  For each number read in, the
-        // map task emits a key/value pair where the key is the
-        // number and the value is "1".
-        //
-        // We have a single reduce task, which receives its input
-        // sorted by the key emitted above.  For each key, there will
-        // be a certain number of "1" values.  The reduce task sums
-        // these values to compute how many times the given key was
-        // emitted.
-        //
-        // The reduce task then emits a key/val pair where the key
-        // is the number in question, and the value is the number of
-        // times the key was emitted.  This is the same format as the
-        // original answer key (except that numbers emitted zero times
-        // will not appear in the regenerated key.)  The answer set
-        // is split into a number of pieces.  A final MapReduce job
-        // will merge them.
-        //
-        // There's not really a need to go to 10 reduces here 
-        // instead of 1.  But we want to test what happens when
-        // you have multiple reduces at once.
-        //
-        int intermediateReduces = 10;
-        Path intermediateOuts = new Path(testdir, "intermediateouts");
-        fs.delete(intermediateOuts);
-        JobConf checkJob = new JobConf(conf,TestMapRed.class);
-        checkJob.setInputPath(randomOuts);
-        checkJob.setInputFormat(TextInputFormat.class);
-        checkJob.setMapperClass(RandomCheckMapper.class);
-
-        checkJob.setOutputPath(intermediateOuts);
-        checkJob.setOutputKeyClass(IntWritable.class);
-        checkJob.setOutputValueClass(IntWritable.class);
-        checkJob.setOutputFormat(MapFileOutputFormat.class);
-        checkJob.setReducerClass(RandomCheckReducer.class);
-        checkJob.setNumReduceTasks(intermediateReduces);
-
-        JobClient.runJob(checkJob);
-
-        //
-        // OK, now we take the output from the last job and
-        // merge it down to a single file.  The map() and reduce()
-        // functions don't really do anything except reemit tuples.
-        // But by having a single reduce task here, we end up merging
-        // all the files.
-        //
-        Path finalOuts = new Path(testdir, "finalouts");        
-        fs.delete(finalOuts);
-        JobConf mergeJob = new JobConf(conf,TestMapRed.class);
-        mergeJob.setInputPath(intermediateOuts);
-        mergeJob.setInputFormat(SequenceFileInputFormat.class);
-        mergeJob.setMapperClass(MergeMapper.class);
+  /**
+   * 
+   */
+  public static void launch() throws Exception {
+    //
+    // Generate distribution of ints.  This is the answer key.
+    //
+    JobConf conf = new JobConf(TestMapRed.class);
+    int countsToGo = counts;
+    int dist[] = new int[range];
+    for (int i = 0; i < range; i++) {
+      double avgInts = (1.0 * countsToGo) / (range - i);
+      dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
+      countsToGo -= dist[i];
+    }
+    if (countsToGo > 0) {
+      dist[dist.length-1] += countsToGo;
+    }
+
+    //
+    // Write the answer key to a file.  
+    //
+    FileSystem fs = FileSystem.get(conf);
+    Path testdir = new Path("mapred.loadtest");
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+
+    Path randomIns = new Path(testdir, "genins");
+    if (!fs.mkdirs(randomIns)) {
+      throw new IOException("Mkdirs failed to create " + randomIns.toString());
+    }
+
+    Path answerkey = new Path(randomIns, "answer.key");
+    SequenceFile.Writer out = 
+      SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
+                                IntWritable.class, 
+                                SequenceFile.CompressionType.NONE);
+    try {
+      for (int i = 0; i < range; i++) {
+        out.append(new IntWritable(i), new IntWritable(dist[i]));
+      }
+    } finally {
+      out.close();
+    }
+
+    //
+    // Now we need to generate the random numbers according to
+    // the above distribution.
+    //
+    // We create a lot of map tasks, each of which takes at least
+    // one "line" of the distribution.  (That is, a certain number
+    // X is to be generated Y number of times.)
+    //
+    // A map task emits Y key/val pairs.  The val is X.  The key
+    // is a randomly-generated number.
+    //
+    // The reduce task gets its input sorted by key.  That is, sorted
+    // in random order.  It then emits a single line of text that
+    // for the given values.  It does not emit the key.
+    //
+    // Because there's just one reduce task, we emit a single big
+    // file of random numbers.
+    //
+    Path randomOuts = new Path(testdir, "genouts");
+    fs.delete(randomOuts);
+
+
+    JobConf genJob = new JobConf(conf,TestMapRed.class);
+    genJob.setInputPath(randomIns);
+    genJob.setInputFormat(SequenceFileInputFormat.class);
+    genJob.setMapperClass(RandomGenMapper.class);
+
+    genJob.setOutputPath(randomOuts);
+    genJob.setOutputKeyClass(IntWritable.class);
+    genJob.setOutputValueClass(IntWritable.class);
+    genJob.setOutputFormat(TextOutputFormat.class);
+    genJob.setReducerClass(RandomGenReducer.class);
+    genJob.setNumReduceTasks(1);
+
+    JobClient.runJob(genJob);
+
+    //
+    // Next, we read the big file in and regenerate the 
+    // original map.  It's split into a number of parts.
+    // (That number is 'intermediateReduces'.)
+    //
+    // We have many map tasks, each of which read at least one
+    // of the output numbers.  For each number read in, the
+    // map task emits a key/value pair where the key is the
+    // number and the value is "1".
+    //
+    // We have a single reduce task, which receives its input
+    // sorted by the key emitted above.  For each key, there will
+    // be a certain number of "1" values.  The reduce task sums
+    // these values to compute how many times the given key was
+    // emitted.
+    //
+    // The reduce task then emits a key/val pair where the key
+    // is the number in question, and the value is the number of
+    // times the key was emitted.  This is the same format as the
+    // original answer key (except that numbers emitted zero times
+    // will not appear in the regenerated key.)  The answer set
+    // is split into a number of pieces.  A final MapReduce job
+    // will merge them.
+    //
+    // There's not really a need to go to 10 reduces here 
+    // instead of 1.  But we want to test what happens when
+    // you have multiple reduces at once.
+    //
+    int intermediateReduces = 10;
+    Path intermediateOuts = new Path(testdir, "intermediateouts");
+    fs.delete(intermediateOuts);
+    JobConf checkJob = new JobConf(conf,TestMapRed.class);
+    checkJob.setInputPath(randomOuts);
+    checkJob.setInputFormat(TextInputFormat.class);
+    checkJob.setMapperClass(RandomCheckMapper.class);
+
+    checkJob.setOutputPath(intermediateOuts);
+    checkJob.setOutputKeyClass(IntWritable.class);
+    checkJob.setOutputValueClass(IntWritable.class);
+    checkJob.setOutputFormat(MapFileOutputFormat.class);
+    checkJob.setReducerClass(RandomCheckReducer.class);
+    checkJob.setNumReduceTasks(intermediateReduces);
+
+    JobClient.runJob(checkJob);
+
+    //
+    // OK, now we take the output from the last job and
+    // merge it down to a single file.  The map() and reduce()
+    // functions don't really do anything except reemit tuples.
+    // But by having a single reduce task here, we end up merging
+    // all the files.
+    //
+    Path finalOuts = new Path(testdir, "finalouts");        
+    fs.delete(finalOuts);
+    JobConf mergeJob = new JobConf(conf,TestMapRed.class);
+    mergeJob.setInputPath(intermediateOuts);
+    mergeJob.setInputFormat(SequenceFileInputFormat.class);
+    mergeJob.setMapperClass(MergeMapper.class);
         
-        mergeJob.setOutputPath(finalOuts);
-        mergeJob.setOutputKeyClass(IntWritable.class);
-        mergeJob.setOutputValueClass(IntWritable.class);
-        mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
-        mergeJob.setReducerClass(MergeReducer.class);
-        mergeJob.setNumReduceTasks(1);
+    mergeJob.setOutputPath(finalOuts);
+    mergeJob.setOutputKeyClass(IntWritable.class);
+    mergeJob.setOutputValueClass(IntWritable.class);
+    mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
+    mergeJob.setReducerClass(MergeReducer.class);
+    mergeJob.setNumReduceTasks(1);
         
-        JobClient.runJob(mergeJob);
+    JobClient.runJob(mergeJob);
         
  
-        //
-        // Finally, we compare the reconstructed answer key with the
-        // original one.  Remember, we need to ignore zero-count items
-        // in the original key.
-        //
-        boolean success = true;
-        Path recomputedkey = new Path(finalOuts, "part-00000");
-        SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
-        int totalseen = 0;
-        try {
-            IntWritable key = new IntWritable();
-            IntWritable val = new IntWritable();            
-            for (int i = 0; i < range; i++) {
-                if (dist[i] == 0) {
-                    continue;
-                }
-                if (! in.next(key, val)) {
-                    System.err.println("Cannot read entry " + i);
-                    success = false;
-                    break;
-                } else {
-                    if ( !((key.get() == i ) && (val.get() == dist[i]))) {
-                        System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
-                        success = false;
-                    }
-                    totalseen += val.get();
-                }
-            }
-            if (success) {
-                if (in.next(key, val)) {
-                    System.err.println("Unnecessary lines in recomputed key!");
-                    success = false;
-                }
-            }
-        } finally {
-            in.close();
-        }
-        int originalTotal = 0;
-        for (int i = 0; i < dist.length; i++) {
-            originalTotal += dist[i];
-        }
-        System.out.println("Original sum: " + originalTotal);
-        System.out.println("Recomputed sum: " + totalseen);
-
-        //
-        // Write to "results" whether the test succeeded or not.
-        //
-        Path resultFile = new Path(testdir, "results");
-        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
-        try {
-            bw.write("Success=" + success + "\n");
-            System.out.println("Success=" + success);            
-        } finally {
-            bw.close();
-        }
-	fs.delete(testdir);
-    }
-
-    /**
-     * Launches all the tasks in order.
-     */
-    public static void main(String[] argv) throws Exception {
-        if (argv.length < 2) {
-            System.err.println("Usage: TestMapRed <range> <counts>");
-            System.err.println();
-            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
-            return;
+    //
+    // Finally, we compare the reconstructed answer key with the
+    // original one.  Remember, we need to ignore zero-count items
+    // in the original key.
+    //
+    boolean success = true;
+    Path recomputedkey = new Path(finalOuts, "part-00000");
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
+    int totalseen = 0;
+    try {
+      IntWritable key = new IntWritable();
+      IntWritable val = new IntWritable();            
+      for (int i = 0; i < range; i++) {
+        if (dist[i] == 0) {
+          continue;
+        }
+        if (! in.next(key, val)) {
+          System.err.println("Cannot read entry " + i);
+          success = false;
+          break;
+        } else {
+          if ( !((key.get() == i ) && (val.get() == dist[i]))) {
+            System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
+            success = false;
+          }
+          totalseen += val.get();
         }
-
-        int i = 0;
-        range = Integer.parseInt(argv[i++]);
-        counts = Integer.parseInt(argv[i++]);
-	      launch();
-    }
+      }
+      if (success) {
+        if (in.next(key, val)) {
+          System.err.println("Unnecessary lines in recomputed key!");
+          success = false;
+        }
+      }
+    } finally {
+      in.close();
+    }
+    int originalTotal = 0;
+    for (int i = 0; i < dist.length; i++) {
+      originalTotal += dist[i];
+    }
+    System.out.println("Original sum: " + originalTotal);
+    System.out.println("Recomputed sum: " + totalseen);
+
+    //
+    // Write to "results" whether the test succeeded or not.
+    //
+    Path resultFile = new Path(testdir, "results");
+    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
+    try {
+      bw.write("Success=" + success + "\n");
+      System.out.println("Success=" + success);            
+    } finally {
+      bw.close();
+    }
+    fs.delete(testdir);
+  }
+
+  /**
+   * Launches all the tasks in order.
+   */
+  public static void main(String[] argv) throws Exception {
+    if (argv.length < 2) {
+      System.err.println("Usage: TestMapRed <range> <counts>");
+      System.err.println();
+      System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
+      return;
+    }
+
+    int i = 0;
+    range = Integer.parseInt(argv[i++]);
+    counts = Integer.parseInt(argv[i++]);
+    launch();
+  }
     
-    public void testSmallInput(){
-      runJob(100);
-    }
+  public void testSmallInput(){
+    runJob(100);
+  }
+
+  public void testBiggerInput(){
+    runJob(1000);
+  }
 
-    public void testBiggerInput(){
-      runJob(1000);
-    }
-
-    public void runJob(int items) {
-      try {
-        JobConf conf = new JobConf(TestMapRed.class);
-        Path testdir = new Path("build/test/test.mapred.spill");
-        Path inDir = new Path(testdir, "in");
-        Path outDir = new Path(testdir, "out");
-        FileSystem fs = FileSystem.get(conf);
-        fs.delete(testdir);
-        conf.setInt("io.sort.mb", 1);
-        conf.setInputFormat(SequenceFileInputFormat.class);
-        conf.setInputPath(inDir);
-        conf.setOutputPath(outDir);
-        conf.setMapperClass(IdentityMapper.class);
-        conf.setReducerClass(IdentityReducer.class);
-        conf.setOutputKeyClass(Text.class);
-        conf.setOutputValueClass(Text.class);
-        conf.setOutputFormat(SequenceFileOutputFormat.class);
-        if (!fs.mkdirs(testdir)) {
-          throw new IOException("Mkdirs failed to create " + testdir.toString());
-        }
-        if (!fs.mkdirs(inDir)) {
-          throw new IOException("Mkdirs failed to create " + inDir.toString());
-        }
-        Path inFile = new Path(inDir, "part0");
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
-            Text.class, Text.class);
-
-        StringBuffer content = new StringBuffer();
+  public void runJob(int items) {
+    try {
+      JobConf conf = new JobConf(TestMapRed.class);
+      Path testdir = new Path("build/test/test.mapred.spill");
+      Path inDir = new Path(testdir, "in");
+      Path outDir = new Path(testdir, "out");
+      FileSystem fs = FileSystem.get(conf);
+      fs.delete(testdir);
+      conf.setInt("io.sort.mb", 1);
+      conf.setInputFormat(SequenceFileInputFormat.class);
+      conf.setInputPath(inDir);
+      conf.setOutputPath(outDir);
+      conf.setMapperClass(IdentityMapper.class);
+      conf.setReducerClass(IdentityReducer.class);
+      conf.setOutputKeyClass(Text.class);
+      conf.setOutputValueClass(Text.class);
+      conf.setOutputFormat(SequenceFileOutputFormat.class);
+      if (!fs.mkdirs(testdir)) {
+        throw new IOException("Mkdirs failed to create " + testdir.toString());
+      }
+      if (!fs.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path inFile = new Path(inDir, "part0");
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
+                                                             Text.class, Text.class);
 
-        for (int i = 0; i < 1000; i++) {
-          content.append(i).append(": This is one more line of content\n");
-        }
+      StringBuffer content = new StringBuffer();
 
-        Text text = new Text(content.toString());
+      for (int i = 0; i < 1000; i++) {
+        content.append(i).append(": This is one more line of content\n");
+      }
 
-        for (int i = 0; i < items; i++) {
-          writer.append(new Text("rec:" + i), text);
-        }
-        writer.close();
+      Text text = new Text(content.toString());
 
-        JobClient.runJob(conf);
-      } catch (Exception e) {
-        fail("Threw exception:" + e);
+      for (int i = 0; i < items; i++) {
+        writer.append(new Text("rec:" + i), text);
       }
+      writer.close();
+
+      JobClient.runJob(conf);
+    } catch (Exception e) {
+      fail("Threw exception:" + e);
     }
+  }
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Mon Apr 16 14:44:35 2007
@@ -28,13 +28,13 @@
  */
 public class TestMiniMRBringup extends TestCase {
 
-    public void testBringUp() throws IOException {
-      MiniMRCluster mr = null;
-      try {
-          mr = new MiniMRCluster(1, "local", 1);
-      } finally {
-          if (mr != null) { mr.shutdown(); }
-      }
+  public void testBringUp() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      mr = new MiniMRCluster(1, "local", 1);
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+    }
   }
   
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Mon Apr 16 14:44:35 2007
@@ -35,12 +35,12 @@
 public class TestMiniMRClasspath extends TestCase {
   
   
-   static String launchWordCount(String fileSys,
-                                       String jobTracker,
-                                       JobConf conf,
-                                       String input,
-                                       int numMaps,
-                                       int numReduces) throws IOException {
+  static String launchWordCount(String fileSys,
+                                String jobTracker,
+                                JobConf conf,
+                                String input,
+                                int numMaps,
+                                int numReduces) throws IOException {
     final Path inDir = new Path("/testing/wc/input");
     final Path outDir = new Path("/testing/wc/output");
     FileSystem fs = FileSystem.getNamed(fileSys, conf);
@@ -92,8 +92,8 @@
     return result.toString();
   }
 
-   static String launchExternal(String fileSys, String jobTracker, JobConf conf,
-    String input, int numMaps, int numReduces)
+  static String launchExternal(String fileSys, String jobTracker, JobConf conf,
+                               String input, int numMaps, int numReduces)
     throws IOException {
 
     final Path inDir = new Path("/testing/ext/input");
@@ -134,7 +134,7 @@
     Path[] fileList = fs.listPaths(outDir);
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
-        fs.open(fileList[i])));
+                                                                     fs.open(fileList[i])));
       String line = file.readLine();
       while (line != null) {
         result.append(line);
@@ -148,35 +148,35 @@
   }
    
   public void testClassPath() throws IOException {
-      String namenode = null;
-      MiniDFSCluster dfs = null;
-      MiniMRCluster mr = null;
-      FileSystem fileSys = null;
-      try {
-          final int taskTrackers = 4;
-          final int jobTrackerPort = 60050;
-
-          Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(conf, 1, true, null);
-          fileSys = dfs.getFileSystem();
-          namenode = fileSys.getName();
-          mr = new MiniMRCluster(taskTrackers, namenode, 3);
-          JobConf jobConf = new JobConf();
-          String result;
-          final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
-          result = launchWordCount(namenode, jobTrackerName, jobConf, 
-                                   "The quick brown fox\nhas many silly\n" + 
-                                   "red fox sox\n",
-                                   3, 1);
-          assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
-                       "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
+      final int jobTrackerPort = 60050;
+
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      fileSys = dfs.getFileSystem();
+      namenode = fileSys.getName();
+      mr = new MiniMRCluster(taskTrackers, namenode, 3);
+      JobConf jobConf = new JobConf();
+      String result;
+      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+      result = launchWordCount(namenode, jobTrackerName, jobConf, 
+                               "The quick brown fox\nhas many silly\n" + 
+                               "red fox sox\n",
+                               3, 1);
+      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
           
-      } finally {
-          if (fileSys != null) { fileSys.close(); }
-          if (dfs != null) { dfs.shutdown(); }
-          if (mr != null) { mr.shutdown();
-          }
+    } finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();
       }
+    }
   }
   
   public void testExternalWritable()
@@ -201,8 +201,8 @@
       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
       
       result = launchExternal(namenode, jobTrackerName, jobConf, 
-                               "Dennis was here!\nDennis again!",
-                               3, 1);
+                              "Dennis was here!\nDennis again!",
+                              3, 1);
       assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result);
       
     } 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Mon Apr 16 14:44:35 2007
@@ -45,7 +45,7 @@
                                             "/cachedir",
                                             mr.createJobConf(),
                                             "The quick brown fox\nhas many silly\n"
-                                                + "red fox sox\n");
+                                            + "red fox sox\n");
       assertTrue("Archives not matching", ret);
     } finally {
       if (fileSys != null) {

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Apr 16 14:44:35 2007
@@ -46,34 +46,34 @@
     .toString().replace(' ', '+');
     
   public void testWithLocal() throws IOException {
-      MiniMRCluster mr = null;
-      try {
-          mr = new MiniMRCluster(2, "local", 3);
-          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
-                                               mr.createJobConf());
-          double error = Math.abs(Math.PI - estimate);
-          assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-          // run the wordcount example with caching
-          JobConf job = mr.createJobConf();
-          boolean ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
-                                                TEST_ROOT_DIR + "/wc/output", 
-                                                TEST_ROOT_DIR + "/cachedir",
-                                                job,
-                                                "The quick brown fox\n" 
-                                                + "has many silly\n"
-                                                + "red fox sox\n");
-          // assert the number of lines read during caching
-          assertTrue("Failed test archives not matching", ret);
-          // test the task report fetchers
-          JobClient client = new JobClient(job);
-          TaskReport[] reports = client.getMapTaskReports("job_0001");
-          assertEquals("number of maps", 10, reports.length);
-          reports = client.getReduceTaskReports("job_0001");
-          assertEquals("number of reduces", 1, reports.length);
-          runCustomFormats(mr);
-      } finally {
-          if (mr != null) { mr.shutdown(); }
-      }
+    MiniMRCluster mr = null;
+    try {
+      mr = new MiniMRCluster(2, "local", 3);
+      double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
+                                           mr.createJobConf());
+      double error = Math.abs(Math.PI - estimate);
+      assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+      // run the wordcount example with caching
+      JobConf job = mr.createJobConf();
+      boolean ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
+                                            TEST_ROOT_DIR + "/wc/output", 
+                                            TEST_ROOT_DIR + "/cachedir",
+                                            job,
+                                            "The quick brown fox\n" 
+                                            + "has many silly\n"
+                                            + "red fox sox\n");
+      // assert the number of lines read during caching
+      assertTrue("Failed test archives not matching", ret);
+      // test the task report fetchers
+      JobClient client = new JobClient(job);
+      TaskReport[] reports = client.getMapTaskReports("job_0001");
+      assertEquals("number of maps", 10, reports.length);
+      reports = client.getReduceTaskReports("job_0001");
+      assertEquals("number of reduces", 1, reports.length);
+      runCustomFormats(mr);
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+    }
   }
   
   private void runCustomFormats(MiniMRCluster mr) throws IOException {
@@ -114,13 +114,13 @@
   
   private static class MyInputFormat implements InputFormat {
     static final String[] data = new String[]{
-                                              "crocodile pants", 
-                                              "aunt annie", 
-                                              "eggs",
-                                              "finagle the agent",
-                                              "bumble boat", 
-                                              "duck-dog",
-                                              };
+      "crocodile pants", 
+      "aunt annie", 
+      "eggs",
+      "finagle the agent",
+      "bumble boat", 
+      "duck-dog",
+    };
 
     private static class MySplit implements InputSplit {
       int first;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Apr 16 14:44:35 2007
@@ -118,9 +118,9 @@
       LOG.debug("Tracker directory: " + localDir);
       File trackerDir = new File(localDir, "taskTracker");
       assertTrue("local dir " + localDir + " does not exist.", 
-                   localDir.isDirectory());
+                 localDir.isDirectory());
       assertTrue("task tracker dir " + trackerDir + " does not exist.", 
-                   trackerDir.isDirectory());
+                 trackerDir.isDirectory());
       String contents[] = localDir.list();
       String trackerContents[] = trackerDir.list();
       for(int j=0; j < contents.length; ++j) {
@@ -150,44 +150,44 @@
   }
   
   public void testWithDFS() throws IOException {
-      MiniDFSCluster dfs = null;
-      MiniMRCluster mr = null;
-      FileSystem fileSys = null;
-      try {
-          final int taskTrackers = 4;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
 
-          Configuration conf = new Configuration();
-          dfs = new MiniDFSCluster(conf, 4, true, null);
-          fileSys = dfs.getFileSystem();
-          mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
-          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
-                                               mr.createJobConf());
-          double error = Math.abs(Math.PI - estimate);
-          assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-          checkTaskDirectories(mr, new String[]{}, new String[]{});
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
+      double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
+                                           mr.createJobConf());
+      double error = Math.abs(Math.PI - estimate);
+      assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+      checkTaskDirectories(mr, new String[]{}, new String[]{});
           
-          // Run a word count example
-          JobConf jobConf = mr.createJobConf();
-          // Keeping tasks that match this pattern
-          jobConf.setKeepTaskFilesPattern("task_[0-9]*_m_000001_.*");
-          String result;
-          result = launchWordCount(jobConf, 
-                                   "The quick brown fox\nhas many silly\n" + 
-                                   "red fox sox\n",
-                                   3, 1);
-          assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
-                       "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
-          checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
-          // test with maps=0
-          jobConf = mr.createJobConf();
-          result = launchWordCount(jobConf, "owen is oom", 0, 1);
-          assertEquals("is\t1\noom\t1\nowen\t1\n", result);
-      } finally {
-          if (fileSys != null) { fileSys.close(); }
-          if (dfs != null) { dfs.shutdown(); }
-          if (mr != null) { mr.shutdown();
-          }
+      // Run a word count example
+      JobConf jobConf = mr.createJobConf();
+      // Keeping tasks that match this pattern
+      jobConf.setKeepTaskFilesPattern("task_[0-9]*_m_000001_.*");
+      String result;
+      result = launchWordCount(jobConf, 
+                               "The quick brown fox\nhas many silly\n" + 
+                               "red fox sox\n",
+                               3, 1);
+      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
+      checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
+      // test with maps=0
+      jobConf = mr.createJobConf();
+      result = launchWordCount(jobConf, "owen is oom", 0, 1);
+      assertEquals("is\t1\noom\t1\nowen\t1\n", result);
+    } finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();
       }
+    }
   }
   
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java Mon Apr 16 14:44:35 2007
@@ -41,67 +41,67 @@
   private static final Reporter reporter = Reporter.NULL;
   
   static {
-      job.setInputPath(inDir);
-      try {
-        fs = FileSystem.getLocal(conf);
+    job.setInputPath(inDir);
+    try {
+      fs = FileSystem.getLocal(conf);
     } catch (IOException e) {
-        e.printStackTrace();
-        throw new RuntimeException(e);
+      e.printStackTrace();
+      throw new RuntimeException(e);
     }
   }
 
   private static void createSequenceFile(int numRecords) throws Exception {
-      // create a file with length entries
-      SequenceFile.Writer writer =
-          SequenceFile.createWriter(fs, conf, inFile,
-                                    Text.class, BytesWritable.class);
-      try {
-          for (int i = 1; i <= numRecords; i++) {
-              Text key = new Text(Integer.toString(i));
-              byte[] data = new byte[random.nextInt(10)];
-              random.nextBytes(data);
-              BytesWritable value = new BytesWritable(data);
-              writer.append(key, value);
-          }
-      } finally {
-          writer.close();
+    // create a file with length entries
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, inFile,
+                                Text.class, BytesWritable.class);
+    try {
+      for (int i = 1; i <= numRecords; i++) {
+        Text key = new Text(Integer.toString(i));
+        byte[] data = new byte[random.nextInt(10)];
+        random.nextBytes(data);
+        BytesWritable value = new BytesWritable(data);
+        writer.append(key, value);
       }
+    } finally {
+      writer.close();
+    }
   }
 
 
   private int countRecords(int numSplits) throws IOException {
-      InputFormat format = new SequenceFileInputFilter();
-      Text key = new Text();
-      BytesWritable value = new BytesWritable();
-      if(numSplits==0) {
-        numSplits =
-            random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
-      }
-      InputSplit[] splits = format.getSplits(job, numSplits);
+    InputFormat format = new SequenceFileInputFilter();
+    Text key = new Text();
+    BytesWritable value = new BytesWritable();
+    if(numSplits==0) {
+      numSplits =
+        random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+    }
+    InputSplit[] splits = format.getSplits(job, numSplits);
       
-      // check each split
-      int count = 0;
-      LOG.info("Generated " + splits.length + " splits.");
-      for (int j = 0; j < splits.length; j++) {
-          RecordReader reader =
-              format.getRecordReader(splits[j], job, reporter);
-          try {
-              while (reader.next(key, value)) {
-                  LOG.info("Accept record "+key.toString());
-                  count++;
-              }
-          } finally {
-              reader.close();
-          }
+    // check each split
+    int count = 0;
+    LOG.info("Generated " + splits.length + " splits.");
+    for (int j = 0; j < splits.length; j++) {
+      RecordReader reader =
+        format.getRecordReader(splits[j], job, reporter);
+      try {
+        while (reader.next(key, value)) {
+          LOG.info("Accept record "+key.toString());
+          count++;
+        }
+      } finally {
+        reader.close();
       }
-      return count;
+    }
+    return count;
   }
   
   public void testRegexFilter() throws Exception {
     // set the filter class
     LOG.info("Testing Regex Filter with patter: \\A10*");
     SequenceFileInputFilter.setFilterClass(job, 
-            SequenceFileInputFilter.RegexFilter.class);
+                                           SequenceFileInputFilter.RegexFilter.class);
     SequenceFileInputFilter.RegexFilter.setPattern(job, "\\A10*");
     
     // clean input dir
@@ -109,11 +109,11 @@
   
     // for a variety of lengths
     for (int length = 1; length < MAX_LENGTH;
-               length+= random.nextInt(MAX_LENGTH/10)+1) {
-        LOG.info("******Number of records: "+length);
-        createSequenceFile(length);
-        int count = countRecords(0);
-        assertEquals(count, length==0?0:(int)Math.log10(length)+1);
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+      LOG.info("******Number of records: "+length);
+      createSequenceFile(length);
+      int count = countRecords(0);
+      assertEquals(count, length==0?0:(int)Math.log10(length)+1);
     }
     
     // clean up
@@ -121,52 +121,52 @@
   }
 
   public void testPercentFilter() throws Exception {
-      LOG.info("Testing Percent Filter with frequency: 1000");
-      // set the filter class
-      SequenceFileInputFilter.setFilterClass(job, 
-              SequenceFileInputFilter.PercentFilter.class);
-      SequenceFileInputFilter.PercentFilter.setFrequency(job, 1000);
+    LOG.info("Testing Percent Filter with frequency: 1000");
+    // set the filter class
+    SequenceFileInputFilter.setFilterClass(job, 
+                                           SequenceFileInputFilter.PercentFilter.class);
+    SequenceFileInputFilter.PercentFilter.setFrequency(job, 1000);
       
-      // clean input dir
-      fs.delete(inDir);
+    // clean input dir
+    fs.delete(inDir);
     
-      // for a variety of lengths
-      for (int length = 0; length < MAX_LENGTH;
-                 length+= random.nextInt(MAX_LENGTH/10)+1) {
-          LOG.info("******Number of records: "+length);
-          createSequenceFile(length);
-          int count = countRecords(1);
-          LOG.info("Accepted "+count+" records");
-          int expectedCount = length/1000;
-          if(expectedCount*1000!=length)
-              expectedCount++;
-          assertEquals(count, expectedCount);
-      }
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+      LOG.info("******Number of records: "+length);
+      createSequenceFile(length);
+      int count = countRecords(1);
+      LOG.info("Accepted "+count+" records");
+      int expectedCount = length/1000;
+      if(expectedCount*1000!=length)
+        expectedCount++;
+      assertEquals(count, expectedCount);
+    }
       
-      // clean up
-      fs.delete(inDir);
+    // clean up
+    fs.delete(inDir);
   }
   
   public void testMD5Filter() throws Exception {
-      // set the filter class
-      LOG.info("Testing MD5 Filter with frequency: 1000");
-      SequenceFileInputFilter.setFilterClass(job, 
-              SequenceFileInputFilter.MD5Filter.class);
-      SequenceFileInputFilter.MD5Filter.setFrequency(job, 1000);
+    // set the filter class
+    LOG.info("Testing MD5 Filter with frequency: 1000");
+    SequenceFileInputFilter.setFilterClass(job, 
+                                           SequenceFileInputFilter.MD5Filter.class);
+    SequenceFileInputFilter.MD5Filter.setFrequency(job, 1000);
       
-      // clean input dir
-      fs.delete(inDir);
+    // clean input dir
+    fs.delete(inDir);
     
-      // for a variety of lengths
-      for (int length = 0; length < MAX_LENGTH;
-                 length+= random.nextInt(MAX_LENGTH/10)+1) {
-          LOG.info("******Number of records: "+length);
-          createSequenceFile(length);
-          LOG.info("Accepted "+countRecords(0)+" records");
-      }
-      // clean up
-      fs.delete(inDir);
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+      LOG.info("******Number of records: "+length);
+      createSequenceFile(length);
+      LOG.info("Accepted "+countRecords(0)+" records");
     }
+    // clean up
+    fs.delete(inDir);
+  }
 
   public static void main(String[] args) throws Exception {
     TestSequenceFileInputFilter filter = new TestSequenceFileInputFilter();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Mon Apr 16 14:44:35 2007
@@ -30,7 +30,7 @@
 
 public class TestTextOutputFormat extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestTextOutputFormat.class
-      .getName());
+                                                   .getName());
 
   private static JobConf defaultConf = new JobConf();
 
@@ -44,7 +44,7 @@
   }
 
   private static Path workDir = new Path(new Path(System.getProperty(
-      "test.build.data", "."), "data"), "TestTextOutputFormat");
+                                                                     "test.build.data", "."), "data"), "TestTextOutputFormat");
 
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
@@ -56,7 +56,7 @@
 
     TextOutputFormat theOutputFormat = new TextOutputFormat();
     RecordWriter theRecodWriter = theOutputFormat.getRecordWriter(localFs, job,
-        file, reporter);
+                                                                  file, reporter);
 
     Text key1 = new Text("key1");
     Text key2 = new Text("key2");


