hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1543543 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/compress/ src/main/java/org/apache/hadoop/io/compress/snappy/ src/main/java/org/apache/hadoop/io/compress/zlib/ src/te...
Date: Tue, 19 Nov 2013 19:30:04 GMT
Author: acmurthy
Date: Tue Nov 19 19:30:04 2013
New Revision: 1543543

URL: http://svn.apache.org/r1543543
Log:
Merge -c 1543542 from trunk to branch-2 to fix HADOOP-10047. Add direct-buffer based APIs
for compression. Contributed by Gopal V.
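
For context, a minimal usage sketch of the new direct-buffer API (illustrative only, not part of this commit; the example class name, Configuration setup, and loop structure are assumptions):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.DirectDecompressor;

    // Hypothetical example class; not part of the patch.
    public class DirectDecompressExample {

      /**
       * Inflates zlib-compressed bytes from one direct buffer into another,
       * bypassing the on-heap byte[] copies of the stream-based API.
       */
      public static void inflate(ByteBuffer compressed, ByteBuffer uncompressed)
          throws IOException {
        DefaultCodec codec = new DefaultCodec();
        codec.setConf(new Configuration());

        // Added by this patch; returns null when native zlib is not loaded.
        DirectDecompressor decompressor = codec.createDirectDecompressor();
        if (decompressor == null) {
          throw new IOException("native zlib is not available");
        }

        // Both buffers must be direct, and dst must have space on every call.
        while (compressed.hasRemaining() && uncompressed.hasRemaining()) {
          decompressor.decompress(compressed, uncompressed);
        }
      }
    }

SnappyCodec and GzipCodec gain the same createDirectDecompressor() hook in the hunks below, backed by SnappyDirectDecompressor and ZlibDirectDecompressor respectively.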

Added:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressionCodec.java
      - copied unchanged from r1543542, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressionCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java
      - copied unchanged from r1543542, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java
Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Tue Nov 19 19:30:04 2013
@@ -97,6 +97,9 @@ Release 2.3.0 - UNRELEASED
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
 
+   HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
+   via acmurthy)
+
   BUG FIXES
 
     HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java Tue Nov 19 19:30:04 2013
@@ -28,11 +28,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class DefaultCodec implements Configurable, CompressionCodec {
+public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   private static final Log LOG = LogFactory.getLog(DefaultCodec.class);
   
   Configuration conf;
@@ -103,6 +104,15 @@ public class DefaultCodec implements Con
     return ZlibFactory.getZlibDecompressor(conf);
   }
   
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return ZlibFactory.getZlibDirectDecompressor(conf);
+  }
+  
+  
   @Override
   public String getDefaultExtension() {
     return ".deflate";

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java Tue Nov 19 19:30:04 2013
@@ -25,6 +25,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.zlib.*;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+
 import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
 
 /**
@@ -163,6 +165,13 @@ public class GzipCodec extends DefaultCo
       ? GzipZlibDecompressor.class
       : BuiltInGzipDecompressor.class;
   }
+    
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return ZlibFactory.isNativeZlibLoaded(conf) 
+        ? new ZlibDecompressor.ZlibDirectDecompressor(
+          ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
+  }
 
   @Override
   public String getDefaultExtension() {

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java Tue Nov 19 19:30:04 2013
@@ -26,13 +26,14 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
  * This class creates snappy compressors/decompressors.
  */
-public class SnappyCodec implements Configurable, CompressionCodec {
+public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   Configuration conf;
 
   /**
@@ -203,6 +204,14 @@ public class SnappyCodec implements Conf
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
     return new SnappyDecompressor(bufferSize);
   }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
+  }
 
   /**
    * Get the default filename extension for this kind of compression.

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java Tue Nov 19 19:30:04 2013
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -282,4 +283,75 @@ public class SnappyDecompressor implemen
   private native static void initIDs();
 
   private native int decompressBytesDirect();
+  
+  int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert (this instanceof SnappyDirectDecompressor);
+    
+    ByteBuffer presliced = dst;
+    if (dst.position() > 0) {
+      presliced = dst;
+      dst = dst.slice();
+    }
+
+    Buffer originalCompressed = compressedDirectBuf;
+    Buffer originalUncompressed = uncompressedDirectBuf;
+    int originalBufferSize = directBufferSize;
+    compressedDirectBuf = src.slice();
+    compressedDirectBufLen = src.remaining();
+    uncompressedDirectBuf = dst;
+    directBufferSize = dst.remaining();
+    int n = 0;
+    try {
+      n = decompressBytesDirect();
+      presliced.position(presliced.position() + n);
+      // SNAPPY always consumes the whole buffer or throws an exception
+      src.position(src.limit());
+      finished = true;
+    } finally {
+      compressedDirectBuf = originalCompressed;
+      uncompressedDirectBuf = originalUncompressed;
+      compressedDirectBufLen = 0;
+      directBufferSize = originalBufferSize;
+    }
+    return n;
+  }
+  
+  public static class SnappyDirectDecompressor extends SnappyDecompressor implements
+      DirectDecompressor {
+    
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+
+    private boolean endOfInput;
+
+    @Override
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";
+      this.decompressDirect(src, dst);
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public synchronized void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public synchronized int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Tue Nov 19 19:30:04 2013
@@ -23,6 +23,7 @@ import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -106,7 +107,7 @@ public class ZlibDecompressor implements
    */
   public ZlibDecompressor(CompressionHeader header, int directBufferSize) {
     this.header = header;
-    this.directBufferSize = directBufferSize;
+    this.directBufferSize = directBufferSize;    
     compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
@@ -310,4 +311,86 @@ public class ZlibDecompressor implements
   private native static int getRemaining(long strm);
   private native static void reset(long strm);
   private native static void end(long strm);
+    
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert (this instanceof ZlibDirectDecompressor);
+    
+    ByteBuffer presliced = dst;
+    if (dst.position() > 0) {
+      presliced = dst;
+      dst = dst.slice();
+    }
+
+    Buffer originalCompressed = compressedDirectBuf;
+    Buffer originalUncompressed = uncompressedDirectBuf;
+    int originalBufferSize = directBufferSize;
+    compressedDirectBuf = src;
+    compressedDirectBufOff = src.position();
+    compressedDirectBufLen = src.remaining();
+    uncompressedDirectBuf = dst;
+    directBufferSize = dst.remaining();
+    int n = 0;
+    try {
+      n = inflateBytesDirect();
+      presliced.position(presliced.position() + n);
+      if (compressedDirectBufLen > 0) {
+        src.position(compressedDirectBufOff);
+      } else {
+        src.position(src.limit());
+      }
+    } finally {
+      compressedDirectBuf = originalCompressed;
+      uncompressedDirectBuf = originalUncompressed;
+      compressedDirectBufOff = 0;
+      compressedDirectBufLen = 0;
+      directBufferSize = originalBufferSize;
+    }
+    return n;
+  }
+  
+  public static class ZlibDirectDecompressor 
+      extends ZlibDecompressor implements DirectDecompressor {
+    public ZlibDirectDecompressor() {
+      super(CompressionHeader.DEFAULT_HEADER, 0);
+    }
+
+    public ZlibDirectDecompressor(CompressionHeader header, int directBufferSize) {
+      super(header, directBufferSize);
+    }
+    
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+    
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+    
+    private boolean endOfInput;
+
+    @Override
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";      
+      this.inflateDirect(src, dst);
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public synchronized void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public synchronized int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Tue Nov 19 19:30:04 2013
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -116,6 +117,17 @@ public class ZlibFactory {
     return (isNativeZlibLoaded(conf)) ? 
       new ZlibDecompressor() : new BuiltInZlibInflater(); 
   }
+  
+  /**
+   * Return the appropriate implementation of the zlib direct decompressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate implementation of the zlib decompressor.
+   */
+  public static DirectDecompressor getZlibDirectDecompressor(Configuration conf) {
+    return (isNativeZlibLoaded(conf)) ? 
+      new ZlibDecompressor.ZlibDirectDecompressor() : null; 
+  }
 
   public static void setCompressionStrategy(Configuration conf,
       CompressionStrategy strategy) {

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java Tue Nov 19 19:30:04 2013
@@ -29,6 +29,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.hadoop.io.DataInputBuffer;
@@ -38,6 +39,7 @@ import org.apache.hadoop.io.compress.Blo
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -147,7 +149,7 @@ public class TestSnappyCompressorDecompr
       fail("testSnappyCompressorCompressAIOBException ex error !!!");
     }
   }
-
+  
   @Test
   public void testSnappyDecompressorCompressAIOBException() {
     try {
@@ -275,6 +277,56 @@ public class TestSnappyCompressorDecompr
       fail("testSnappyBlockCompression ex error !!!");
     }
   }
+  
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
+    byte[] rawData = BytesGenerator.get(rawDataSize);    
+    byte[] compressedResult = new byte[rawDataSize+20];
+    int directBufferSize = Math.max(rawDataSize*2, 64*1024);    
+    SnappyCompressor compressor = new SnappyCompressor(directBufferSize);
+    compressor.setInput(rawData, 0, rawDataSize);
+    int compressedSize = compressor.compress(compressedResult, 0, compressedResult.length);
+    SnappyDirectDecompressor decompressor = new SnappyDirectDecompressor();
+   
+    ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
+    ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
+
+    inBuf.put(compressedResult, 0, compressedSize);
+    inBuf.flip();    
+
+    ByteBuffer expected = ByteBuffer.wrap(rawData);
+    
+    outBuf.clear();
+    while(!decompressor.finished()) {
+      decompressor.decompress(inBuf, outBuf);
+      if (outBuf.remaining() == 0) {
+        outBuf.flip();
+        while (outBuf.remaining() > 0) {        
+          assertEquals(expected.get(), outBuf.get());
+        }
+        outBuf.clear();
+      }
+    }
+    outBuf.flip();
+    while (outBuf.remaining() > 0) {        
+      assertEquals(expected.get(), outBuf.get());
+    }
+    outBuf.clear();
+    
+    assertEquals(0, expected.remaining());
+  }
+  
+  @Test
+  public void testSnappyDirectBlockCompression() {
+    int[] size = { 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };    
+    assumeTrue(SnappyCodec.isNativeCodeLoaded());
+    try {
+      for (int i = 0; i < size.length; i++) {
+        compressDecompressLoop(size[i]);
+      }
+    } catch (IOException ex) {
+      fail("testSnappyDirectBlockCompression ex !!!" + ex);
+    }
+  }
 
   @Test
   public void testSnappyCompressorDecopressorLogicWithCompressionStreams() {

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java?rev=1543543&r1=1543542&r2=1543543&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java Tue Nov 19 19:30:04 2013
@@ -19,9 +19,13 @@ package org.apache.hadoop.io.compress.zl
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
+
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.zip.DeflaterOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -33,6 +37,8 @@ import org.apache.hadoop.io.compress.Dec
 import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.junit.Before;
 import org.junit.Test;
 import com.google.common.collect.ImmutableSet;
@@ -150,6 +156,60 @@ public class TestZlibCompressorDecompres
     }
   }
   
+  
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
+    byte[] rawData = null;
+    rawData = generate(rawDataSize);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize+12);
+    DeflaterOutputStream dos = new DeflaterOutputStream(baos);
+    dos.write(rawData);
+    dos.flush();
+    dos.close();
+    byte[] compressedResult = baos.toByteArray();
+    int compressedSize = compressedResult.length;
+    ZlibDirectDecompressor decompressor = new ZlibDirectDecompressor();
+   
+    ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
+    ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
+
+    inBuf.put(compressedResult, 0, compressedSize);
+    inBuf.flip();    
+
+    ByteBuffer expected = ByteBuffer.wrap(rawData);
+    
+    outBuf.clear();
+    while(!decompressor.finished()) {
+      decompressor.decompress(inBuf, outBuf);
+      if (outBuf.remaining() == 0) {
+        outBuf.flip();
+        while (outBuf.remaining() > 0) {        
+          assertEquals(expected.get(), outBuf.get());
+        }
+        outBuf.clear();
+      }
+    }
+    outBuf.flip();
+    while (outBuf.remaining() > 0) {        
+      assertEquals(expected.get(), outBuf.get());
+    }
+    outBuf.clear();
+    
+    assertEquals(0, expected.remaining());
+  }
+
+  @Test
+  public void testZlibDirectCompressDecompress() {
+    int[] size = { 1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
+    assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+    try {
+      for (int i = 0; i < size.length; i++) {
+        compressDecompressLoop(size[i]);
+      }
+    } catch (IOException ex) {
+      fail("testZlibDirectCompressDecompress ex !!!" + ex);
+    }
+  }
+  
   @Test
   public void testZlibCompressorDecompressorSetDictionary() {
     Configuration conf = new Configuration();


