hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1543538 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/compress/ src/main/java/org/apache/hadoop/io/compress/zlib/ src/test/java/org/apache/hadoop/io/compress/zlib/
Date: Tue, 19 Nov 2013 19:25:31 GMT
Author: acmurthy
Date: Tue Nov 19 19:25:31 2013
New Revision: 1543538

URL: http://svn.apache.org/r1543538
Log:
Revert HADOOP-10047, wrong patch.
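
[Editorial context: HADOOP-10047 added DirectCompressor/DirectDecompressor, a pair of ByteBuffer-to-ByteBuffer compression interfaces, and wired ZlibCompressor/ZlibDecompressor up to them; this commit backs that patch out of trunk. For reference only, the JDK later gained an equivalent direct-buffer API on Deflater/Inflater (the ByteBuffer overloads, Java 11+); a minimal round-trip sketch in that API, independent of the removed Hadoop classes, where the class name, payload, and buffer sizes are ours:]

    import java.nio.ByteBuffer;
    import java.util.zip.DataFormatException;
    import java.util.zip.Deflater;
    import java.util.zip.Inflater;

    // Round-trips a small payload through direct ByteBuffers, the access
    // pattern the reverted patch targeted. Illustrative only; not part of
    // the commit. Requires Java 11+ for the ByteBuffer overloads.
    public class DirectZlibRoundTrip {
      public static void main(String[] args) throws DataFormatException {
        byte[] raw = "direct-buffer zlib round trip".getBytes();
        ByteBuffer src = ByteBuffer.allocateDirect(raw.length);
        src.put(raw).flip();

        // Compress: Deflater writes straight into the direct output buffer.
        ByteBuffer compressed = ByteBuffer.allocateDirect(raw.length + 64);
        Deflater deflater = new Deflater();
        deflater.setInput(src);
        deflater.finish();
        deflater.deflate(compressed); // small payload: one call suffices here
        deflater.end();
        compressed.flip();

        // Decompress back into a direct buffer and verify the length.
        ByteBuffer dst = ByteBuffer.allocateDirect(raw.length);
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        inflater.inflate(dst);
        inflater.end();
        dst.flip();
        if (dst.remaining() != raw.length) {
          throw new AssertionError("round trip lost data");
        }
      }
    }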

Removed:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1543538&r1=1543537&r2=1543538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Tue Nov 19 19:25:31 2013
@@ -387,9 +387,6 @@ Release 2.3.0 - UNRELEASED
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
 
-    HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V via
-    acmurthy)
-
   BUG FIXES
 
     HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1543538&r1=1543537&r2=1543538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Tue Nov 19 19:25:31 2013
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.DirectCompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +35,7 @@ import org.apache.commons.logging.LogFac
  * http://www.zlib.net/
  * 
  */
-public class ZlibCompressor implements Compressor,DirectCompressor {
+public class ZlibCompressor implements Compressor {
 
   private static final Log LOG = LogFactory.getLog(ZlibCompressor.class);
 
@@ -421,7 +420,6 @@ public class ZlibCompressor implements C
     compressedDirectBuf.limit(directBufferSize);
     compressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;
-    userBuf = null;
   }
   
   @Override
@@ -437,110 +435,6 @@ public class ZlibCompressor implements C
       throw new NullPointerException();
   }
   
-  private int put(ByteBuffer dst, ByteBuffer src) {
-    // this will lop off data from src[pos:limit] into dst[pos:limit]
-    int l1 = src.remaining();
-    int l2 = dst.remaining();
-    int pos1 = src.position();
-    int pos2 = dst.position();
-    int len = Math.min(l1, l2);
-
-    if (len == 0) {
-      return 0;
-    }
-
-    ByteBuffer slice = src.slice();
-    slice.limit(len);
-    dst.put(slice);
-    src.position(pos1 + len);
-    return len;
-  }
-
-  public int compress(ByteBuffer dst, ByteBuffer src) throws IOException {
-    assert dst.remaining() > 0 : "dst.remaining() == 0";
-    int n = 0;
-    
-    /* fast path for clean state and direct buffers */
-    /* TODO: reset should free userBuf? */
-    if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
-      /*
-       * TODO: fix these assumptions in inflateDirect(), eventually by allowing
-       * it to read position()/limit() directly
-       */
-      boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize);
-      boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0);
-      /* use the buffers directly */
-      if(cleanDst && cleanState) {
-        Buffer originalCompressed = compressedDirectBuf;
-        Buffer originalUncompressed = uncompressedDirectBuf;
-        int originalBufferSize = directBufferSize;
-        uncompressedDirectBuf = src;
-        uncompressedDirectBufOff = src.position();
-        uncompressedDirectBufLen = src.remaining();
-        compressedDirectBuf = dst;
-        directBufferSize = dst.remaining();
-        // Compress data
-        n = deflateBytesDirect();
-        // we move dst.position() forward, not limit() 
-        // unlike the local buffer case, which moves it when we put() into the dst
-        dst.position(n);
-        if(uncompressedDirectBufLen > 0) {
-          src.position(uncompressedDirectBufOff);
-        } else {
-          src.position(src.limit());
-        }
-        compressedDirectBuf = originalCompressed;
-        uncompressedDirectBuf = originalUncompressed;
-        uncompressedDirectBufOff = 0;
-        uncompressedDirectBufLen = 0;
-        directBufferSize = originalBufferSize;
-        return n;
-      }
-    }
-    
-    // Check if there is compressed data
-    if (compressedDirectBuf.remaining() > 0) {
-      n = put(dst, (ByteBuffer) compressedDirectBuf);
-    }
-
-    if (dst.remaining() == 0) {
-      return n;
-    } else {
-      needsInput();
-
-      // if we have drained userBuf, read from src (ideally, do not mix buffer
-      // modes, but sometimes you can)
-      if (userBufLen == 0 && src != null && src.remaining() > 0) {
-        put((ByteBuffer) uncompressedDirectBuf, src);
-        uncompressedDirectBufLen = uncompressedDirectBuf.position();
-      }
-
-      // Re-initialize the zlib's output direct buffer
-      compressedDirectBuf.rewind();
-      compressedDirectBuf.limit(directBufferSize);
-
-      // Compress data
-      int more = deflateBytesDirect();
-
-      compressedDirectBuf.limit(more);
-
-      // Check if zlib consumed all input buffer
-      // set keepUncompressedBuf properly
-      if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
-        keepUncompressedBuf = false;
-        uncompressedDirectBuf.clear();
-        uncompressedDirectBufOff = 0;
-        uncompressedDirectBufLen = 0;
-      } else { // zlib did not consume all input buffer
-        keepUncompressedBuf = true;
-      }
-
-      // fill the dst buffer from compressedDirectBuf
-      int fill = put(dst, ((ByteBuffer) compressedDirectBuf));
-      return n + fill;
-    }
-  }
-  
   private native static void initIDs();
   private native static long init(int level, int strategy, int windowBits);
   private native static void setDictionary(long strm, byte[] b, int off,

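[Editorial aside: the hunk above removes a private put(dst, src) helper that also appears, byte for byte, in the ZlibDecompressor hunk below. Lifted out of the diff into a self-contained form (only the class name is ours), it is simply a bounded ByteBuffer copy: it moves min(src.remaining(), dst.remaining()) bytes through a length-capped slice, so dst never overflows and src's limit is left untouched:]

    import java.nio.ByteBuffer;

    // Self-contained version of the helper removed above. Copies as many
    // bytes as both buffers allow, advances both positions, and returns
    // the number of bytes moved.
    public final class BufferCopy {
      public static int put(ByteBuffer dst, ByteBuffer src) {
        int len = Math.min(src.remaining(), dst.remaining());
        if (len == 0) {
          return 0;
        }
        ByteBuffer slice = src.slice(); // shares bytes; own position/limit
        slice.limit(len);
        dst.put(slice);                 // safe: len <= dst.remaining()
        src.position(src.position() + len);
        return len;
      }
    }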
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1543538&r1=1543537&r2=1543538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Tue Nov 19 19:25:31 2013
@@ -23,7 +23,6 @@ import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -32,7 +31,7 @@ import org.apache.hadoop.util.NativeCode
  * http://www.zlib.net/
  * 
  */
-public class ZlibDecompressor implements Decompressor,DirectDecompressor {
+public class ZlibDecompressor implements Decompressor {
   private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
   
   // HACK - Use this as a global lock in the JNI layer
@@ -281,7 +280,6 @@ public class ZlibDecompressor implements
     uncompressedDirectBuf.limit(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;
-    userBuf = null;
   }
 
   @Override
@@ -301,108 +299,6 @@ public class ZlibDecompressor implements
     if (stream == 0)
       throw new NullPointerException();
   }
-    
-  private int put(ByteBuffer dst, ByteBuffer src) {
-    // this will lop off data from src[pos:limit] into dst[pos:limit], using the
-    // min() of both remaining()
-    int l1 = src.remaining();
-    int l2 = dst.remaining();
-    int pos1 = src.position();
-    int pos2 = dst.position();
-    int len = Math.min(l1, l2);
-
-    if (len == 0) {
-      return 0;
-    }
-
-    ByteBuffer slice = src.slice();
-    slice.limit(len);
-    dst.put(slice);
-    src.position(pos1 + len);
-    return len;
-  }
-
-  public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException {
-    assert dst.remaining() > 0 : "dst.remaining == 0";
-    int n = 0;
-    
-    /* fast path for clean state and direct buffers */
-    if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
-      /*
-       * TODO: fix these assumptions in inflateDirect(), eventually by allowing
-       * it to read position()/limit() directly
-       */
-      boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize);
-      boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0);
-      /* use the buffers directly */
-      if(cleanDst && cleanState) {
-        Buffer originalCompressed = compressedDirectBuf;
-        Buffer originalUncompressed = uncompressedDirectBuf;
-        int originalBufferSize = directBufferSize;
-        compressedDirectBuf = src;
-        compressedDirectBufOff = src.position();
-        compressedDirectBufLen = src.remaining();
-        uncompressedDirectBuf = dst;
-        directBufferSize = dst.remaining();
-        // Compress data
-        n = inflateBytesDirect();
-        dst.position(n);
-        if(compressedDirectBufLen > 0) {
-          src.position(compressedDirectBufOff);
-        } else {
-          src.position(src.limit());
-        }
-        compressedDirectBuf = originalCompressed;
-        uncompressedDirectBuf = originalUncompressed;        
-        compressedDirectBufOff = 0;
-        compressedDirectBufLen = 0;
-        directBufferSize = originalBufferSize;
-        return n;
-      }
-    }
-    
-    // Check if there is compressed data
-    if (uncompressedDirectBuf.remaining() > 0) {
-      n = put(dst, (ByteBuffer) uncompressedDirectBuf);
-    }
-
-    if (dst.remaining() == 0) {
-      return n;
-    } else {
-      if (needsInput()) {
-        // this does not update buffers if we have no userBuf
-        if (userBufLen <= 0) {
-          compressedDirectBufOff = 0;
-          compressedDirectBufLen = 0;
-          compressedDirectBuf.rewind().limit(directBufferSize);
-        }
-        if (src != null) {
-          assert src.remaining() > 0 : "src.remaining() == 0";
-        }
-      }
-
-      // if we have drained userBuf, read from src (ideally, do not mix buffer
-      // modes, but sometimes you can)
-      if (userBufLen == 0 && src != null && src.remaining() > 0) {
-        compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src);
-      }
-      
-      // Re-initialize the zlib's output direct buffer
-      uncompressedDirectBuf.rewind();
-      uncompressedDirectBuf.limit(directBufferSize);
-
-      // Compress data
-      int more = inflateBytesDirect();
-
-      uncompressedDirectBuf.limit(more);
-
-      // Get atmost 'len' bytes
-      int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf));
-      return n + fill;
-    }
-  }
-
-  
   
   private native static void initIDs();
   private native static long init(int windowBits);

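[Editorial aside: the removed compress() and decompress() methods share the same zero-copy fast path: when the caller supplies direct buffers and the internal staging buffers are empty, the code temporarily swaps the caller's buffers into the fields the JNI layer operates on, calls deflateBytesDirect()/inflateBytesDirect(), and then restores the originals. The guard on the destination buffer, restated as a standalone predicate (hypothetical class; the condition is taken from the diffs above):]

    import java.nio.ByteBuffer;

    // Hypothetical restatement of the fast-path guard from the reverted
    // patch: the destination must be an untouched direct buffer at least
    // as large as the codec's internal staging buffer before the
    // zero-copy buffer swap is safe.
    final class DirectFastPathGuard {
      static boolean cleanDst(ByteBuffer dst, int directBufferSize) {
        return dst.isDirect()
            && dst.position() == 0                 // nothing written yet
            && dst.remaining() == dst.capacity()   // limit still at capacity
            && dst.capacity() >= directBufferSize; // room for a full native write
      }
    }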
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java?rev=1543538&r1=1543537&r2=1543538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java Tue Nov 19 19:25:31 2013
@@ -19,13 +19,8 @@ package org.apache.hadoop.io.compress.zl
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Console;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,12 +33,8 @@ import org.apache.hadoop.io.compress.Dec
 import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
-import org.apache.log4j.ConsoleAppender;
 import org.junit.Before;
 import org.junit.Test;
-
-import sun.util.logging.resources.logging;
-
 import com.google.common.collect.ImmutableSet;
 
 public class TestZlibCompressorDecompressor {
@@ -159,149 +150,6 @@ public class TestZlibCompressorDecompres
     }
   }
   
-  private void compressDecompressLoop(int rawDataSize, int inSize, int outSize)
-      throws IOException {
-    byte[] rawData = null;
-    rawData = generate(rawDataSize);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize);
-    ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize);
-    ZlibCompressor compressor = new ZlibCompressor();
-    ZlibDecompressor decompressor = new ZlibDecompressor();
-    outBuf.clear();
-    /* compression loop */
-    int off = 0;
-    int len = rawDataSize;
-    int min = Math.min(inBuf.remaining(), len);
-    if (min > 0) {
-      inBuf.put(rawData, off, min);
-    }
-    inBuf.flip();
-    len -= min;
-    off += min;
-    while (!compressor.finished()) {
-      compressor.compress(outBuf, inBuf);
-      if (outBuf.remaining() == 0) {
-        // flush when the buffer is full
-        outBuf.flip();
-        while (outBuf.remaining() > 0) {
-          baos.write(outBuf.get());
-        }
-        outBuf.clear();
-      }
-      if (inBuf != null && inBuf.remaining() == 0) {
-        inBuf.clear();
-        if (len > 0) {
-          min = Math.min(inBuf.remaining(), len);
-          inBuf.put(rawData, off, min);
-          inBuf.flip();
-          len -= min;
-          off += min;
-        } else {
-          inBuf = null;
-          compressor.finish();
-        }
-      }
-    }
-
-    outBuf.flip();
-    if (outBuf.remaining() > 0) {
-      while (outBuf.remaining() > 0) {
-        baos.write(outBuf.get());
-      }
-      outBuf.clear();
-    }
-
-    compressor.end();
-
-    byte[] compressed = baos.toByteArray();
-    ByteBuffer expected = ByteBuffer.wrap(rawData);
-    outBuf.clear();
-    inBuf = ByteBuffer.allocateDirect(inSize);
-    inBuf.clear();
-
-    // zlib always has header
-    if (compressed.length != 0) {
-      off = 0;
-      len = compressed.length;
-      min = Math.min(inBuf.remaining(), len);
-      inBuf.put(compressed, off, min);
-      inBuf.flip();
-      len -= min;
-      off += min;
-      while (!decompressor.finished()) {
-        decompressor.decompress(outBuf, inBuf);
-        if (outBuf.remaining() == 0) {
-          outBuf.flip();
-          while (outBuf.remaining() > 0) {
-            assertEquals(expected.get(), outBuf.get());
-          }
-          outBuf.clear();
-        }
-
-        if (inBuf != null && inBuf.remaining() == 0) {
-          inBuf.clear();
-          if (len > 0) {
-            min = Math.min(inBuf.remaining(), len);
-            inBuf.put(compressed, off, min);
-            inBuf.flip();
-            len -= min;
-            off += min;
-          }
-        }
-      }
-    }
-
-    outBuf.flip();
-    if (outBuf.remaining() > 0) {
-      while (outBuf.remaining() > 0) {
-        assertEquals(expected.get(), outBuf.get());
-      }
-      outBuf.clear();
-    }
-
-    assertEquals(0, expected.remaining());
-  }
-  
-  @Test
-  public void testZlibDirectCompressDecompress() {
-    int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024,
-        1024 * 1024 };
-    try {
-      // 0-2 bytes results in sizeof(outBuf) > sizeof(inBuf)
-      compressDecompressLoop(0, 4096, 4096);
-      compressDecompressLoop(0, 1, 1);
-      compressDecompressLoop(1, 1, 2);
-      compressDecompressLoop(1, 2, 1);
-      compressDecompressLoop(2, 3, 2);
-
-      for (int i = 0; i < size.length; i++) {
-        compressDecompressLoop(size[i], 4096, 4096);
-        compressDecompressLoop(size[i], 1, 1);
-        compressDecompressLoop(size[i], 1, 2);
-        compressDecompressLoop(size[i], 2, 1);
-        compressDecompressLoop(size[i], 3, 2);
-        compressDecompressLoop(size[i], size[i], 4096);
-        compressDecompressLoop(size[i], size[i] - 1, 4096);
-        compressDecompressLoop(size[i], size[i] + 1, 4096);
-        compressDecompressLoop(size[i], 4096, size[i]);
-        compressDecompressLoop(size[i], 4096, size[i] - 1);
-        compressDecompressLoop(size[i], 4096, size[i] + 1);
-        compressDecompressLoop(size[i], size[i] - 1, size[i] - 1);
-
-        compressDecompressLoop(size[i], size[i] / 2, 4096);
-        compressDecompressLoop(size[i], size[i] / 2 - 1, 4096);
-        compressDecompressLoop(size[i], size[i] / 2 + 1, 4096);
-        compressDecompressLoop(size[i], 4096, size[i] / 2);
-        compressDecompressLoop(size[i], 4096, size[i] / 2 - 1);
-        compressDecompressLoop(size[i], 4096, size[i] / 2 + 1);
-        compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1);
-      }
-    } catch (IOException ex) {
-      fail("testZlibDirectCompressDecompress ex !!!" + ex);
-    }
-  }
-  
   @Test
   public void testZlibCompressorDecompressorSetDictionary() {
     Configuration conf = new Configuration();


