hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1302602 [1/2] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache/hadoop/hbase/io/encoding/ test/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache...
Date Mon, 19 Mar 2012 19:12:20 GMT
Author: mbautin
Date: Mon Mar 19 19:12:19 2012
New Revision: 1302602

URL: http://svn.apache.org/viewvc?rev=1302602&view=rev
Log:
HBASE-5521 [jira] Move compression/decompression to an encoder specific encoding
context

Author: Yongqiang He

Summary:
https://issues.apache.org/jira/browse/HBASE-5521

As part of working on HBASE-5313, we want to add a new columnar encoder/decoder.
It makes sense to make compression part of the encoder/decoder:
1) a scanner for a columnar-encoded block can lazily decompress only the specific
part of a key-value object it needs;
2) it avoids an extra byte-array copy from the encoder to the HFile block writer.

If no encoder is specified for a writer, HFileBlock.Writer falls back to a default
compression context that does something very similar to today's code.
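
To make the new flow concrete, here is a minimal sketch of the call pattern the
interfaces below define (the PREFIX/GZ choices and the rawKeyValues and
includesMemstoreTS locals are illustrative, not part of this patch; the real
wiring lives in HFileBlock.Writer and BufferedDataBlockEncoder):

    // The encoder creates an encoding context that also carries the compression state.
    DataBlockEncoder encoder = DataBlockEncoding.PREFIX.getEncoder();
    HFileBlockEncodingContext ctx = encoder.newDataBlockEncodingContext(
        Compression.Algorithm.GZ, DataBlockEncoding.PREFIX, HFileBlock.DUMMY_HEADER);

    // One call encodes the KeyValues and, if an algorithm is configured, compresses them.
    encoder.compressKeyValues(rawKeyValues, includesMemstoreTS, ctx);

    byte[] onDisk = ctx.getOnDiskBytesWithHeader();        // bytes to write to the HFile
    byte[] cached = ctx.getUncompressedBytesWithHeader();  // bytes kept for cache-on-write

    ctx.close();  // returns the compressor to the pool when the writer is done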

Test Plan: existing unit tests, verified by mbautin and tedyu. No new tests are
added here, since this change only prepares for the columnar encoder; test cases
will be added in that later diff.

Reviewers: dhruba, tedyu, sc, mbautin

Reviewed By: mbautin

Differential Revision: https://reviews.facebook.net/D2097

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Mon Mar 19 19:12:19 2012
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.RawComparator;
@@ -301,4 +303,54 @@ abstract class BufferedDataBlockEncoder 
     }
   }
 
+  @Override
+  public HFileBlockEncodingContext newDataBlockEncodingContext(
+      Algorithm compressionAlgorithm,
+      DataBlockEncoding encoding, byte[] header) {
+    return new HFileBlockDefaultEncodingContext(
+        compressionAlgorithm, encoding, header);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newDataBlockDecodingContext(
+      Algorithm compressionAlgorithm) {
+    return new HFileBlockDefaultDecodingContext(compressionAlgorithm);
+  }
+
+  /**
+   * Compress KeyValues and write them to output buffer.
+   * @param out Where to write compressed data.
+   * @param in Source of KeyValue for compression.
+   * @param includesMemstoreTS true if including memstore timestamp after every
+   *          key-value pair
+   * @throws IOException If there is an error writing to output stream.
+   */
+  public abstract void internalEncodeKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException;
+
+  @Override
+  public void compressKeyValues(ByteBuffer in,
+      boolean includesMemstoreTS,
+      HFileBlockEncodingContext blkEncodingCtx) throws IOException {
+    if (!(blkEncodingCtx.getClass().getName().equals(
+        HFileBlockDefaultEncodingContext.class.getName()))) {
+      throw new IOException (this.getClass().getName() + " only accepts "
+          + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
+          "encoding context.");
+    }
+
+    HFileBlockDefaultEncodingContext encodingCtx =
+        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    encodingCtx.prepareEncoding();
+    DataOutputStream dataOut =
+        ((HFileBlockDefaultEncodingContext) encodingCtx)
+        .getOutputStreamForEncoder();
+    internalEncodeKeyValues(dataOut, in, includesMemstoreTS);
+    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
+      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
+    } else {
+      encodingCtx.postEncoding(BlockType.DATA);
+    }
+  }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java Mon Mar 19 19:12:19 2012
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.RawComparato
 @InterfaceAudience.Private
 public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
   @Override
-  public void compressKeyValues(DataOutputStream out,
+  public void internalEncodeKeyValues(DataOutputStream out,
       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
     in.rewind();
     ByteBufferUtils.putInt(out, in.limit());
@@ -94,4 +94,5 @@ public class CopyKeyDataBlockEncoder ext
       }
     };
   }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java Mon Mar 19 19:12:19 2012
@@ -17,12 +17,12 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.io.RawComparator;
 
 /**
@@ -34,19 +34,32 @@ import org.apache.hadoop.io.RawComparato
  * <li>knowledge of Key Value format</li>
  * </ul>
  * It is designed to work fast enough to be feasible as in memory compression.
+ *
+ * After encoding, it also optionally compresses the encoded data if a
+ * compression algorithm is specified in the HFileBlockEncodingContext argument of
+ * {@link #compressKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}.
  */
 @InterfaceAudience.Private
 public interface DataBlockEncoder {
+
   /**
-   * Compress KeyValues and write them to output buffer.
-   * @param out Where to write compressed data.
-   * @param in Source of KeyValue for compression.
-   * @param includesMemstoreTS true if including memstore timestamp after every
-   *          key-value pair
-   * @throws IOException If there is an error writing to output stream.
-   */
-  public void compressKeyValues(DataOutputStream out,
-      ByteBuffer in, boolean includesMemstoreTS) throws IOException;
+   * Compress KeyValues. It will first encode key value pairs, and then
+   * optionally do the compression for the encoded data.
+   *
+   * @param in
+   *          Source of KeyValue for compression.
+   * @param includesMemstoreTS
+   *          true if including memstore timestamp after every key-value pair
+   * @param encodingContext
+   *          the encoding context which will contain encoded uncompressed bytes
+   *          as well as compressed encoded bytes if compression is enabled, and
+   *          it also reuses resources across multiple calls.
+   * @throws IOException
+   *           If there is an error writing to output stream.
+   */
+  public void compressKeyValues(
+      ByteBuffer in, boolean includesMemstoreTS,
+      HFileBlockEncodingContext encodingContext) throws IOException;
 
   /**
    * Uncompress.
@@ -94,6 +107,34 @@ public interface DataBlockEncoder {
       boolean includesMemstoreTS);
 
   /**
+   * Creates an encoder-specific encoding context
+   *
+   * @param compressionAlgorithm
+   *          compression algorithm used if the final data needs to be
+   *          compressed
+   * @param encoding
+   *          encoding strategy used
+   * @param headerBytes
+   *          header bytes to be written, put a dummy header here if the header
+   *          is unknown
+   * @return a newly created encoding context
+   */
+  public HFileBlockEncodingContext newDataBlockEncodingContext(
+      Algorithm compressionAlgorithm, DataBlockEncoding encoding,
+      byte[] headerBytes);
+
+  /**
+   * Creates an encoder-specific decoding context, which will prepare the data
+   * before actual decoding
+   *
+   * @param compressionAlgorithm
+   *          compression algorithm used if the data needs to be decompressed
+   * @return a newly created decoding context
+   */
+  public HFileBlockDecodingContext newDataBlockDecodingContext(
+      Algorithm compressionAlgorithm);
+
+  /**
    * An interface which enable to seek while underlying data is encoded.
    *
    * It works on one HFileBlock, but it is reusable. See

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java Mon Mar 19 19:12:19 2012
@@ -103,6 +103,18 @@ public enum DataBlockEncoding {
     stream.write(idInBytes);
   }
 
+
+  /**
+   * Writes id bytes to the given array starting from offset.
+   *
+   * @param dest output array
+   * @param offset starting offset of the output array
+   * @throws IOException
+   */
+  public void writeIdInBytes(byte[] dest, int offset) throws IOException {
+    System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE);
+  }
+
   /**
    * Return new data block encoder for given algorithm type.
    * @return data block encoder if algorithm is specified, null if none is
@@ -113,25 +125,6 @@ public enum DataBlockEncoding {
   }
 
   /**
-   * Provide access to all data block encoders, even those which are not
-   * exposed in the enum. Useful for testing and benchmarking.
-   * @return list of all data block encoders.
-   */
-  public static List<DataBlockEncoder> getAllEncoders() {
-    ArrayList<DataBlockEncoder> encoders = new ArrayList<DataBlockEncoder>();
-    for (DataBlockEncoding algo : values()) {
-      DataBlockEncoder encoder = algo.getEncoder();
-      if (encoder != null) {
-        encoders.add(encoder);
-      }
-    }
-
-    // Add encoders that are only used in testing.
-    encoders.add(new CopyKeyDataBlockEncoder());
-    return encoders;
-  }
-
-  /**
    * Find and create data block encoder for given id;
    * @param encoderId id of data block encoder.
    * @return Newly created data block encoder.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java Mon Mar 19 19:12:19 2012
@@ -316,7 +316,7 @@ public class DiffKeyDeltaEncoder extends
   }
 
   @Override
-  public void compressKeyValues(DataOutputStream out,
+  public void internalEncodeKeyValues(DataOutputStream out,
       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
     in.rewind();
     ByteBufferUtils.putInt(out, in.limit());

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Mon Mar 19 19:12:19 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encod
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
@@ -27,6 +26,8 @@ import java.util.Iterator;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.io.compress.Compressor;
 
 /**
@@ -40,17 +41,22 @@ public class EncodedDataBlock {
   ByteArrayOutputStream uncompressedOutputStream;
   ByteBuffer uncompressedBuffer;
   private byte[] cacheCompressData;
-  private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
   private boolean includesMemstoreTS;
 
+  private final HFileBlockEncodingContext encodingCxt;
+
   /**
    * Create a buffer which will be encoded using dataBlockEncoder.
    * @param dataBlockEncoder Algorithm used for compression.
+   * @param encoding encoding type used
    */
   public EncodedDataBlock(DataBlockEncoder dataBlockEncoder,
-      boolean includesMemstoreTS) {
+      boolean includesMemstoreTS, DataBlockEncoding encoding) {
     this.dataBlockEncoder = dataBlockEncoder;
     uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE);
+    encodingCxt =
+        dataBlockEncoder.newDataBlockEncodingContext(Compression.Algorithm.NONE,
+            encoding, HFileBlock.DUMMY_HEADER);
   }
 
   /**
@@ -175,7 +181,7 @@ public class EncodedDataBlock {
     if (cacheCompressData != null) {
       return cacheCompressData;
     }
-    cacheCompressData = doCompressData();
+    cacheCompressData = encodeData();
 
     return cacheCompressData;
   }
@@ -190,22 +196,20 @@ public class EncodedDataBlock {
   }
 
   /**
-   * Do the compression.
-   * @return Compressed byte buffer.
+   * Do the encoding.
+   * @return encoded byte buffer.
    */
-  public byte[] doCompressData() {
-    compressedStream.reset();
-    DataOutputStream dataOut = new DataOutputStream(compressedStream);
+  public byte[] encodeData() {
     try {
       this.dataBlockEncoder.compressKeyValues(
-          dataOut, getUncompressedBuffer(), includesMemstoreTS);
+          getUncompressedBuffer(), includesMemstoreTS, encodingCxt);
     } catch (IOException e) {
       throw new RuntimeException(String.format(
-          "Bug in decoding part of algorithm %s. " +
+          "Bug in encoding part of algorithm %s. " +
           "Probably it requested more bytes than are available.",
           toString()), e);
     }
-    return compressedStream.toByteArray();
+    return encodingCxt.getUncompressedBytesWithHeader();
   }
 
   @Override

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Mon Mar 19 19:12:19 2012
@@ -343,7 +343,7 @@ public class FastDiffDeltaEncoder extend
   }
 
   @Override
-  public void compressKeyValues(DataOutputStream out,
+  public void internalEncodeKeyValues(DataOutputStream out,
       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
     in.rewind();
     ByteBufferUtils.putInt(out, in.limit());

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java?rev=1302602&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java Mon Mar 19 19:12:19 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+
+/**
+ * A decoding context that is created by a reader's encoder, and is shared
+ * across all of the reader's read operations.
+ *
+ * @see HFileBlockEncodingContext for encoding
+ */
+public interface HFileBlockDecodingContext {
+
+  /**
+   * @return the compression algorithm used by this decoding context
+   */
+  public Compression.Algorithm getCompression();
+
+  /**
+   * Perform all actions that need to be done before the encoder's real
+   * decoding process. Decompression needs to be done if
+   * {@link #getCompression()} returns a valid compression algorithm.
+   *
+   * @param block HFile block object
+   * @param onDiskBlock on disk bytes to be decoded
+   * @param offset data start offset in onDiskBlock
+   * @throws IOException
+   */
+  public void prepareDecoding(HFileBlock block, byte[] onDiskBlock,
+      int offset) throws IOException;
+
+}
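
A reader-side sketch of how this context is meant to be used (the encoder,
hfileBlock, onDiskBlock and offset locals are illustrative; the reader creates
one context per encoder and reuses it for every block it reads):

    HFileBlockDecodingContext decodingCtx =
        encoder.newDataBlockDecodingContext(Compression.Algorithm.GZ);
    // Decompresses onDiskBlock, starting at offset, into the block's own buffer
    // before the encoder's actual decoding runs.
    decodingCtx.prepareDecoding(hfileBlock, onDiskBlock, offset);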

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java?rev=1302602&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java Mon Mar 19 19:12:19 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+
+/**
+ * A default implementation of {@link HFileBlockDecodingContext}. It assumes the
+ * block data section is compressed as a whole.
+ *
+ * @see HFileBlockDefaultEncodingContext for the default compression context
+ *
+ */
+public class HFileBlockDefaultDecodingContext implements
+    HFileBlockDecodingContext {
+
+  private final Compression.Algorithm compressAlgo;
+
+  public HFileBlockDefaultDecodingContext(
+      Compression.Algorithm compressAlgo) {
+    this.compressAlgo = compressAlgo;
+  }
+
+  @Override
+  public void prepareDecoding(HFileBlock block,
+      byte[] onDiskBlock, int offset) throws IOException {
+    DataInputStream dis =
+        new DataInputStream(new ByteArrayInputStream(
+            onDiskBlock, offset,
+            block.getOnDiskSizeWithoutHeader()));
+
+    ByteBuffer buffer = block.getBufferWithoutHeader();
+    Compression.decompress(buffer.array(), buffer.arrayOffset(),
+        (InputStream) dis, block.getOnDiskSizeWithoutHeader(),
+        block.getUncompressedSizeWithoutHeader(), compressAlgo);
+  }
+
+  @Override
+  public Algorithm getCompression() {
+    return compressAlgo;
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java?rev=1302602&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java Mon Mar 19 19:12:19 2012
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * A default implementation of {@link HFileBlockEncodingContext}. It will
+ * compress the data section as one continuous buffer.
+ *
+ * @see HFileBlockDefaultDecodingContext for the decompression part
+ *
+ */
+public class HFileBlockDefaultEncodingContext implements
+    HFileBlockEncodingContext {
+
+  private byte[] onDiskBytesWithHeader;
+  private byte[] uncompressedBytesWithHeader;
+  private BlockType blockType;
+  private final DataBlockEncoding encodingAlgo;
+
+  /** Compressor, which is also reused between consecutive blocks. */
+  private Compressor compressor;
+
+  /** Compression output stream */
+  private CompressionOutputStream compressionStream;
+
+  /** Underlying stream to write compressed bytes to */
+  private ByteArrayOutputStream compressedByteStream;
+
+  /** Compression algorithm for all blocks this instance writes. */
+  private final Compression.Algorithm compressionAlgorithm;
+
+  private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
+  private DataOutputStream dataOut = new DataOutputStream(encodedStream);
+
+  private final byte[] dummyHeader;
+
+  /**
+   * @param compressionAlgorithm compression algorithm used
+   * @param encoding encoding used
+   * @param headerBytes dummy header bytes
+   */
+  public HFileBlockDefaultEncodingContext(
+      Compression.Algorithm compressionAlgorithm,
+      DataBlockEncoding encoding, byte[] headerBytes) {
+    this.encodingAlgo = encoding;
+    this.compressionAlgorithm =
+        compressionAlgorithm == null ? NONE : compressionAlgorithm;
+    if (this.compressionAlgorithm != NONE) {
+      compressor = compressionAlgorithm.getCompressor();
+      compressedByteStream = new ByteArrayOutputStream();
+      try {
+        compressionStream =
+            compressionAlgorithm.createPlainCompressionStream(
+                compressedByteStream, compressor);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Could not create compression stream for algorithm "
+                + compressionAlgorithm, e);
+      }
+    }
+    if (headerBytes == null) {
+      dummyHeader = HFileBlock.DUMMY_HEADER;
+    } else {
+      dummyHeader = headerBytes;
+    }
+  }
+
+  /**
+   * @param compressionAlgorithm compression algorithm
+   * @param encoding encoding
+   */
+  public HFileBlockDefaultEncodingContext(
+      Compression.Algorithm compressionAlgorithm,
+      DataBlockEncoding encoding) {
+    this(compressionAlgorithm, encoding, null);
+  }
+
+  /**
+   * prepare to start a new encoding.
+   * @throws IOException
+   */
+  void prepareEncoding() throws IOException {
+    encodedStream.reset();
+    dataOut.write(dummyHeader);
+    if (encodingAlgo != null
+        && encodingAlgo != DataBlockEncoding.NONE) {
+      encodingAlgo.writeIdInBytes(dataOut);
+    }
+  }
+
+  @Override
+  public void postEncoding(BlockType blockType)
+      throws IOException {
+    dataOut.flush();
+    compressAfterEncoding(encodedStream.toByteArray(), blockType);
+    this.blockType = blockType;
+  }
+
+  /**
+   * @param uncompressedBytesWithHeader
+   * @param blockType
+   * @throws IOException
+   */
+  public void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
+      BlockType blockType) throws IOException {
+    compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
+  }
+
+  /**
+   * @param uncompressedBytesWithHeader
+   * @param blockType
+   * @param headerBytes
+   * @throws IOException
+   */
+  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
+      BlockType blockType, byte[] headerBytes) throws IOException {
+    this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
+    if (compressionAlgorithm != NONE) {
+      compressedByteStream.reset();
+      compressedByteStream.write(headerBytes);
+      compressionStream.resetState();
+      compressionStream.write(uncompressedBytesWithHeader,
+          headerBytes.length, uncompressedBytesWithHeader.length
+              - headerBytes.length);
+
+      compressionStream.flush();
+      compressionStream.finish();
+      onDiskBytesWithHeader = compressedByteStream.toByteArray();
+    } else {
+      onDiskBytesWithHeader = uncompressedBytesWithHeader;
+    }
+    this.blockType = blockType;
+  }
+
+  @Override
+  public byte[] getOnDiskBytesWithHeader() {
+    return onDiskBytesWithHeader;
+  }
+
+  @Override
+  public byte[] getUncompressedBytesWithHeader() {
+    return uncompressedBytesWithHeader;
+  }
+
+  @Override
+  public BlockType getBlockType() {
+    return blockType;
+  }
+
+  /**
+   * Releases the compressor this writer uses to compress blocks into the
+   * compressor pool.
+   */
+  @Override
+  public void close() {
+    if (compressor != null) {
+      compressionAlgorithm.returnCompressor(compressor);
+      compressor = null;
+    }
+  }
+
+  @Override
+  public Algorithm getCompression() {
+    return this.compressionAlgorithm;
+  }
+
+  public DataOutputStream getOutputStreamForEncoder() {
+    return this.dataOut;
+  }
+
+  @Override
+  public DataBlockEncoding getDataBlockEncoding() {
+    return this.encodingAlgo;
+  }
+
+  @Override
+  public int getHeaderSize() {
+    return this.dummyHeader.length;
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java?rev=1302602&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java Mon Mar 19 19:12:19 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+
+/**
+ * An encoding context that is created by a writer's encoder, and is shared
+ * across the writer's whole lifetime.
+ *
+ * @see HFileBlockDecodingContext for decoding
+ *
+ */
+public interface HFileBlockEncodingContext {
+
+  /**
+   * @return encoded and compressed bytes with header which are ready to write
+   *         out to disk
+   */
+  public byte[] getOnDiskBytesWithHeader();
+
+  /**
+   * @return encoded but not heavily compressed bytes with header which can be
+   *         cached in block cache
+   */
+  public byte[] getUncompressedBytesWithHeader();
+
+  /**
+   * @return the block type after encoding
+   */
+  public BlockType getBlockType();
+
+  /**
+   * @return the compression algorithm used by this encoding context
+   */
+  public Compression.Algorithm getCompression();
+
+  /**
+   * @return the header size used
+   */
+  public int getHeaderSize();
+
+  /**
+   * @return the {@link DataBlockEncoding} encoding used
+   */
+  public DataBlockEncoding getDataBlockEncoding();
+
+  /**
+   * Do any action that needs to be performed after the encoding.
+   * Compression is also included if {@link #getCompression()} returns a
+   * non-null compression algorithm.
+   *
+   * @param blockType
+   * @throws IOException
+   */
+  public void postEncoding(BlockType blockType) throws IOException;
+
+  /**
+   * Releases the resources used.
+   */
+  public void close();
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Mon Mar 19 19:12:19 2012
@@ -75,7 +75,7 @@ public class PrefixKeyDeltaEncoder exten
   }
 
   @Override
-  public void compressKeyValues(DataOutputStream writeHere,
+  public void internalEncodeKeyValues(DataOutputStream writeHere,
       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
     in.rewind();
     ByteBufferUtils.putInt(writeHere, in.limit());

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Mon Mar 19 19:12:19 2012
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -234,7 +235,7 @@ public final class Compression {
      * Creates a compression stream without any additional wrapping into
      * buffering streams.
      */
-    CompressionOutputStream createPlainCompressionStream(
+    public CompressionOutputStream createPlainCompressionStream(
         OutputStream downStream, Compressor compressor) throws IOException {
       CompressionCodec codec = getCodec(conf);
       ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
@@ -323,4 +324,52 @@ public final class Compression {
 
     return ret;
   }
+
+  /**
+   * Decompresses data from the given stream using the configured compression
+   * algorithm. It will throw an exception if the dest buffer does not have
+   * enough space to hold the decompressed data.
+   *
+   * @param dest
+   *          the output bytes buffer
+   * @param destOffset
+   *          start writing position of the output buffer
+   * @param bufferedBoundedStream
+   *          a stream to read compressed data from, bounded to the exact amount
+   *          of compressed data
+   * @param compressedSize
+   *          compressed data size, header not included
+   * @param uncompressedSize
+   *          uncompressed data size, header not included
+   * @param compressAlgo
+   *          compression algorithm used
+   * @throws IOException
+   */
+  public static void decompress(byte[] dest, int destOffset,
+      InputStream bufferedBoundedStream, int compressedSize,
+      int uncompressedSize, Compression.Algorithm compressAlgo)
+      throws IOException {
+
+    if (dest.length - destOffset < uncompressedSize) {
+      throw new IllegalArgumentException(
+          "Output buffer does not have enough space to hold "
+              + uncompressedSize + " decompressed bytes, available: "
+              + (dest.length - destOffset));
+    }
+
+    Decompressor decompressor = null;
+    try {
+      decompressor = compressAlgo.getDecompressor();
+      InputStream is = compressAlgo.createDecompressionStream(
+          bufferedBoundedStream, decompressor, 0);
+
+      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
+      is.close();
+    } finally {
+      if (decompressor != null) {
+        compressAlgo.returnDecompressor(decompressor);
+      }
+    }
+  }
+
 }
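
For reference, a hypothetical caller of the now-public static helper (the
onDiskBytes, bodyOffset and size values are assumed to come from the block
header, as HFileBlockDefaultDecodingContext does above):

    byte[] dest = new byte[uncompressedSize];
    InputStream in = new ByteArrayInputStream(onDiskBytes, bodyOffset, compressedSize);
    Compression.decompress(dest, 0, in, compressedSize, uncompressedSize,
        Compression.Algorithm.GZ);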

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Mon Mar 19 19:12:19 2012
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
-import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
 
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
@@ -29,7 +28,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,6 +36,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
@@ -45,14 +47,9 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompoundBloomFilter;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
 
 import com.google.common.base.Preconditions;
 
@@ -114,8 +111,8 @@ public class HFileBlock extends SchemaCo
    * There is a 1 byte checksum type, followed by a 4 byte bytesPerChecksum
    * followed by another 4 byte value to store sizeofDataOnDisk.
    */
-  static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE +
-                                 2 * Bytes.SIZEOF_INT;
+  public static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM +
+    Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT;
 
   /**
    * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
@@ -125,7 +122,7 @@ public class HFileBlock extends SchemaCo
       + DataBlockEncoding.ID_SIZE;
 
   /** Just an array of bytes of the right size. */
-  static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
+  public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
   static final byte[] DUMMY_HEADER_NO_CHECKSUM = 
      new byte[HEADER_SIZE_NO_CHECKSUM];
 
@@ -303,7 +300,7 @@ public class HFileBlock extends SchemaCo
    * @return the on-disk size of the data part of the block, header and
    *         checksum not included. 
    */
-  int getOnDiskSizeWithoutHeader() {
+  public int getOnDiskSizeWithoutHeader() {
     return onDiskSizeWithoutHeader;
   }
 
@@ -342,7 +339,7 @@ public class HFileBlock extends SchemaCo
    *
    * @return the buffer with header skipped
    */
-  ByteBuffer getBufferWithoutHeader() {
+  public ByteBuffer getBufferWithoutHeader() {
     return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
         buf.limit() - headerSize() - totalChecksumBytes()).slice();
   }
@@ -644,6 +641,11 @@ public class HFileBlock extends SchemaCo
     /** Data block encoder used for data blocks */
     private final HFileDataBlockEncoder dataBlockEncoder;
 
+    private HFileBlockEncodingContext dataBlockEncodingCtx;
+
+    /** block encoding context for non-data blocks */
+    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
+
     /**
      * The stream we use to accumulate data in uncompressed format for each
      * block. We reset this stream at the end of each block and reuse it. The
@@ -652,15 +654,6 @@ public class HFileBlock extends SchemaCo
      */
     private ByteArrayOutputStream baosInMemory;
 
-    /** Compressor, which is also reused between consecutive blocks. */
-    private Compressor compressor;
-
-    /** Compression output stream */
-    private CompressionOutputStream compressionStream;
-    
-    /** Underlying stream to write compressed bytes to */
-    private ByteArrayOutputStream compressedByteStream;
-
     /**
      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@@ -682,12 +675,6 @@ public class HFileBlock extends SchemaCo
     private byte[] onDiskBytesWithHeader;
 
     /**
-     * The size of the data on disk that does not include the checksums.
-     * (header + data)
-     */
-    private int onDiskDataSizeWithHeader;
-
-    /**
      * The size of the checksum data on disk. It is used only if data is
      * not compressed. If data is compressed, then the checksums are already
      * part of onDiskBytesWithHeader. If data is uncompressed, then this
@@ -734,28 +721,23 @@ public class HFileBlock extends SchemaCo
     public Writer(Compression.Algorithm compressionAlgorithm,
           HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS,
           ChecksumType checksumType, int bytesPerChecksum) {
-      compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
+      compressAlgo = compressionAlgorithm == null ? Compression.Algorithm.NONE :
+        compressionAlgorithm;
       this.dataBlockEncoder = dataBlockEncoder != null
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
+      defaultBlockEncodingCtx =
+        new HFileBlockDefaultEncodingContext(compressionAlgorithm, null);
+      dataBlockEncodingCtx =
+        this.dataBlockEncoder.newOnDiskDataBlockEncodingContext(
+            compressionAlgorithm, DUMMY_HEADER);
 
-      baosInMemory = new ByteArrayOutputStream();
-      if (compressAlgo != NONE) {
-        compressor = compressionAlgorithm.getCompressor();
-        compressedByteStream = new ByteArrayOutputStream();
-        try {
-          compressionStream =
-              compressionAlgorithm.createPlainCompressionStream(
-                  compressedByteStream, compressor);
-        } catch (IOException e) {
-          throw new RuntimeException("Could not create compression stream " + 
-              "for algorithm " + compressionAlgorithm, e);
-        }
-      }
       if (bytesPerChecksum < HEADER_SIZE) {
         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
             " Minimum is " + HEADER_SIZE + " but the configured value is " +
             bytesPerChecksum);
       }
+
+      baosInMemory = new ByteArrayOutputStream();
       
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
@@ -828,7 +810,6 @@ public class HFileBlock extends SchemaCo
      */
     private void finishBlock() throws IOException {
       userDataStream.flush();
-
       // This does an array copy, so it is safe to cache this byte array.
       uncompressedBytesWithHeader = baosInMemory.toByteArray();
       prevOffset = prevOffsetByType[blockType.getId()];
@@ -837,81 +818,32 @@ public class HFileBlock extends SchemaCo
       // cache-on-write. In a way, the block is ready, but not yet encoded or
       // compressed.
       state = State.BLOCK_READY;
-      encodeDataBlockForDisk();
-
-      doCompressionAndChecksumming();
-    }
-
-    /**
-     * Do compression if it is enabled, or re-use the uncompressed buffer if
-     * it is not. Fills in the compressed block's header if doing compression.
-     * Also, compute the checksums. In the case of no-compression, write the
-     * checksums to its own seperate data structure called onDiskChecksum. In
-     * the case when compression is enabled, the checksums are written to the
-     * outputbyte stream 'baos'.
-     */
-    private void doCompressionAndChecksumming() throws IOException {
-      // do the compression
-      if (compressAlgo != NONE) {
-        compressedByteStream.reset();
-        compressedByteStream.write(DUMMY_HEADER);
-
-        compressionStream.resetState();
-
-        compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
-            uncompressedBytesWithHeader.length - HEADER_SIZE);
-
-        compressionStream.flush();
-        compressionStream.finish();
-
-        // generate checksums
-        onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
-
-        // reserve space for checksums in the output byte stream
-        ChecksumUtil.reserveSpaceForChecksums(compressedByteStream, 
-          onDiskDataSizeWithHeader, bytesPerChecksum);
-
-
-        onDiskBytesWithHeader = compressedByteStream.toByteArray();
-        putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
-            uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
-
-       // generate checksums for header and data. The checksums are
-       // part of onDiskBytesWithHeader itself.
-       ChecksumUtil.generateChecksums(
-         onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
-         onDiskBytesWithHeader, onDiskDataSizeWithHeader,
-         checksumType, bytesPerChecksum);
-
-        // Checksums are already part of onDiskBytesWithHeader
-        onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
-
-        //set the header for the uncompressed bytes (for cache-on-write)
-        putHeader(uncompressedBytesWithHeader, 0,
-          onDiskBytesWithHeader.length + onDiskChecksum.length,
-          uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
-
+      if (blockType == BlockType.DATA) {
+        encodeDataBlockForDisk();
       } else {
-        // If we are not using any compression, then the
-        // checksums are written to its own array onDiskChecksum.
-        onDiskBytesWithHeader = uncompressedBytesWithHeader;
-
-        onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
-        int numBytes = (int)ChecksumUtil.numBytes(
-                          uncompressedBytesWithHeader.length,
-                          bytesPerChecksum);
-        onDiskChecksum = new byte[numBytes];
-
-        //set the header for the uncompressed bytes
-        putHeader(uncompressedBytesWithHeader, 0,
-          onDiskBytesWithHeader.length + onDiskChecksum.length,
-          uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
-
-        ChecksumUtil.generateChecksums(
-          uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
-          onDiskChecksum, 0,
-          checksumType, bytesPerChecksum);
-      }
+        defaultBlockEncodingCtx.compressAfterEncoding(
+            uncompressedBytesWithHeader, blockType);
+        onDiskBytesWithHeader =
+          defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
+      }
+
+      int numBytes = (int) ChecksumUtil.numBytes(
+          onDiskBytesWithHeader.length,
+          bytesPerChecksum);
+
+      // put the header for on disk bytes
+      putHeader(onDiskBytesWithHeader, 0,
+          onDiskBytesWithHeader.length + numBytes,
+          uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
+      //set the header for the uncompressed bytes (for cache-on-write)
+      putHeader(uncompressedBytesWithHeader, 0,
+          onDiskBytesWithHeader.length + numBytes,
+        uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
+
+      onDiskChecksum = new byte[numBytes];
+      ChecksumUtil.generateChecksums(
+          onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
+          onDiskChecksum, 0, checksumType, bytesPerChecksum);
     }
 
     /**
@@ -919,35 +851,20 @@ public class HFileBlock extends SchemaCo
      * {@link #dataBlockEncoder}.
      */
     private void encodeDataBlockForDisk() throws IOException {
-      if (blockType != BlockType.DATA) {
-        return; // skip any non-data block
-      }
-
       // do data block encoding, if data block encoder is set
-      ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
-          HEADER_SIZE, uncompressedBytesWithHeader.length -
-          HEADER_SIZE).slice();
-      Pair<ByteBuffer, BlockType> encodingResult =
-          dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
-              includesMemstoreTS, DUMMY_HEADER);
-
-      BlockType encodedBlockType = encodingResult.getSecond();
-      if (encodedBlockType == BlockType.ENCODED_DATA) {
-        uncompressedBytesWithHeader = encodingResult.getFirst().array();
-        blockType = BlockType.ENCODED_DATA;
-      } else {
-        // There is no encoding configured. Do some extra sanity-checking.
-        if (encodedBlockType != BlockType.DATA) {
-          throw new IOException("Unexpected block type coming out of data " +
-              "block encoder: " + encodedBlockType);
-        }
-        if (userDataStream.size() !=
-            uncompressedBytesWithHeader.length - HEADER_SIZE) {
-          throw new IOException("Uncompressed size mismatch: "
-              + userDataStream.size() + " vs. "
-              + (uncompressedBytesWithHeader.length - HEADER_SIZE));
-        }
-      }
+      ByteBuffer rawKeyValues =
+          ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
+              uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
+
+      //do the encoding
+      dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
+              includesMemstoreTS, dataBlockEncodingCtx, blockType);
+
+      uncompressedBytesWithHeader =
+          dataBlockEncodingCtx.getUncompressedBytesWithHeader();
+      onDiskBytesWithHeader =
+          dataBlockEncodingCtx.getOnDiskBytesWithHeader();
+      blockType = dataBlockEncodingCtx.getBlockType();
     }
 
     /**
@@ -966,7 +883,7 @@ public class HFileBlock extends SchemaCo
       offset = Bytes.putLong(dest, offset, prevOffset);
       offset = Bytes.putByte(dest, offset, checksumType.getCode());
       offset = Bytes.putInt(dest, offset, bytesPerChecksum);
-      offset = Bytes.putInt(dest, offset, onDiskDataSizeWithHeader);
+      offset = Bytes.putInt(dest, offset, onDiskDataSize);
     }
 
     /**
@@ -986,7 +903,7 @@ public class HFileBlock extends SchemaCo
       }
       startOffset = offset;
 
-      writeHeaderAndData((DataOutputStream) out);
+      finishBlockAndWriteHeaderAndData((DataOutputStream) out);
     }
 
     /**
@@ -998,17 +915,11 @@ public class HFileBlock extends SchemaCo
      * @param out the output stream to write the
      * @throws IOException
      */
-    private void writeHeaderAndData(DataOutputStream out) throws IOException {
+    private void finishBlockAndWriteHeaderAndData(DataOutputStream out)
+      throws IOException {
       ensureBlockReady();
       out.write(onDiskBytesWithHeader);
-      if (compressAlgo == NONE) {
-        if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
-          throw new IOException("A " + blockType 
-              + " without compression should have checksums " 
-              + " stored separately.");
-        }
-        out.write(onDiskChecksum);
-      }
+      out.write(onDiskChecksum);
     }
 
     /**
@@ -1023,34 +934,29 @@ public class HFileBlock extends SchemaCo
      */
     byte[] getHeaderAndDataForTest() throws IOException {
       ensureBlockReady();
-      if (compressAlgo == NONE) {
-        if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
-          throw new IOException("A " + blockType 
-              + " without compression should have checksums " 
-              + " stored separately.");
-        }
-        // This is not very optimal, because we are doing an extra copy.
-        // But this method is used only by unit tests.
-        byte[] output = new byte[onDiskBytesWithHeader.length +
-                                 onDiskChecksum.length];
-        System.arraycopy(onDiskBytesWithHeader, 0,
-                         output, 0, onDiskBytesWithHeader.length);
-        System.arraycopy(onDiskChecksum, 0,
-                         output, onDiskBytesWithHeader.length,
-                         onDiskChecksum.length);
-        return output;
-      }
-      return onDiskBytesWithHeader;
+      // This is not very optimal, because we are doing an extra copy.
+      // But this method is used only by unit tests.
+      byte[] output =
+          new byte[onDiskBytesWithHeader.length
+              + onDiskChecksum.length];
+      System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
+          onDiskBytesWithHeader.length);
+      System.arraycopy(onDiskChecksum, 0, output,
+          onDiskBytesWithHeader.length, onDiskChecksum.length);
+      return output;
     }
 
     /**
-     * Releases the compressor this writer uses to compress blocks into the
-     * compressor pool. Needs to be called before the writer is discarded.
-     */
-    public void releaseCompressor() {
-      if (compressor != null) {
-        compressAlgo.returnCompressor(compressor);
-        compressor = null;
+     * Releases resources used by this writer.
+     */
+    public void release() {
+      if (dataBlockEncodingCtx != null) {
+        dataBlockEncodingCtx.close();
+        dataBlockEncodingCtx = null;
+      }
+      if (defaultBlockEncodingCtx != null) {
+        defaultBlockEncodingCtx.close();
+        defaultBlockEncodingCtx = null;
       }
     }
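Note: release() supersedes the old releaseCompressor(). The writer no longer holds a compressor of its own; compression resources now live in the two encoding contexts, so callers make the single call sketched below. A minimal, hedged sketch; here 'fsBlockWriter' stands for an HFileBlock.Writer obtained the same way as before this change:

    // Sketch only: release the writer's encoding contexts once the writer is
    // discarded, exactly where releaseCompressor() used to be called.
    void closeBlockWriter(HFileBlock.Writer fsBlockWriter) {
      // Closes dataBlockEncodingCtx and defaultBlockEncodingCtx, which
      // presumably return any borrowed compressors to the pool.
      fsBlockWriter.release();
    }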
 
@@ -1252,7 +1158,7 @@ public class HFileBlock extends SchemaCo
     private int minorVersion;
 
     /** The size of the header */
-    protected int hdrSize;
+    protected final int hdrSize;
 
     /** The filesystem used to access data */
     protected HFileSystem hfs;
@@ -1377,36 +1283,6 @@ public class HFileBlock extends SchemaCo
     }
 
     /**
-     * Decompresses data from the given stream using the configured compression
-     * algorithm.
-     * @param dest
-     * @param destOffset
-     * @param bufferedBoundedStream
-     *          a stream to read compressed data from, bounded to the exact
-     *          amount of compressed data
-     * @param uncompressedSize
-     *          uncompressed data size, header not included
-     * @throws IOException
-     */
-    protected void decompress(byte[] dest, int destOffset,
-        InputStream bufferedBoundedStream,
-        int uncompressedSize) throws IOException {
-      Decompressor decompressor = null;
-      try {
-        decompressor = compressAlgo.getDecompressor();
-        InputStream is = compressAlgo.createDecompressionStream(
-            bufferedBoundedStream, decompressor, 0);
-
-        IOUtils.readFully(is, dest, destOffset, uncompressedSize);
-        is.close();
-      } finally {
-        if (decompressor != null) {
-          compressAlgo.returnDecompressor(decompressor);
-        }
-      }
-    }
-
-    /**
      * Creates a buffered stream reading a certain slice of the file system
      * input stream. We need this because the decompression we use seems to
      * expect the input stream to be bounded.
@@ -1511,8 +1387,9 @@ public class HFileBlock extends SchemaCo
       } else {
         InputStream bufferedBoundedStream = createBufferedBoundedStream(
             offset, onDiskSize, pread);
-        decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
-            bufferedBoundedStream, uncompressedSizeWithMagic);
+        Compression.decompress(buf.array(), buf.arrayOffset()
+            + HEADER_DELTA, bufferedBoundedStream, onDiskSize,
+            uncompressedSizeWithMagic, this.compressAlgo);
 
         // We don't really have a good way to exclude the "magic record" size
         // from the compressed block's size, since it is compressed as well.
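For reference, decompression is now a static helper on Compression rather than the instance method removed above. A hedged sketch of a standalone call; the parameter roles (destination array, destination offset, bounded input stream, on-disk size, uncompressed size, algorithm) are inferred from the call site above, and the wrapper method name here is made up for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hbase.io.hfile.Compression;

    public class DecompressSketch {
      // Inflate a GZ-compressed block body into a freshly allocated array.
      static byte[] inflateGz(byte[] compressed, int uncompressedSize)
          throws IOException {
        byte[] dest = new byte[uncompressedSize];
        // Bound the stream to exactly the compressed bytes, as the reader does.
        InputStream in = new ByteArrayInputStream(compressed);
        Compression.decompress(dest, 0, in, compressed.length,
            uncompressedSize, Compression.Algorithm.GZ);
        return dest;
      }
    }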
@@ -1566,6 +1443,10 @@ public class HFileBlock extends SchemaCo
     protected HFileDataBlockEncoder dataBlockEncoder =
         NoOpDataBlockEncoder.INSTANCE;
 
+    private HFileBlockDecodingContext encodedBlockDecodingCtx;
+
+    private HFileBlockDefaultDecodingContext defaultDecodingCtx;
+
     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
         new ThreadLocal<PrefetchedHeader>() {
           @Override
@@ -1598,6 +1479,10 @@ public class HFileBlock extends SchemaCo
         useHBaseChecksum = false;
       }
       this.useHBaseChecksumConfigured = useHBaseChecksum;
+      defaultDecodingCtx =
+        new HFileBlockDefaultDecodingContext(compressAlgo);
+      encodedBlockDecodingCtx =
+          new HFileBlockDefaultDecodingContext(compressAlgo);
     }
 
     /**
@@ -1716,9 +1601,8 @@ public class HFileBlock extends SchemaCo
      * @return the HFileBlock or null if there is a HBase checksum mismatch
      */
     private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, 
-        long onDiskSizeWithHeaderL,
-        int uncompressedSize, boolean pread, boolean verifyChecksum) 
-        throws IOException {
+        long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
+        boolean verifyChecksum) throws IOException {
       if (offset < 0) {
         throw new IOException("Invalid offset=" + offset + " trying to read "
             + "block (onDiskSize=" + onDiskSizeWithHeaderL
@@ -1738,8 +1622,20 @@ public class HFileBlock extends SchemaCo
       }
 
       int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
+      // See if we can avoid reading the header. This is desirable, because
+      // we will not incur a backward seek operation if we have already
+      // read this block's header as part of the previous read's look-ahead.
+      // And we also want to skip reading the header again if it has already
+      // been read.
+      PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
+      ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
+          prefetchedHeader.buf : null;
 
-      HFileBlock b;
+      int nextBlockOnDiskSize = 0;
+      // Allocate enough space to fit the next block's header too.
+      byte[] onDiskBlock = null;
+
+      HFileBlock b = null;
       if (onDiskSizeWithHeader > 0) {
         // We know the total on-disk size but not the uncompressed size. Read
         // the entire block into memory, then parse the header and decompress
@@ -1749,172 +1645,117 @@ public class HFileBlock extends SchemaCo
         // block's header (e.g. this block's header) when reading the previous
         // block. This is the faster and more preferable case.
 
-        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
-        assert onDiskSizeWithoutHeader >= 0;
-
-        // See if we can avoid reading the header. This is desirable, because
-        // we will not incur a seek operation to seek back if we have already
-        // read this block's header as part of the previous read's look-ahead.
-        PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
-        byte[] header = prefetchedHeader.offset == offset
-            ? prefetchedHeader.header : null;
-
         // Size that we have to skip in case we have already read the header.
-        int preReadHeaderSize = header == null ? 0 : hdrSize;
-
-        if (compressAlgo == Compression.Algorithm.NONE) {
-          // Just read the whole thing. Allocate enough space to read the
-          // next block's header too.
-
-          ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader
-              + hdrSize);
-          headerAndData.limit(onDiskSizeWithHeader);
-
-          if (header != null) {
-            System.arraycopy(header, 0, headerAndData.array(), 0,
-                hdrSize);
-          }
-
-          int nextBlockOnDiskSizeWithHeader = readAtOffset(is,
-              headerAndData.array(), headerAndData.arrayOffset()
-                  + preReadHeaderSize, onDiskSizeWithHeader
-                  - preReadHeaderSize, true, offset + preReadHeaderSize,
-                  pread);
-
-          b = new HFileBlock(headerAndData, getMinorVersion());
-          b.assumeUncompressed();
-          b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
-          b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader;
-          if (verifyChecksum &&
-              !validateBlockChecksum(b, headerAndData.array(), hdrSize)) {
-            return null;             // checksum mismatch
-          }
-          if (b.nextBlockOnDiskSizeWithHeader > 0)
-            setNextBlockHeader(offset, b);
+        int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
+        onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
+        nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
+            preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
+            true, offset + preReadHeaderSize, pread);
+        if (headerBuf != null) {
+          // the header has been read when reading the previous block, copy
+          // to this block's header
+          System.arraycopy(headerBuf.array(),
+              headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
         } else {
-          // Allocate enough space to fit the next block's header too.
-          byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
-
-          int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
-              preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
-              true, offset + preReadHeaderSize, pread);
-
-          if (header == null)
-            header = onDiskBlock;
-
-          try {
-            b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize), 
-                               getMinorVersion());
-          } catch (IOException ex) {
-            // Seen in load testing. Provide comprehensive debug info.
-            throw new IOException("Failed to read compressed block at "
-                + offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader
-                + ", preReadHeaderSize=" + preReadHeaderSize
-                + ", header.length=" + header.length + ", header bytes: "
-                + Bytes.toStringBinary(header, 0, hdrSize), ex);
-          }
-          b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
-          b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
-          if (verifyChecksum && 
-              !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
-            return null;             // checksum mismatch
-          }
-
-          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
-              onDiskBlock, hdrSize, onDiskSizeWithoutHeader));
-
-          // This will allocate a new buffer but keep header bytes.
-          b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
-
-          decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
-              b.uncompressedSizeWithoutHeader);
-
-          // Copy next block's header bytes into the new block if we have them.
-          if (nextBlockOnDiskSize > 0) {
-            System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
-                b.buf.arrayOffset() + hdrSize
-                    + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), 
-                hdrSize);
-
-            setNextBlockHeader(offset, b);
-          }
+          headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
         }
-
+        // We know the total on-disk size but not the uncompressed size. Read
+        // the entire block into memory, then parse the header and decompress
+        // from memory if compression is in use. At this point the block's
+        // header has already been read.
+        try {
+          b = new HFileBlock(headerBuf, getMinorVersion());
+        } catch (IOException ex) {
+          // Seen in load testing. Provide comprehensive debug info.
+          throw new IOException("Failed to read compressed block at "
+              + offset
+              + ", onDiskSizeWithoutHeader="
+              + onDiskSizeWithHeader
+              + ", preReadHeaderSize="
+              + hdrSize
+              + ", header.length="
+              + prefetchedHeader.header.length
+              + ", header bytes: "
+              + Bytes.toStringBinary(prefetchedHeader.header, 0,
+                  hdrSize), ex);
+        }
+        // if the caller specifies a onDiskSizeWithHeader, validate it.
+        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
+        assert onDiskSizeWithoutHeader >= 0;
+        b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
       } else {
-        // We don't know the on-disk size. Read the header first, determine the
-        // on-disk size from it, and read the remaining data, thereby incurring
-        // two read operations. This might happen when we are doing the first
-        // read in a series of reads or a random read, and we don't have access
-        // to the block index. This is costly and should happen very rarely.
-
-        // Check if we have read this block's header as part of reading the
-        // previous block. If so, don't read the header again.
-        PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
-        ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
-            prefetchedHeader.buf : null;
+        // Check headerBuf to see if we have already read this block's header
+        // as part of reading the previous block. That read peeks ahead at the
+        // next block's header (i.e. this block's header), which is the faster
+        // and preferable case. If the header is already there, don't read it
+        // again.
 
+        // Unfortunately, we still have to do a separate read operation to
+        // read the header.
         if (headerBuf == null) {
-          // Unfortunately, we still have to do a separate read operation to
-          // read the header.
+          // From the header, determine the on-disk size of the given hfile
+          // block, and read the remaining data, thereby incurring two read
+          // operations. This might happen when we are doing the first read
+          // in a series of reads or a random read, and we don't have access
+          // to the block index. This is costly and should happen very rarely.
           headerBuf = ByteBuffer.allocate(hdrSize);
-          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize,
-              false, offset, pread);
+          readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
+              hdrSize, false, offset, pread);
         }
 
         b = new HFileBlock(headerBuf, getMinorVersion());
-
-        // This will also allocate enough room for the next block's header.
-        b.allocateBuffer(true);
-
-        if (compressAlgo == Compression.Algorithm.NONE) {
-
-          // Avoid creating bounded streams and using a "codec" that does
-          // nothing.
-          b.assumeUncompressed();
-          b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, b.buf.array(),
-              b.buf.arrayOffset() + hdrSize,
-              b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), 
-              true, offset + hdrSize,
-              pread);
-          if (verifyChecksum && 
-              !validateBlockChecksum(b, b.buf.array(), hdrSize)) {
-            return null;             // checksum mismatch
-          }
-
-          if (b.nextBlockOnDiskSizeWithHeader > 0) {
-            setNextBlockHeader(offset, b);
-          }
+        onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
+        System.arraycopy(headerBuf.array(),
+              headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
+        nextBlockOnDiskSize =
+          readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
+              - hdrSize, true, offset + hdrSize, pread);
+        onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
+      }
+
+      boolean isCompressed =
+        compressAlgo != null
+            && compressAlgo != Compression.Algorithm.NONE;
+      if (!isCompressed) {
+        b.assumeUncompressed();
+      }
+
+      if (verifyChecksum &&
+          !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
+        return null;             // checksum mismatch
+      }
+
+      if (isCompressed) {
+        // This will allocate a new buffer but keep header bytes.
+        b.allocateBuffer(nextBlockOnDiskSize > 0);
+        if (b.blockType.equals(BlockType.ENCODED_DATA)) {
+          encodedBlockDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize);
         } else {
-          // Allocate enough space for the block's header and compressed data.
-          byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader()
-              + hdrSize];
-
-          b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes,
-              hdrSize, b.onDiskSizeWithoutHeader, true, offset
-                  + hdrSize, pread);
-          if (verifyChecksum &&
-              !validateBlockChecksum(b, compressedBytes, hdrSize)) {
-            return null;             // checksum mismatch
-          }
-          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
-              compressedBytes, hdrSize, b.onDiskSizeWithoutHeader));
-
-          decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
-              b.uncompressedSizeWithoutHeader);
-
-          if (b.nextBlockOnDiskSizeWithHeader > 0) {
-            // Copy the next block's header into the new block.
-            int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
-                + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
-            System.arraycopy(compressedBytes,
-                compressedBytes.length - hdrSize,
-                b.buf.array(),
-                nextHeaderOffset,
-                hdrSize);
-
-            setNextBlockHeader(offset, b);
-          }
+          defaultDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize);
+        }
+        if (nextBlockOnDiskSize > 0) {
+          // Copy next block's header bytes into the new block if we have them.
+          System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
+              b.buf.arrayOffset() + hdrSize
+              + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
+              hdrSize);
         }
+      } else {
+        // The onDiskBlock will become the headerAndDataBuffer for this block.
+        // If nextBlockOnDiskSizeWithHeader is not zero, onDiskBlock already
+        // contains the next block's header, so there is no need to copy it
+        // into this block's buffer separately.
+        b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0,
+                onDiskSizeWithHeader), getMinorVersion());
+      }
+
+      b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
+
+      // Set prefetched header
+      if (b.nextBlockOnDiskSizeWithHeader > 0) {
+        prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
+        System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
+            prefetchedHeader.header, 0, hdrSize);
       }
 
       b.includesMemstoreTS = includesMemstoreTS;
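The rewritten method above folds the four earlier branches (compressed vs. uncompressed, known vs. unknown on-disk size) into one flow: resolve the header (prefetched or freshly read), read the whole on-disk block plus the next block's header, validate the checksum, then decode. Condensed, the decode tail looks roughly like the sketch below; the field names are the ones introduced above, and checksum validation plus prefetched-header bookkeeping are omitted. This is a paraphrase for readability, not the committed body:

    // Illustrative paraphrase of the tail of readBlockDataInternal(...).
    boolean isCompressed = compressAlgo != null
        && compressAlgo != Compression.Algorithm.NONE;
    if (!isCompressed) {
      // Uncompressed: the on-disk bytes already form the header-and-data
      // buffer (assumeUncompressed() and checksum validation happen first in
      // the real code).
      b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
          getMinorVersion());
    } else {
      // Compressed and/or encoded: the matching decoding context inflates it.
      b.allocateBuffer(nextBlockOnDiskSize > 0);
      HFileBlockDecodingContext ctx = b.blockType.equals(BlockType.ENCODED_DATA)
          ? encodedBlockDecodingCtx : defaultDecodingCtx;
      ctx.prepareDecoding(b, onDiskBlock, hdrSize);
    }
    b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;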
@@ -1922,21 +1763,14 @@ public class HFileBlock extends SchemaCo
       return b;
     }
 
-    private void setNextBlockHeader(long offset, HFileBlock b) {
-      PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
-      prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
-      int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
-          + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
-      System.arraycopy(b.buf.array(), nextHeaderOffset,
-          prefetchedHeader.header, 0, hdrSize);
-    }
-
     void setIncludesMemstoreTS(boolean enabled) {
       includesMemstoreTS = enabled;
     }
 
     void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
       this.dataBlockEncoder = encoder;
+      encodedBlockDecodingCtx = encoder.newOnDiskDataBlockDecodingContext(
+          this.compressAlgo);
     }
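Both decoding contexts start out as HFileBlockDefaultDecodingContext instances (see the reader constructor hunk above); setDataBlockEncoder() then swaps in whatever context the configured HFileDataBlockEncoder supplies. A small, hedged sketch of that factory call, with the PREFIX encoder and an arbitrary compression algorithm:

    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
    import org.apache.hadoop.hbase.io.hfile.Compression;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

    public class DecodingContextSketch {
      // The same factory call the reader makes in setDataBlockEncoder(...).
      static HFileBlockDecodingContext contextFor(Compression.Algorithm algo) {
        HFileDataBlockEncoder encoder =
            new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX);
        return encoder.newOnDiskDataBlockDecodingContext(algo);
      }
    }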
 
     /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Mon Mar 19 19:12:19 2012
@@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Controls what kind of data block encoding is used. If data block encoding is
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.util.Pair
  */
 @InterfaceAudience.Private
 public interface HFileDataBlockEncoder {
+
   /**
    * Converts a block from the on-disk format to the in-cache format. Called in
    * the following cases:
@@ -51,12 +54,14 @@ public interface HFileDataBlockEncoder {
    * Should be called before an encoded or unencoded data block is written to
    * disk.
    * @param in KeyValues next to each other
-   * @param dummyHeader A dummy header to be written as a placeholder
-   * @return a non-null on-heap buffer containing the contents of the
-   *         HFileBlock with unfilled header and block type
+   * @param encodingResult the encoding context that will hold the encoded result
+   * @param blockType block type
+   * @throws IOException
    */
-  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
-      ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader);
+  public void beforeWriteToDisk(
+      ByteBuffer in, boolean includesMemstoreTS,
+      HFileBlockEncodingContext encodingResult,
+      BlockType blockType) throws IOException;
 
   /**
    * Decides whether we should use a scanner over encoded blocks.
@@ -85,4 +90,27 @@ public interface HFileDataBlockEncoder {
    */
   public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
 
+  /**
+   * Creates an encoder-specific encoding context object for writing. The
+   * encoding context should also perform compression if compressionAlgorithm
+   * is valid.
+   *
+   * @param compressionAlgorithm compression algorithm
+   * @param headerBytes header bytes
+   * @return a new {@link HFileBlockEncodingContext} object
+   */
+  public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext(
+      Algorithm compressionAlgorithm, byte[] headerBytes);
+
+  /**
+   * Creates an encoder-specific decoding context for reading. The decoding
+   * context should also perform decompression if compressionAlgorithm
+   * is valid.
+   *
+   * @param compressionAlgorithm compression algorithm
+   * @return a new {@link HFileBlockDecodingContext} object
+   */
+  public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext(
+      Algorithm compressionAlgorithm);
+
 }
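Taken together with beforeWriteToDisk() above, the write-side contract becomes: obtain an encoding context from the encoder, pass it into beforeWriteToDisk(), then read the result back out of the context instead of receiving a Pair. A hedged sketch that uses only members visible in this commit; the array-backed KeyValue buffer, the memstore-TS flag and the dummy header bytes are assumed inputs, and getUncompressedBytesWithHeader() is an accessor shown elsewhere in this diff:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

    public class EncodeBeforeWriteSketch {
      // keyValues: array-backed buffer of serialized KeyValues.
      // dummyHeader: block-header placeholder (HFileBlock.DUMMY_HEADER where
      // visible).
      static byte[] encodeForDisk(ByteBuffer keyValues,
          boolean includesMemstoreTS, byte[] dummyHeader) throws IOException {
        HFileDataBlockEncoder encoder =
            new HFileDataBlockEncoderImpl(DataBlockEncoding.FAST_DIFF);
        // Encoder-specific context; it also compresses when the algorithm is
        // a real one.
        HFileBlockEncodingContext ctx =
            encoder.newOnDiskDataBlockEncodingContext(Algorithm.GZ, dummyHeader);
        // Encode (and compress) into the context instead of returning a Pair.
        encoder.beforeWriteToDisk(keyValues, includesMemstoreTS, ctx,
            BlockType.DATA);
        // The block writer pulls its output from the context.
        return ctx.getUncompressedBytesWithHeader();
      }
    }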

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Mon Mar 19 19:12:19 2012
@@ -16,18 +16,20 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.base.Preconditions;
 
@@ -39,6 +41,7 @@ import com.google.common.base.Preconditi
 public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
   private final DataBlockEncoding onDisk;
   private final DataBlockEncoding inCache;
+  private final HFileBlockEncodingContext inCacheEncodeCtx;
 
   public HFileDataBlockEncoderImpl(DataBlockEncoding encoding) {
     this(encoding, encoding);
@@ -54,10 +57,36 @@ public class HFileDataBlockEncoderImpl i
    */
   public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk,
       DataBlockEncoding inCache) {
+    this(onDisk, inCache, null);
+  }
+
+  /**
+   * Do data block encoding with specified options.
+   * @param onDisk What kind of data block encoding will be used before writing
+   *          HFileBlock to disk. This must be either the same as inCache or
+   *          {@link DataBlockEncoding#NONE}.
+   * @param inCache What kind of data block encoding will be used in block
+   *          cache.
+   * @param dummyHeader dummy header bytes
+   */
+  public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk,
+      DataBlockEncoding inCache, byte[] dummyHeader) {
+    dummyHeader = dummyHeader == null ? HFileBlock.DUMMY_HEADER : dummyHeader;
     this.onDisk = onDisk != null ?
         onDisk : DataBlockEncoding.NONE;
     this.inCache = inCache != null ?
         inCache : DataBlockEncoding.NONE;
+    if (inCache != DataBlockEncoding.NONE) {
+      inCacheEncodeCtx =
+          this.inCache.getEncoder().newDataBlockEncodingContext(
+              Algorithm.NONE, this.inCache, dummyHeader);
+    } else {
+      // create a default encoding context
+      inCacheEncodeCtx =
+          new HFileBlockDefaultEncodingContext(Algorithm.NONE,
+              this.inCache, dummyHeader);
+    }
+
     Preconditions.checkArgument(onDisk == DataBlockEncoding.NONE ||
         onDisk == inCache, "on-disk encoding (" + onDisk + ") must be " +
         "either the same as in-cache encoding (" + inCache + ") or " +
@@ -131,7 +160,8 @@ public class HFileDataBlockEncoderImpl i
         return block;
       }
       // Encode the unencoded block with the in-cache encoding.
-      return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS());
+      return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS(),
+          inCacheEncodeCtx);
     }
 
     if (block.getBlockType() == BlockType.ENCODED_DATA) {
@@ -149,21 +179,25 @@ public class HFileDataBlockEncoderImpl i
   }
 
   /**
-   * Precondition: a non-encoded buffer.
-   * Postcondition: on-disk encoding.
+   * Precondition: a non-encoded buffer. Postcondition: on-disk encoding.
+   *
+   * The encoded results can be stored in {@link HFileBlockEncodingContext}.
+   *
+   * @throws IOException
    */
   @Override
-  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
-      boolean includesMemstoreTS, byte[] dummyHeader) {
+  public void beforeWriteToDisk(ByteBuffer in,
+      boolean includesMemstoreTS,
+      HFileBlockEncodingContext encodeCtx,
+      BlockType blockType) throws IOException {
     if (onDisk == DataBlockEncoding.NONE) {
       // there is no need to encode the block before writing it to disk
-      return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+      ((HFileBlockDefaultEncodingContext) encodeCtx).compressAfterEncoding(
+          in.array(), blockType);
+      return;
     }
-
-    ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in,
-        onDisk, includesMemstoreTS, dummyHeader);
-    return new Pair<ByteBuffer, BlockType>(encodedBuffer,
-        BlockType.ENCODED_DATA);
+    encodeBufferToHFileBlockBuffer(in, onDisk,
+        includesMemstoreTS, encodeCtx);
   }
 
   @Override
@@ -174,34 +208,42 @@ public class HFileDataBlockEncoderImpl i
     return inCache != DataBlockEncoding.NONE;
   }
 
-  private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in,
+  /**
+   * Encode a block of key value pairs.
+   *
+   * @param in input data to encode
+   * @param algo encoding algorithm
+   * @param includesMemstoreTS includes memstore timestamp or not
+   * @param encodeCtx where the output data will be stored
+   */
+  private void encodeBufferToHFileBlockBuffer(ByteBuffer in,
       DataBlockEncoding algo, boolean includesMemstoreTS,
-      byte[] dummyHeader) {
-    ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(encodedStream);
+      HFileBlockEncodingContext encodeCtx) {
     DataBlockEncoder encoder = algo.getEncoder();
     try {
-      encodedStream.write(dummyHeader);
-      algo.writeIdInBytes(dataOut);
-      encoder.compressKeyValues(dataOut, in,
-          includesMemstoreTS);
+      encoder.compressKeyValues(in, includesMemstoreTS, encodeCtx);
     } catch (IOException e) {
-      throw new RuntimeException(String.format("Bug in data block encoder " +
-          "'%s', it probably requested too much data", algo.toString()), e);
+      throw new RuntimeException(String.format(
+          "Bug in data block encoder "
+              + "'%s', it probably requested too much data, " +
+              "exception message: %s.",
+              algo.toString(), e.getMessage()), e);
     }
-    return ByteBuffer.wrap(encodedStream.toByteArray());
   }
 
   private HFileBlock encodeDataBlock(HFileBlock block,
-      DataBlockEncoding algo, boolean includesMemstoreTS) {
-    ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer(
-        block.getBufferWithoutHeader(), algo, includesMemstoreTS,
-        HFileBlock.DUMMY_HEADER);
-    int sizeWithoutHeader = compressedBuffer.limit() - block.headerSize();
+      DataBlockEncoding algo, boolean includesMemstoreTS,
+      HFileBlockEncodingContext encodingCtx) {
+    encodeBufferToHFileBlockBuffer(
+      block.getBufferWithoutHeader(), algo, includesMemstoreTS, encodingCtx);
+    byte[] encodedUncompressedBytes =
+      encodingCtx.getUncompressedBytesWithHeader();
+    ByteBuffer bufferWrapper = ByteBuffer.wrap(encodedUncompressedBytes);
+    int sizeWithoutHeader = bufferWrapper.limit() - encodingCtx.getHeaderSize();
     HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA,
         block.getOnDiskSizeWithoutHeader(),
         sizeWithoutHeader, block.getPrevBlockOffset(),
-        compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(),
+        bufferWrapper, HFileBlock.FILL_HEADER, block.getOffset(),
         includesMemstoreTS, block.getMinorVersion(),
         block.getBytesPerChecksum(), block.getChecksumType(),
         block.getOnDiskDataSizeWithHeader());
@@ -215,4 +257,31 @@ public class HFileDataBlockEncoderImpl i
         inCache + ")";
   }
 
+  @Override
+  public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext(
+      Algorithm compressionAlgorithm,  byte[] dummyHeader) {
+    if (onDisk != null) {
+      DataBlockEncoder encoder = onDisk.getEncoder();
+      if (encoder != null) {
+        return encoder.newDataBlockEncodingContext(
+            compressionAlgorithm, onDisk, dummyHeader);
+      }
+    }
+    return new HFileBlockDefaultEncodingContext(compressionAlgorithm,
+        null, dummyHeader);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext(
+      Algorithm compressionAlgorithm) {
+    if (onDisk != null) {
+      DataBlockEncoder encoder = onDisk.getEncoder();
+      if (encoder != null) {
+        return encoder.newDataBlockDecodingContext(
+            compressionAlgorithm);
+      }
+    }
+    return new HFileBlockDefaultDecodingContext(compressionAlgorithm);
+  }
+
 }
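A note on the fallbacks at the end of the class: when there is no on-disk encoder to delegate to, both factories hand back the default contexts, so plain compressed (or uncompressed) blocks keep flowing through the same interface. A hedged sketch, assuming DataBlockEncoding.NONE supplies no encoder, which is what the null checks above guard against:

    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
    import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

    public class DefaultContextSketch {
      static void show(byte[] dummyHeader) {
        HFileDataBlockEncoder plain =
            new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE);
        // With no encoder to delegate to, these calls fall through to the
        // HFileBlockDefault{Encoding,Decoding}Context implementations.
        HFileBlockEncodingContext enc =
            plain.newOnDiskDataBlockEncodingContext(Algorithm.GZ, dummyHeader);
        HFileBlockDecodingContext dec =
            plain.newOnDiskDataBlockDecodingContext(Algorithm.GZ);
      }
    }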

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Mon Mar 19 19:12:19 2012
@@ -432,7 +432,7 @@ public class HFileWriterV2 extends Abstr
 
     finishClose(trailer);
 
-    fsBlockWriter.releaseCompressor();
+    fsBlockWriter.release();
   }
 
   @Override


