hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r511985 - in /lucene/hadoop/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/RandomWriter.java
Date Mon, 26 Feb 2007 20:13:16 GMT
Author: cutting
Date: Mon Feb 26 12:13:15 2007
New Revision: 511985

URL: http://svn.apache.org/viewvc?view=rev&rev=511985
Log:
HADOOP-1040.  Update RandomWriter example to use counters and user-defined input and output
formats.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511985&r1=511984&r2=511985
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 26 12:13:15 2007
@@ -138,6 +138,9 @@
 40. HADOOP-1039.  In HDFS's TestCheckpoint, avoid restarting
     MiniDFSCluster so often, speeding this test.  (Dhruba Borthakur via cutting)
 
+41. HADOOP-1040.  Update RandomWriter example to use counters and
+    user-defined input and output formats.  (omalley via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=511985&r1=511984&r2=511985
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Mon Feb 26 12:13:15 2007
@@ -19,29 +19,16 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
-import java.text.NumberFormat;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * This program uses map/reduce to just run a distributed job where there is
@@ -50,9 +37,91 @@
  * 
  * @author Owen O'Malley
  */
-public class RandomWriter extends MapReduceBase implements Reducer {
+public class RandomWriter {
   
-  public static class Map extends MapReduceBase implements Mapper {
+  /**
+   * User counters
+   */
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+  
+  /**
+   * A custom input format that creates virtual inputs of a single string
+   * for each map.
+   */
+  static class RandomInputFormat implements InputFormat {
+    
+    /** Accept all job confs */
+    public void validateInput(JobConf job) throws IOException {
+    }
+
+    /** 
+     * Generate the requested number of file splits, with the filename
+     * set to the filename of the output file.
+     */
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      InputSplit[] result = new InputSplit[numSplits];
+      Path outDir = job.getOutputPath();
+      for(int i=0; i < result.length; ++i) {
+        result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
+      }
+      return result;
+    }
+
+    /**
+     * Return a single record (filename, "") where the filename is taken from
+     * the file split.
+     */
+    static class RandomRecordReader implements RecordReader {
+      Path name;
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      public boolean next(Writable key, Writable value) {
+        if (name != null) {
+          ((Text) key).set(name.toString());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+      public WritableComparable createKey() {
+        return new Text();
+      }
+      public Writable createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() {}
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader getRecordReader(InputSplit split,
+                                        JobConf job, 
+                                        Reporter reporter) throws IOException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
+
+  /**
+   * Consume all outputs and put them in /dev/null. 
+   */
+  static class DataSink implements OutputFormat {
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
+                                        String name, Progressable progress) {
+      return new RecordWriter(){
+        public void write(WritableComparable key, Writable value) { }
+        public void close(Reporter reporter) { }
+      };
+    }
+    public void checkOutputSpecs(FileSystem ignored, JobConf job) { }
+  }
+
+  static class Map extends MapReduceBase implements Mapper {
     private FileSystem fileSys = null;
     private JobConf jobConf = null;
     private long numBytesToWrite;
@@ -77,7 +146,7 @@
                     Writable value,
                     OutputCollector output, 
                     Reporter reporter) throws IOException {
-      String filename = ((Text) value).toString();
+      String filename = ((Text) key).toString();
       SequenceFile.Writer writer = 
         SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
                                 BytesWritable.class, BytesWritable.class,
@@ -94,6 +163,8 @@
         randomizeBytes(randomValue.get(), 0, randomValue.getSize());
         writer.append(randomKey, randomValue);
         numBytesToWrite -= keyLength + valueLength;
+        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
+        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
         if (++itemCount % 200 == 0) {
           reporter.setStatus("wrote record " + itemCount + ". " + 
                              numBytesToWrite + " bytes left.");
@@ -126,13 +197,6 @@
     
   }
   
-  public void reduce(WritableComparable key, 
-                     Iterator values,
-                     OutputCollector output, 
-                     Reporter reporter) throws IOException {
-    // nothing
-  }
-  
   /**
    * This is the main routine for launching a distributed random write job.
    * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
@@ -149,78 +213,48 @@
    * @throws IOException 
    */
   public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
     if (args.length == 0) {
       System.out.println("Usage: writer <out-dir> [<config>]");
       return;
     }
     Path outDir = new Path(args[0]);
+    JobConf job;
     if (args.length >= 2) {
-      defaults.addFinalResource(new Path(args[1]));
-    }
-    
-    JobConf jobConf = new JobConf(defaults, RandomWriter.class);
-    jobConf.setJobName("random-writer");
+      job = new JobConf(new Path(args[1]));
+    } else {
+      job = new JobConf();
+    }
+    job.setJarByClass(RandomWriter.class);
+    job.setJobName("random-writer");
+    job.setOutputPath(outDir);
     
     // turn off speculative execution, because DFS doesn't handle
     // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-    
-    jobConf.setMapperClass(Map.class);        
-    jobConf.setReducerClass(RandomWriter.class);
+    job.setSpeculativeExecution(false);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    
+    job.setInputFormat(RandomInputFormat.class);
+    job.setMapperClass(Map.class);        
+    job.setReducerClass(IdentityReducer.class);
+    job.setOutputFormat(DataSink.class);
     
-    JobClient client = new JobClient(jobConf);
+    JobClient client = new JobClient(job);
     ClusterStatus cluster = client.getClusterStatus();
     int numMaps = cluster.getTaskTrackers() * 
-         jobConf.getInt("test.randomwriter.maps_per_host", 10);
-    jobConf.setNumMapTasks(numMaps);
+         job.getInt("test.randomwriter.maps_per_host", 10);
+    job.setNumMapTasks(numMaps);
     System.out.println("Running " + numMaps + " maps.");
-    jobConf.setNumReduceTasks(1);
-    
-    Path tmpDir = new Path("random-work");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    if (fileSys.exists(outDir)) {
-      System.out.println("Error: Output directory " + outDir + 
-                         " already exists.");
-      return;
-    }
-    fileSys.delete(tmpDir);
-    if (!fileSys.mkdirs(inDir)) {
-      System.out.println("Error: Mkdirs failed to create " + 
-                         inDir.toString());
-      return;
-    }
-    NumberFormat numberFormat = NumberFormat.getInstance();
-    numberFormat.setMinimumIntegerDigits(6);
-    numberFormat.setGroupingUsed(false);
-
-    for(int i=0; i < numMaps; ++i) {
-      Path file = new Path(inDir, "part"+i);
-      FSDataOutputStream writer = fileSys.create(file);
-      writer.writeBytes(outDir + "/part" + numberFormat.format(i)+ "\n");
-      writer.close();
-    }
-    jobConf.setInputPath(inDir);
-    jobConf.setOutputPath(fakeOutDir);
-    
-    // Uncomment to run locally in a single process
-    //job_conf.set("mapred.job.tracker", "local");
+    job.setNumReduceTasks(1);
     
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    try {
-      JobClient.runJob(jobConf);
-      Date endTime = new Date();
-      System.out.println("Job ended: " + endTime);
-      System.out.println("The job took " + 
-         (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
-    } finally {
-      fileSys.delete(tmpDir);
-    }
+    JobClient.runJob(job);
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    System.out.println("The job took " + 
+                       (endTime.getTime() - startTime.getTime()) /1000 + 
+                       " seconds.");
   }
   
 }



Mime
View raw message