hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r581398 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Tue, 02 Oct 2007 21:45:45 GMT
Author: cutting
Date: Tue Oct  2 14:45:45 2007
New Revision: 581398

URL: http://svn.apache.org/viewvc?rev=581398&view=rev
Log:
HADOOP-1851.  Permit specification of map output compression type and codec, independent of
the final output's compression parameters.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct  2 14:45:45 2007
@@ -70,6 +70,11 @@
     namenodes and rebalancing processes to communicate with a primary 
     namenode.  (Hairong Kuang via dhruba)
 
+    HADOOP-1851.  Permit specification of map output compression type
+    and codec, independent of the final output's compression
+    parameters.  (Arun C Murthy via cutting)
+
+
   OPTIMIZATIONS
 
     HADOOP-1910.  Reduce the number of RPCs that DistributedFileSystem.create()

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Oct  2 14:45:45 2007
@@ -722,15 +722,22 @@
 <property>
   <name>mapred.output.compress</name>
   <value>false</value>
-  <description>Should the outputs of the reduces be compressed?
+  <description>Should the job outputs be compressed?
+  </description>
+</property>
+
+<property>
+  <name>mapred.output.compression.type</name>
+  <value>RECORD</value>
+  <description>If the job outputs are to be compressed as SequenceFiles, how should
+               they be compressed? Should be one of NONE, RECORD or BLOCK.
   </description>
 </property>
 
 <property>
   <name>mapred.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.DefaultCodec</value>
-  <description>If the reduce outputs are compressed, how should they be 
-               compressed?
+  <description>If the job outputs are compressed, how should they be compressed?
   </description>
 </property>
 
@@ -739,6 +746,22 @@
   <value>false</value>
   <description>Should the outputs of the maps be compressed before being
                sent across the network. Uses SequenceFile compression.
+  </description>
+</property>
+
+<property>
+  <name>mapred.map.output.compression.type</name>
+  <value>RECORD</value>
+  <description>If the map outputs are to be compressed, how should they
+               be compressed? Should be one of NONE, RECORD or BLOCK.
+  </description>
+</property>
+
+<property>
+  <name>mapred.map.output.compression.codec</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
+  <description>If the map outputs are compressed, how should they be 
+               compressed?
   </description>
 </property>
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java Tue Oct  2 14:45:45 2007
@@ -24,6 +24,8 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 
 /** A file-based map from keys to values.
  * 
@@ -87,6 +89,16 @@
 
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, Class valClass,
+                  CompressionType compress, CompressionCodec codec,
+                  Progressable progress)
+      throws IOException {
+      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
+           compress, codec, progress);
+    }
+
+    /** Create the named map for keys of the named class. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class keyClass, Class valClass, CompressionType compress)
       throws IOException {
       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
@@ -112,6 +124,15 @@
                   SequenceFile.CompressionType compress,
                   Progressable progress)
       throws IOException {
+      this(conf, fs, dirName, comparator, valClass, 
+           compress, new DefaultCodec(), progress);
+    }
+    /** Create the named map using the named key comparator. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator, Class valClass,
+                  SequenceFile.CompressionType compress, CompressionCodec codec,
+                  Progressable progress)
+      throws IOException {
 
       this.comparator = comparator;
       this.lastKey = comparator.newKey();
@@ -126,7 +147,7 @@
       Class keyClass = comparator.getKeyClass();
       this.data =
         SequenceFile.createWriter
-        (fs, conf, dataFile, keyClass, valClass, compress, progress);
+        (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
       this.index =
         SequenceFile.createWriter
         (fs, conf, indexFile, keyClass, LongWritable.class,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Oct  2 14:45:45 2007
@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -296,6 +297,7 @@
   /**
    * Should the map outputs be compressed before transfer?
    * Uses the SequenceFile compression.
+   * @param compress should the map outputs be compressed?
    */
   public void setCompressMapOutput(boolean compress) {
     setBoolean("mapred.compress.map.output", compress);
@@ -303,60 +305,64 @@
   
   /**
    * Are the outputs of the maps be compressed?
-   * @return are they compressed?
+   * @return <code>true</code> if the outputs of the maps are to be compressed,
+   *         <code>false</code> otherwise
    */
   public boolean getCompressMapOutput() {
     return getBoolean("mapred.compress.map.output", false);
   }
 
   /**
-   * Set the compression type for the map outputs.
-   * @param style NONE, RECORD, or BLOCK to control how the map outputs are 
-   *        compressed
+   * Set the {@link CompressionType} for the map outputs.
+   * @param style the {@link CompressionType} to control how the map outputs  
+   *              are compressed
    */
-  public void setMapOutputCompressionType(SequenceFile.CompressionType style) {
-    set("map.output.compression.type", style.toString());
+  public void setMapOutputCompressionType(CompressionType style) {
+    set("mapred.map.output.compression.type", style.toString());
   }
   
   /**
-   * Get the compression type for the map outputs.
-   * @return the compression type, defaulting to job output compression type
+   * Get the {@link CompressionType} for the map outputs.
+   * @return the {@link CompressionType} for map outputs, defaulting to 
+   *         {@link CompressionType#RECORD} 
    */
-  public SequenceFile.CompressionType getMapOutputCompressionType() {
-    String val = get("map.output.compression.type", "RECORD");
-    return SequenceFile.CompressionType.valueOf(val);
+  public CompressionType getMapOutputCompressionType() {
+    String val = get("mapred.map.output.compression.type", 
+                     CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
   }
-  
+
   /**
-   * Set the given class as the  compression codec for the map outputs.
-   * @param codecClass the CompressionCodec class that will compress the 
-   *                   map outputs
+   * Set the given class as the  {@link CompressionCodec} for the map outputs.
+   * @param codecClass the {@link CompressionCodec} class that will compress  
+   *                   the map outputs
    */
-  public void setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
-    setCompressMapOutput(true);
-    setClass("mapred.output.compression.codec", codecClass, 
+  public void 
+  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
+    setClass("mapred.map.output.compression.codec", codecClass, 
              CompressionCodec.class);
   }
   
   /**
-   * Get the codec for compressing the map outputs
-   * @param defaultValue the value to return if it is not set
-   * @return the CompressionCodec class that should be used to compress the 
-   *   map outputs
+   * Get the {@link CompressionCodec} for compressing the map outputs.
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} class that should be used to compress the 
+   *         map outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
-  public Class<? extends CompressionCodec> getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
-    String name = get("mapred.output.compression.codec");
-    if (name == null) {
-      return defaultValue;
-    } else {
+  public Class<? extends CompressionCodec> 
+  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = get("mapred.map.output.compression.codec");
+    if (name != null) {
       try {
-        return getClassByName(name).asSubclass(CompressionCodec.class);
+        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name + 
                                            " was not found.", e);
       }
     }
+    return codecClass;
   }
   
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Tue Oct  2 14:45:45 2007
@@ -28,8 +28,12 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link OutputFormat} that writes {@link MapFile}s. */
 public class MapFileOutputFormat extends OutputFormatBase {
@@ -39,13 +43,25 @@
     throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
+    
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(job)) {
+      // find the kind of compression to do
+      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
 
+      // find the right codec
+      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec) 
+        ReflectionUtils.newInstance(codecClass, job);
+    }
+    
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
       new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
                          job.getOutputKeyClass(),
                          job.getOutputValueClass(),
-                         SequenceFile.getCompressionType(job),
+                         compressionType, codec,
                          progress);
 
     return new RecordWriter() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Tue Oct  2 14:45:45 2007
@@ -33,54 +33,62 @@
   implements OutputFormat<K, V> {
 
   /**
-   * Set whether the output of the reduce is compressed
-   * @param val the new setting
+   * Set whether the output of the job is compressed.
+   * @param conf the {@link JobConf} to modify
+   * @param compress should the output of the job be compressed?
    */
-  public static void setCompressOutput(JobConf conf, boolean val) {
-    conf.setBoolean("mapred.output.compress", val);
+  public static void setCompressOutput(JobConf conf, boolean compress) {
+    conf.setBoolean("mapred.output.compress", compress);
   }
   
   /**
-   * Is the reduce output compressed?
-   * @return true, if the output should be compressed
+   * Is the job output compressed?
+   * @param conf the {@link JobConf} to look in
+   * @return <code>true</code> if the job output should be compressed,
+   *         <code>false</code> otherwise
    */
   public static boolean getCompressOutput(JobConf conf) {
     return conf.getBoolean("mapred.output.compress", false);
   }
   
   /**
-   * Set the given class as the output compression codec.
-   * @param conf the JobConf to modify
-   * @param codecClass the CompressionCodec class that will compress the 
-   *                   reduce outputs
+   * Set the {@link CompressionCodec} to be used to compress job outputs.
+   * @param conf the {@link JobConf} to modify
+   * @param codecClass the {@link CompressionCodec} to be used to
+   *                   compress the job outputs
    */
-  public static void setOutputCompressorClass(JobConf conf, Class codecClass) {
+  public static void 
+  setOutputCompressorClass(JobConf conf, 
+                           Class<? extends CompressionCodec> codecClass) {
     setCompressOutput(conf, true);
     conf.setClass("mapred.output.compression.codec", codecClass, 
                   CompressionCodec.class);
   }
   
   /**
-   * Get the codec for compressing the reduce outputs
-   * @param conf the Configuration to look in
-   * @param defaultValue the value to return if it is not set
-   * @return the CompressionCodec class that should be used to compress the 
-   *   reduce outputs
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   * @param conf the {@link JobConf} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the 
+   *         job outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
-  public static Class getOutputCompressorClass(JobConf conf, 
-                                               Class defaultValue) {
+  public static Class<? extends CompressionCodec> 
+  getOutputCompressorClass(JobConf conf, 
+		                       Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    
     String name = conf.get("mapred.output.compression.codec");
-    if (name == null) {
-      return defaultValue;
-    } else {
+    if (name != null) {
       try {
-        return conf.getClassByName(name);
+        codecClass = 
+        	conf.getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name + 
                                            " was not found.", e);
       }
     }
+    return codecClass;
   }
   
   public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Tue Oct  2 14:45:45 2007
@@ -46,7 +46,7 @@
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {
       // find the kind of compression to do
-      compressionType = SequenceFile.getCompressionType(job);
+      compressionType = getOutputCompressionType(job);
 
       // find the right codec
       Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
@@ -88,5 +88,29 @@
     }
     return parts;
   }
+
+  /**
+   * Get the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link JobConf}
+   * @return the {@link CompressionType} for the output {@link SequenceFile}, 
+   *         defaulting to {@link CompressionType#RECORD}
+   */
+  public static CompressionType getOutputCompressionType(JobConf conf) {
+    String val = conf.get("mapred.output.compression.type", 
+                          CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
+  }
+  
+  /**
+   * Set the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link JobConf} to modify
+   * @param style the {@link CompressionType} for the output
+   *              {@link SequenceFile} 
+   */
+  public static void setOutputCompressionType(JobConf conf, 
+		                                          CompressionType style) {
+    conf.set("mapred.output.compression.type", style.toString());
+  }
+
 }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Tue Oct  2 14:45:45 2007
@@ -259,8 +259,7 @@
       
     public void configure(JobConf conf) {
       this.conf = conf;
-      compressInput = conf.getBoolean("mapred.compress.map.output", 
-                                      false);
+      compressInput = conf.getCompressMapOutput();
       taskId = conf.get("mapred.task.id");
     }
       



Mime
View raw message