hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077196 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/io/compress/ core/org/apache/hadoop/io/compress/bzip2/ core/org/apache/hadoop/io/compress/zlib/ test/org/apache/hadoop/io/compress/
Date Fri, 04 Mar 2011 03:50:52 GMT
Author: omalley
Date: Fri Mar  4 03:50:52 2011
New Revision: 1077196

URL: http://svn.apache.org/viewvc?rev=1077196&view=rev
Log:
commit 2b897f5aff5fa2e1c7f2a3e0360b0ae9f2f31498
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date:   Tue Feb 9 12:07:45 2010 +0530

    HADOOP-5879 from http://issues.apache.org/jira/secure/attachment/12435254/hadoop-5879-yahoo-0.20-v1.0.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HADOOP-5879. Read compression level and strategy from Configuration for
    +    gzip compression. (He Yongqiang via cdouglas)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/CodecPool.java Fri Mar  4 03:50:52 2011
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -91,20 +92,26 @@ public class CodecPool {
    *
    * @param codec the <code>CompressionCodec</code> for which to get the 
    *              <code>Compressor</code>
+   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
    * @return <code>Compressor</code> for the given 
    *         <code>CompressionCodec</code> from the pool or a new one
    */
-  public static Compressor getCompressor(CompressionCodec codec) {
+  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
     Compressor compressor = borrow(compressorPool, codec.getCompressorType());
     if (compressor == null) {
       compressor = codec.createCompressor();
       LOG.info("Got brand-new compressor");
     } else {
+      compressor.reinit(conf);
       LOG.debug("Got recycled compressor");
     }
     return compressor;
   }
   
+  public static Compressor getCompressor(CompressionCodec codec) {
+    return getCompressor(codec, null);
+  }
+  
   /**
    * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
    * pool or a new one.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Compressor.java Fri Mar  4 03:50:52 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.io.compress;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Specification of a stream-based 'compressor' which can be  
  * plugged into a {@link CompressionOutputStream} to compress data.
@@ -102,5 +104,13 @@ public interface Compressor {
   /**
    * Closes the compressor and discards any unprocessed input.
    */
-  public void end(); 
+  public void end();
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration
+   * 
+   * @param conf Configuration from which new setting are fetched
+   */
+  public void reinit(Configuration conf);
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java Fri Mar  4 03:50:52 2011
@@ -22,8 +22,11 @@ import java.io.*;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.zlib.*;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 
 /**
  * This class creates gzip compressors/decompressors. 
@@ -154,7 +157,7 @@ public class GzipCodec extends DefaultCo
 
   public Compressor createCompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
-      ? new GzipZlibCompressor()
+      ? new GzipZlibCompressor(conf)
       : null;
   }
 
@@ -205,6 +208,13 @@ public class GzipCodec extends DefaultCo
           ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
           ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
     }
+    
+    public GzipZlibCompressor(Configuration conf) {
+      super(ZlibFactory.getCompressionLevel(conf),
+           ZlibFactory.getCompressionStrategy(conf),
+           ZlibCompressor.CompressionHeader.GZIP_FORMAT,
+           64 * 1024);
+    }
   }
 
   static final class GzipZlibDecompressor extends ZlibDecompressor {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java Fri Mar  4 03:50:52 2011
@@ -2,6 +2,7 @@ package org.apache.hadoop.io.compress.bz
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
 
 /**
@@ -59,4 +60,9 @@ public class BZip2DummyCompressor implem
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void reinit(Configuration conf) {
+    // do nothing
+  }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java Fri Mar  4 03:50:52 2011
@@ -21,7 +21,9 @@ package org.apache.hadoop.io.compress.zl
 import java.io.IOException;
 import java.util.zip.Deflater;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
+import org.mortbay.log.Log;
 
 /**
  * A wrapper around java.util.zip.Deflater to make it conform 
@@ -46,4 +48,30 @@ public class BuiltInZlibDeflater extends
     throws IOException {
     return super.deflate(b, off, len);
   }
+
+  /**
+   * reinit the compressor with the given configuration. It will reset the
+   * compressor's compression level and compression strategy. Different from
+   * <tt>ZlibCompressor</tt>, <tt>BuiltInZlibDeflater</tt> only support three
+   * kind of compression strategy: FILTERED, HUFFMAN_ONLY and DEFAULT_STRATEGY.
+   * It will use DEFAULT_STRATEGY as default if the configured compression
+   * strategy is not supported.
+   */
+  @Override
+  public void reinit(Configuration conf) {
+    reset();
+    if (conf == null) {
+      return;
+    }
+    setLevel(ZlibFactory.getCompressionLevel(conf).compressionLevel());
+    final ZlibCompressor.CompressionStrategy strategy =
+      ZlibFactory.getCompressionStrategy(conf);
+    try {
+      setStrategy(strategy.compressionStrategy());
+    } catch (IllegalArgumentException ill) {
+      Log.warn(strategy + " not supported by BuiltInZlibDeflater.");
+      setStrategy(DEFAULT_STRATEGY);
+    }
+    Log.debug("Reinit compressor with new compression configuration");
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Fri Mar  4 03:50:52 2011
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.mortbay.log.Log;
 
 /**
  * A {@link Compressor} based on the popular 
@@ -40,7 +42,7 @@ public class ZlibCompressor implements C
   private long stream;
   private CompressionLevel level;
   private CompressionStrategy strategy;
-  private CompressionHeader windowBits;
+  private final CompressionHeader windowBits;
   private int directBufferSize;
   private byte[] userBuf = null;
   private int userBufOff = 0, userBufLen = 0;
@@ -178,6 +180,31 @@ public class ZlibCompressor implements C
     return nativeZlibLoaded;
   }
 
+  protected final void construct(CompressionLevel level, CompressionStrategy strategy,
+      CompressionHeader header, int directBufferSize) {
+  }
+
+  /**
+   * Creates a new compressor with the default compression level.
+   * Compressed data will be generated in ZLIB format.
+   */
+  public ZlibCompressor() {
+    this(CompressionLevel.DEFAULT_COMPRESSION,
+         CompressionStrategy.DEFAULT_STRATEGY,
+         CompressionHeader.DEFAULT_HEADER,
+         DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Creates a new compressor, taking settings from the configuration.
+   */
+  public ZlibCompressor(Configuration conf) {
+    this(ZlibFactory.getCompressionLevel(conf),
+         ZlibFactory.getCompressionStrategy(conf),
+         CompressionHeader.DEFAULT_HEADER,
+         DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
   /** 
    * Creates a new compressor using the specified compression level.
    * Compressed data will be generated in ZLIB format.
@@ -192,28 +219,38 @@ public class ZlibCompressor implements C
     this.level = level;
     this.strategy = strategy;
     this.windowBits = header;
+    stream = init(this.level.compressionLevel(), 
+                  this.strategy.compressionStrategy(), 
+                  this.windowBits.windowBits());
+
     this.directBufferSize = directBufferSize;
-    
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     compressedDirectBuf.position(directBufferSize);
-    
-    stream = init(this.level.compressionLevel(), 
-                  this.strategy.compressionStrategy(), 
-                  this.windowBits.windowBits());
   }
-  
+
   /**
-   * Creates a new compressor with the default compression level.
-   * Compressed data will be generated in ZLIB format.
+   * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration. It will reset the compressor's compression level
+   * and compression strategy.
+   * 
+   * @param conf Configuration storing new settings
    */
-  public ZlibCompressor() {
-    this(CompressionLevel.DEFAULT_COMPRESSION, 
-         CompressionStrategy.DEFAULT_STRATEGY, 
-         CompressionHeader.DEFAULT_HEADER, 
-         DEFAULT_DIRECT_BUFFER_SIZE);
+  @Override
+  public synchronized void reinit(Configuration conf) {
+    reset();
+    if (conf == null) {
+      return;
+    }
+    end(stream);
+    level = ZlibFactory.getCompressionLevel(conf);
+    strategy = ZlibFactory.getCompressionStrategy(conf);
+    stream = init(level.compressionLevel(), 
+                  strategy.compressionStrategy(), 
+                  windowBits.windowBits());
+    Log.debug("Reinit compressor with new compression configuration");
   }
-  
+
   public synchronized void setInput(byte[] b, int off, int len) {
     if (b== null) {
       throw new NullPointerException();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Fri Mar  4 03:50:52 2011
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -106,5 +108,25 @@ public class ZlibFactory {
     return (isNativeZlibLoaded(conf)) ? 
       new ZlibDecompressor() : new BuiltInZlibInflater(); 
   }
-  
+
+  public static void setCompressionStrategy(Configuration conf,
+      CompressionStrategy strategy) {
+    conf.setEnum("zlib.compress.strategy", strategy);
+  }
+
+  public static CompressionStrategy getCompressionStrategy(Configuration conf) {
+    return conf.getEnum("zlib.compress.strategy",
+        CompressionStrategy.DEFAULT_STRATEGY);
+  }
+
+  public static void setCompressionLevel(Configuration conf,
+      CompressionLevel level) {
+    conf.setEnum("zlib.compress.level", level);
+  }
+
+  public static CompressionLevel getCompressionLevel(Configuration conf) {
+    return conf.getEnum("zlib.compress.level",
+        CompressionLevel.DEFAULT_COMPRESSION);
+  }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=1077196&r1=1077195&r2=1077196&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java Fri Mar  4 03:50:52 2011
@@ -19,9 +19,11 @@ package org.apache.hadoop.io.compress;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 
 import junit.framework.TestCase;
@@ -41,6 +43,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 
 public class TestCodec extends TestCase {
@@ -67,6 +71,14 @@ public class TestCodec extends TestCase 
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
   }
 
+  public void testGzipCodecWithParam() throws IOException {
+    Configuration conf = new Configuration(this.conf);
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+    ZlibFactory.setCompressionStrategy(conf, CompressionStrategy.HUFFMAN_ONLY);
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
+  }
+
   private static void codecTest(Configuration conf, int seed, int count, 
                                 String codecClass) 
     throws IOException {
@@ -149,6 +161,53 @@ public class TestCodec extends TestCase 
     assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc));
   }
 
+  private static void gzipReinitTest(Configuration conf, CompressionCodec codec)
+      throws IOException {
+    // Add codec to cache
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+    ZlibFactory.setCompressionStrategy(conf,
+        CompressionStrategy.DEFAULT_STRATEGY);
+    Compressor c1 = CodecPool.getCompressor(codec);
+    CodecPool.returnCompressor(c1);
+    // reset compressor's compression level to perform no compression
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.NO_COMPRESSION);
+    Compressor c2 = CodecPool.getCompressor(codec, conf);
+    // ensure same compressor placed earlier
+    assertTrue("Got mismatched ZlibCompressor", c1 == c2);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    CompressionOutputStream cos = null;
+    // write trivially compressable data
+    byte[] b = new byte[1 << 15];
+    Arrays.fill(b, (byte) 43);
+    try {
+      cos = codec.createOutputStream(bos, c2);
+      cos.write(b);
+    } finally {
+      if (cos != null) {
+        cos.close();
+      }
+      CodecPool.returnCompressor(c2);
+    }
+    byte[] outbytes = bos.toByteArray();
+    // verify data were not compressed
+    assertTrue("Compressed bytes contrary to configuration",
+               outbytes.length >= b.length);
+  }
+
+  public void testCodecPoolCompressorReinit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", true);
+    if (ZlibFactory.isNativeZlibLoaded(conf)) {
+      GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
+      gzipReinitTest(conf, gzc);
+    } else {
+      LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
+    }
+    conf.setBoolean("hadoop.native.lib", false);
+    DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
+    gzipReinitTest(conf, dfc);
+  }
+  
   public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {
     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);



Mime
View raw message