hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1037901 - in /hadoop/common/trunk: CHANGES.txt src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
Date Mon, 22 Nov 2010 21:51:58 GMT
Author: eli
Date: Mon Nov 22 21:51:57 2010
New Revision: 1037901

URL: http://svn.apache.org/viewvc?rev=1037901&view=rev
Log:
HADOOP-6683. ZlibCompressor does not fully utilize the buffer. Contributed by Kang Xiao

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1037901&r1=1037900&r2=1037901&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon Nov 22 21:51:57 2010
@@ -212,6 +212,9 @@ Release 0.22.0 - Unreleased
     HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
     (Erik Steffl via szetszwo)
 
+    HADOOP-6683. ZlibCompressor does not fully utilize the buffer.
+    (Kang Xiao via eli)
+
   BUG FIXES
 
     HADOOP-6638. try to relogin in a case of failed RPC connection (expired 

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?rev=1037901&r1=1037900&r2=1037901&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Mon
Nov 22 21:51:57 2010
@@ -53,6 +53,7 @@ public class ZlibCompressor implements C
   private int userBufOff = 0, userBufLen = 0;
   private Buffer uncompressedDirectBuf = null;
   private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+  private boolean keepUncompressedBuf = false;
   private Buffer compressedDirectBuf = null;
   private boolean finish, finished;
 
@@ -269,6 +270,7 @@ public class ZlibCompressor implements C
     this.userBuf = b;
     this.userBufOff = off;
     this.userBufLen = len;
+    uncompressedDirectBufOff = 0;
     setInputFromSavedData();
     
     // Reinitialize zlib's output direct buffer 
@@ -276,21 +278,13 @@ public class ZlibCompressor implements C
     compressedDirectBuf.position(directBufferSize);
   }
   
+  //copy enough data from userBuf to uncompressedDirectBuf
   synchronized void setInputFromSavedData() {
-    uncompressedDirectBufOff = 0;
-    uncompressedDirectBufLen = userBufLen;
-    if (uncompressedDirectBufLen > directBufferSize) {
-      uncompressedDirectBufLen = directBufferSize;
-    }
-
-    // Reinitialize zlib's input direct buffer
-    uncompressedDirectBuf.rewind();
-    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,  
-                                            uncompressedDirectBufLen);
-
-    // Note how much data is being fed to zlib
-    userBufOff += uncompressedDirectBufLen;
-    userBufLen -= uncompressedDirectBufLen;
+    int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
+    userBufLen -= len;
+    userBufOff += len;
+    uncompressedDirectBufLen = uncompressedDirectBuf.position();
   }
 
   public synchronized void setDictionary(byte[] b, int off, int len) {
@@ -310,12 +304,21 @@ public class ZlibCompressor implements C
     }
 
     // Check if zlib has consumed all input
-    if (uncompressedDirectBufLen <= 0) {
+    // compress should be invoked if keepUncompressedBuf true
+    if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
+      return false;
+    
+    if (uncompressedDirectBuf.remaining() > 0) {
       // Check if we have consumed all user-input
       if (userBufLen <= 0) {
         return true;
       } else {
+        // copy enough data from userBuf to uncompressedDirectBuf
         setInputFromSavedData();
+        if (uncompressedDirectBuf.remaining() > 0) // uncompressedDirectBuf is not full
+          return true;
+        else 
+          return false;
       }
     }
     
@@ -359,6 +362,17 @@ public class ZlibCompressor implements C
     n = deflateBytesDirect();
     compressedDirectBuf.limit(n);
     
+    // Check if zlib consumed all input buffer
+    // set keepUncompressedBuf properly
+    if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
+      keepUncompressedBuf = false;
+      uncompressedDirectBuf.clear();
+      uncompressedDirectBufOff = 0;
+      uncompressedDirectBufLen = 0;
+    } else { // zlib did not consume all input buffer
+      keepUncompressedBuf = true;
+    }
+    
     // Get atmost 'len' bytes
     n = Math.min(n, len);
     ((ByteBuffer)compressedDirectBuf).get(b, off, n);
@@ -393,6 +407,7 @@ public class ZlibCompressor implements C
     finished = false;
     uncompressedDirectBuf.rewind();
     uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+    keepUncompressedBuf = false;
     compressedDirectBuf.limit(directBufferSize);
     compressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;



Mime
View raw message