hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1243667 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache/hadoop/hbase/io/hfile/
Date Mon, 13 Feb 2012 19:41:58 GMT
Author: mbautin
Date: Mon Feb 13 19:41:58 2012
New Revision: 1243667

URL: http://svn.apache.org/viewvc?rev=1243667&view=rev
Log:
[jira] [HBASE-5387] Reuse compression streams in HFileBlock.Writer

Summary: We need to reuse compression streams in HFileBlock.Writer instead of
allocating them every time. The motivation is that when using Java's built-in
implementation of Gzip, we allocate a new GZIPOutputStream object and an
associated native data structure every time. This is one suspected cause of recent
TestHFileBlock failures on Hadoop QA:
https://builds.apache.org/job/HBase-TRUNK/2658/testReport/org.apache.hadoop.hbase.io.hfile/TestHFileBlock/testPreviousOffset_1_/.
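
(Illustrative sketch, not part of the patch: the contrast the summary
describes. Each new GZIPOutputStream constructs a fresh java.util.zip.Deflater
and, with it, a native zlib data structure that is only reclaimed on end() or
finalization; a long-lived writer can instead hold one Deflater and reset() it
between blocks. Class and method names below are hypothetical.)

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

public class GzipAllocationSketch {

  /** The old pattern: a new stream, and new native state, per block. */
  static byte[] compressAllocatingEachTime(byte[] block) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(baos); // allocates native state
    gzip.write(block);
    gzip.finish();
    return baos.toByteArray();
  }

  /** One deflater for the writer's lifetime, reset between blocks. */
  private static final Deflater DEFLATER =
      new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw deflate, no gzip framing

  static byte[] compressReusing(byte[] block) {
    DEFLATER.reset();           // reuse the existing native zlib state
    DEFLATER.setInput(block);
    DEFLATER.finish();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] buf = new byte[4 * 1024];
    while (!DEFLATER.finished()) {
      baos.write(buf, 0, DEFLATER.deflate(buf));
    }
    return baos.toByteArray();  // note: no gzip header/trailer on this path
  }
}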

Test Plan:
* Run unit tests
* Create a GZIP-compressed CF with new code, load some data, shut down HBase,
deploy old code, restart HBase, and scan the table (a client-side sketch of
this check follows below)
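
(A rough sketch of how that manual check might be scripted against the trunk
client API of the time; the table name, CF name, and row count are
placeholders, and the scan step is what gets re-run after the old code is
redeployed.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.util.Bytes;

public class GzipCfCompatCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Create a table with a GZIP-compressed column family.
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor("gz_test");
    HColumnDescriptor cf = new HColumnDescriptor("cf");
    cf.setCompressionType(Compression.Algorithm.GZ);
    desc.addFamily(cf);
    admin.createTable(desc);

    // Load some data with the new code deployed.
    HTable table = new HTable(conf, "gz_test");
    for (int i = 0; i < 1000; ++i) {
      Put put = new Put(Bytes.toBytes("row" + i));
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
      table.put(put);
    }
    table.flushCommits();

    // After shutting down, deploying old code, and restarting: scan it back.
    ResultScanner scanner = table.getScanner(new Scan());
    int rows = 0;
    for (Result r : scanner) {
      ++rows; // every row should still be readable
    }
    scanner.close();
    table.close();
    System.out.println("scanned " + rows + " rows");
  }
}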

Reviewers: tedyu, Liyin, dhruba, JIRA, lhofhansl

Reviewed By: lhofhansl

CC: tedyu, lhofhansl, mbautin

Differential Revision: https://reviews.facebook.net/D1719

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=1243667&r1=1243666&r2=1243667&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Mon Feb 13 19:41:58 2012
@@ -119,7 +119,7 @@ public final class Compression {
       @Override
       DefaultCodec getCodec(Configuration conf) {
         if (codec == null) {
-          codec = new GzipCodec();
+          codec = new ReusableStreamGzipCodec();
           codec.setConf(new Configuration(conf));
         }
 
@@ -213,7 +213,6 @@ public final class Compression {
     public OutputStream createCompressionStream(
         OutputStream downStream, Compressor compressor, int downStreamBufferSize)
         throws IOException {
-      CompressionCodec codec = getCodec(conf);
       OutputStream bos1 = null;
       if (downStreamBufferSize > 0) {
         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
@@ -221,15 +220,25 @@ public final class Compression {
       else {
         bos1 = downStream;
       }
-      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
       CompressionOutputStream cos =
-          codec.createOutputStream(bos1, compressor);
+          createPlainCompressionStream(bos1, compressor);
       BufferedOutputStream bos2 =
           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
               DATA_OBUF_SIZE);
       return bos2;
     }
 
+    /**
+     * Creates a compression stream without any additional wrapping into
+     * buffering streams.
+     */
+    CompressionOutputStream createPlainCompressionStream(
+        OutputStream downStream, Compressor compressor) throws IOException {
+      CompressionCodec codec = getCodec(conf);
+      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
+      return codec.createOutputStream(downStream, compressor);
+    }
+
     public Compressor getCompressor() {
       CompressionCodec codec = getCodec(conf);
       if (codec != null) {

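(A minimal sketch of the reuse pattern createPlainCompressionStream enables:
because it returns the bare CompressionOutputStream, without the
BufferedOutputStream and FinishOnFlushCompressionStream wrappers that
createCompressionStream adds, a long-lived writer can keep one stream and
resetState() it per block. The sketch assumes a caller in
org.apache.hadoop.hbase.io.hfile, since the method is package-private;
"blocks" is placeholder input.)

package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

class PlainStreamReuseSketch {
  static void compressBlocks(byte[][] blocks) throws IOException {
    Compression.Algorithm algo = Compression.Algorithm.GZ;
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream cos =
        algo.createPlainCompressionStream(baos, compressor);
    try {
      for (byte[] block : blocks) {
        baos.reset();      // discard the previous block's output
        cos.resetState();  // reset compressor state instead of re-allocating
        cos.write(block, 0, block.length);
        cos.flush();
        cos.finish();      // complete this block's compressed stream
        byte[] onDisk = baos.toByteArray();
        // ... hand onDisk to the block writer ...
      }
    } finally {
      algo.returnCompressor(compressor);
    }
  }
}
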
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1243667&r1=1243666&r2=1243667&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Mon Feb 13 19:41:58 2012
@@ -28,7 +28,6 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
@@ -547,6 +547,12 @@ public class HFileBlock extends SchemaCo
     /** Compressor, which is also reused between consecutive blocks. */
     private Compressor compressor;
 
+    /** Compression output stream */
+    private CompressionOutputStream compressionStream;
+    
+    /** Underlying stream to write compressed bytes to */
+    private ByteArrayOutputStream compressedByteStream;
+
     /**
      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@@ -602,9 +608,19 @@ public class HFileBlock extends SchemaCo
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
 
       baosInMemory = new ByteArrayOutputStream();
-      if (compressAlgo != NONE)
+      if (compressAlgo != NONE) {
         compressor = compressionAlgorithm.getCompressor();
-
+        compressedByteStream = new ByteArrayOutputStream();
+        try {
+          compressionStream =
+              compressionAlgorithm.createPlainCompressionStream(
+                  compressedByteStream, compressor);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not create compression stream " + 
+              "for algorithm " + compressionAlgorithm, e);
+        }
+      }
+      
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
@@ -697,19 +713,18 @@ public class HFileBlock extends SchemaCo
     private void doCompression() throws IOException {
       // do the compression
       if (compressAlgo != NONE) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        baos.write(DUMMY_HEADER);
+        compressedByteStream.reset();
+        compressedByteStream.write(DUMMY_HEADER);
+
+        compressionStream.resetState();
 
-        // compress the data
-        OutputStream compressingOutputStream =
-            compressAlgo.createCompressionStream(baos, compressor, 0);
-        compressingOutputStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
+        compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
             uncompressedBytesWithHeader.length - HEADER_SIZE);
 
-        // finish compression stream
-        compressingOutputStream.flush();
+        compressionStream.flush();
+        compressionStream.finish();
 
-        onDiskBytesWithHeader = baos.toByteArray();
+        onDiskBytesWithHeader = compressedByteStream.toByteArray();
         putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
             uncompressedBytesWithHeader.length);
       } else {

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java?rev=1243667&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java Mon Feb 13 19:41:58 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+
+/**
+ * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression
+ * streams.
+ */
+public class ReusableStreamGzipCodec extends GzipCodec {
+
+  private static final Log LOG = LogFactory.getLog(Compression.class);
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it a
+   * CompressionOutputStream.
+   */
+  protected static class ReusableGzipOutputStream extends CompressorStream {
+
+    private static final int GZIP_HEADER_LENGTH = 10;
+
+    /**
+     * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+     * details.
+     */
+    private static final byte[] GZIP_HEADER;
+
+    static {
+      // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      byte[] header = null;
+      GZIPOutputStream gzipStream = null;
+      try {
+        gzipStream  = new GZIPOutputStream(baos);
+        gzipStream.finish();
+        header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not create gzip stream", e);
+      } finally {
+        if (gzipStream != null) {
+          try {
+            gzipStream.close();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      }
+      GZIP_HEADER = header;
+    }
+
+    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+        super(out);
+      }
+
+      public void resetState() throws IOException {
+        def.reset();
+        crc.reset();
+        out.write(GZIP_HEADER);
+      }
+    }
+
+    public ReusableGzipOutputStream(OutputStream out) throws IOException {
+      super(new ResetableGZIPOutputStream(out));
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+      out.write(data, offset, length);
+    }
+
+    @Override
+    public void finish() throws IOException {
+      ((GZIPOutputStream) out).finish();
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      ((ResetableGZIPOutputStream) out).resetState();
+    }
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    if (ZlibFactory.isNativeZlibLoaded(getConf())) {
+      return super.createOutputStream(out);
+    }
+    return new ReusableGzipOutputStream(out);
+  }
+
+}
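
(A hypothetical round-trip check, not part of this commit, showing what the
codec provides on the pure-Java path it fixes: after each resetState() /
finish() cycle, the same stream object emits a standalone gzip stream that a
plain GZIPInputStream can decode.)

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.ReusableStreamGzipCodec;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class ReusableGzipRoundTrip {
  public static void main(String[] args) throws IOException {
    ReusableStreamGzipCodec codec = new ReusableStreamGzipCodec();
    codec.setConf(new Configuration());

    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);

    for (String payload : new String[] { "first block", "second block" }) {
      compressed.reset();
      out.resetState();              // start a new gzip stream, same object
      out.write(payload.getBytes());
      out.flush();
      out.finish();                  // write this stream's trailer

      GZIPInputStream in = new GZIPInputStream(
          new ByteArrayInputStream(compressed.toByteArray()));
      ByteArrayOutputStream plain = new ByteArrayOutputStream();
      IOUtils.copyBytes(in, plain, 4096, false);
      System.out.println(plain);     // prints the payload back
    }
  }
}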

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1243667&r1=1243666&r2=1243667&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Mon Feb 13 19:41:58 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 
 import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
@@ -75,9 +76,6 @@ public class TestHFileBlock {
   static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
       NONE, GZ };
 
-  // In case we need to temporarily switch some test cases to just test gzip.
-  static final Compression.Algorithm[] GZIP_ONLY  = { GZ };
-
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
 
@@ -206,14 +204,16 @@ public class TestHFileBlock {
     return headerAndData;
   }
 
-  public String createTestBlockStr(Compression.Algorithm algo)
-      throws IOException {
+  public String createTestBlockStr(Compression.Algorithm algo,
+      int correctLength) throws IOException {
     byte[] testV2Block = createTestV2Block(algo);
     int osOffset = HFileBlock.HEADER_SIZE + 9;
-    if (osOffset < testV2Block.length) {
+    if (testV2Block.length == correctLength) {
       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
       // variations across operating systems.
       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
+      // We only make this change when the compressed block length matches.
+      // Otherwise, there are obviously other inconsistencies.
       testV2Block[osOffset] = 3;
     }
     return Bytes.toStringBinary(testV2Block);
@@ -226,7 +226,7 @@ public class TestHFileBlock {
 
   @Test
   public void testGzipCompression() throws IOException {
-    assertEquals(
+    final String correctTestBlockStr =
         "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
             + "\\xFF\\xFF\\xFF\\xFF"
             // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
@@ -240,8 +240,10 @@ public class TestHFileBlock {
             + "\\x03"
             + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
-            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00",
-        createTestBlockStr(GZ));
+            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
+    final int correctGzipBlockLength = 82;
+    assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
+        correctGzipBlockLength));
   }
 
   @Test

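(Context for the osOffset arithmetic above: the compressed stream starts right
after the HFileBlock header, and the fixed ten-byte gzip header, per RFC 1952
and hard-coded in java.util.zip.GZIPOutputStream, puts the OS byte at offset
9; hence HFileBlock.HEADER_SIZE + 9. A sketch of that layout:)

class GzipHeaderLayout {
  // RFC 1952 fixed gzip header; offsets are relative to the start of the
  // compressed stream, i.e. to HFileBlock.HEADER_SIZE within the block.
  static final byte[] GZIP_HEADER = {
      (byte) 0x1f, (byte) 0x8b, // offsets 0-1: ID1, ID2 (gzip magic)
      8,                        // offset 2: CM, compression method = deflate
      0,                        // offset 3: FLG, no optional fields
      0, 0, 0, 0,               // offsets 4-7: MTIME (GZIPOutputStream writes zeros)
      0,                        // offset 8: XFL
      3                         // offset 9: OS; the test force-sets 3 ("Unix")
  };
}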

