hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1236031 [2/7] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...
Date Thu, 26 Jan 2012 02:59:00 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * Encapsulates a data block compressed using a particular encoding algorithm.
+ * Useful for testing and benchmarking.
+ */
+public class EncodedDataBlock {
+  private static final int BUFFER_SIZE = 4 * 1024;
+  protected DataBlockEncoder dataBlockEncoder;
+  ByteArrayOutputStream uncompressedOutputStream;
+  ByteBuffer uncompressedBuffer;
+  private byte[] cacheCompressData;
+  private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+  private boolean includesMemstoreTS;
+
+  /**
+   * Create a buffer which will be encoded using dataBlockEncoder.
+   * @param dataBlockEncoder Algorithm used for compression.
+   * @param includesMemstoreTS Whether each KeyValue is followed by a memstore
+   *          timestamp.
+   */
+  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder,
+      boolean includesMemstoreTS) {
+    this.dataBlockEncoder = dataBlockEncoder;
+    this.includesMemstoreTS = includesMemstoreTS;
+    uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE);
+  }
+
+  /**
+   * Add a KeyValue to the uncompressed buffer and invalidate any cached
+   * compressed representation.
+   * @param kv Item to be added.
+   */
+  public void addKv(KeyValue kv) {
+    cacheCompressData = null;
+    uncompressedOutputStream.write(
+        kv.getBuffer(), kv.getOffset(), kv.getLength());
+  }
+
+  /**
+   * Provides access to the stored KeyValues by decompressing the encoded data.
+   * @return Forward sequential iterator over the KeyValues.
+   */
+  public Iterator<KeyValue> getIterator() {
+    final int uncompressedSize = uncompressedOutputStream.size();
+    final ByteArrayInputStream bais = new ByteArrayInputStream(
+        getCompressedData());
+    final DataInputStream dis = new DataInputStream(bais);
+
+    return new Iterator<KeyValue>() {
+      private ByteBuffer decompressedData = null;
+
+      @Override
+      public boolean hasNext() {
+        if (decompressedData == null) {
+          return uncompressedSize > 0;
+        }
+        return decompressedData.hasRemaining();
+      }
+
+      @Override
+      public KeyValue next() {
+        if (decompressedData == null) {
+          try {
+            decompressedData = dataBlockEncoder.uncompressKeyValues(
+                dis, includesMemstoreTS);
+          } catch (IOException e) {
+            throw new RuntimeException("Problem with data block encoder, " +
+                "most likely it requested more bytes than are available.", e);
+          }
+          decompressedData.rewind();
+        }
+
+        int offset = decompressedData.position();
+        KeyValue kv = new KeyValue(decompressedData.array(), offset);
+        decompressedData.position(offset + kv.getLength());
+
+        return kv;
+      }
+
+      @Override
+      public void remove() {
+        throw new NotImplementedException("remove() is not supported!");
+      }
+
+      @Override
+      public String toString() {
+        return "Iterator of: " + dataBlockEncoder.getClass().getName();
+      }
+
+    };
+  }
+
+  /**
+   * Find the size of the minimal buffer that could store the compressed data.
+   * @return Size of the compressed data in bytes.
+   */
+  public int getSize() {
+    return getCompressedData().length;
+  }
+
+  /**
+   * Find the size of the compressed data, assuming the buffer is compressed
+   * using the given algorithm.
+   * @param compressor Algorithm used for compression.
+   * @param buffer Array to be compressed.
+   * @param offset Offset to beginning of the data.
+   * @param length Length to be compressed.
+   * @return Size of compressed data in bytes.
+   */
+  public static int checkCompressedSize(Compressor compressor, byte[] buffer,
+      int offset, int length) {
+    byte[] compressedBuffer = new byte[buffer.length];
+    // in fact the buffer could be of any positive size
+    compressor.setInput(buffer, offset, length);
+    compressor.finish();
+    int currentPos = 0;
+    while (!compressor.finished()) {
+      try {
+        // we don't care about the compressed data,
+        // we just want to calculate the number of bytes
+        currentPos += compressor.compress(compressedBuffer, 0,
+            compressedBuffer.length);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "For some reason compressor couldn't read data. " +
+            "It is likely a problem with " +
+            compressor.getClass().getName(), e);
+      }
+    }
+    return currentPos;
+  }
+
+  /**
+   * Estimate size after second stage of compression (e.g. LZO).
+   * @param compressor Algorithm which will be used for compressions.
+   * @return Size after second stage of compression.
+   */
+  public int checkCompressedSize(Compressor compressor) {
+    // compress
+    byte[] compressedBytes = getCompressedData();
+    return checkCompressedSize(compressor, compressedBytes, 0,
+        compressedBytes.length);
+  }
+
+  private byte[] getCompressedData() {
+    // is cached
+    if (cacheCompressData != null) {
+      return cacheCompressData;
+    }
+    cacheCompressData = doCompressData();
+
+    return cacheCompressData;
+  }
+
+  private ByteBuffer getUncompressedBuffer() {
+    if (uncompressedBuffer == null ||
+        uncompressedBuffer.limit() < uncompressedOutputStream.size()) {
+      uncompressedBuffer = ByteBuffer.wrap(
+          uncompressedOutputStream.toByteArray());
+    }
+    return uncompressedBuffer;
+  }
+
+  /**
+   * Do the compression (encoding).
+   * @return Compressed data as a byte array.
+   */
+  public byte[] doCompressData() {
+    compressedStream.reset();
+    DataOutputStream dataOut = new DataOutputStream(compressedStream);
+    try {
+      this.dataBlockEncoder.compressKeyValues(
+          dataOut, getUncompressedBuffer(), includesMemstoreTS);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Bug in decoding part of algorithm %s. " +
+          "Probably it requested more bytes than are available.",
+          toString()), e);
+    }
+    return compressedStream.toByteArray();
+  }
+
+  @Override
+  public String toString() {
+    return dataBlockEncoder.toString();
+  }
+
+  /**
+   * Get the raw (uncompressed) KeyValue bytes.
+   * @return The accumulated bytes as an array.
+   */
+  public byte[] getRawKeyValues() {
+    return uncompressedOutputStream.toByteArray();
+  }
+}

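A minimal usage sketch (not part of the commit) of how EncodedDataBlock might be exercised to compare raw versus encoded sizes. It assumes the FastDiffDeltaEncoder added later in this commit and that it has a no-argument constructor; the row and value contents are arbitrary.

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
    import org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class EncodedDataBlockExample {
      public static void main(String[] args) {
        // Feed some KeyValues into the block and compare sizes.
        EncodedDataBlock block =
            new EncodedDataBlock(new FastDiffDeltaEncoder(), false);
        for (int i = 0; i < 100; i++) {
          block.addKv(new KeyValue(Bytes.toBytes(String.format("row%05d", i)),
              Bytes.toBytes("cf"), Bytes.toBytes("q"),
              Bytes.toBytes("value" + i)));
        }
        System.out.println("raw bytes:     " + block.getRawKeyValues().length);
        System.out.println("encoded bytes: " + block.getSize());
      }
    }
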
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+/**
+ * Internal error which indicates a bug in a data block encoding algorithm.
+ */
+public class EncoderBufferTooSmallException extends RuntimeException {
+  private static final long serialVersionUID = 4767495176134878737L;
+
+  public EncoderBufferTooSmallException(String message) {
+    super(message);
+  }
+}

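The decoders in this commit throw this exception via ByteBufferUtils.ensureSpace, which is not shown in this part of the patch; a plausible sketch of such a guard, purely for illustration, would be:

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.encoding.EncoderBufferTooSmallException;

    public class EnsureSpaceSketch {
      // Hypothetical bounds check of the kind that signals an encoder bug by
      // throwing EncoderBufferTooSmallException.
      static void ensureSpace(ByteBuffer out, int length) {
        if (out.position() + length > out.limit()) {
          throw new EncoderBufferTooSmallException("Buffer position=" +
              out.position() + ", buffer limit=" + out.limit() +
              ", length to be written=" + length);
        }
      }
    }
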
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
+ *
+ * Compress using:
+ * - store the size of the common prefix with the previous key
+ * - save the column family once in the first KeyValue
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
+ * - use flag bits to avoid duplicating the key length, value length
+ *   and type when they are the same as in the previous KeyValue
+ * - store in 3 bits the length of the timestamp prefix shared
+ *   with the previous KeyValue's timestamp
+ * - one bit which allows omitting the value if it is the same as before
+ *
+ * Format:
+ * - 1 byte:    flag
+ * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
+ * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
+ * - 1-5 bytes: prefix length
+ * - ... bytes: rest of the row (if prefix length is small enough)
+ * - ... bytes: qualifier (or suffix depending on prefix length)
+ * - 1-8 bytes: timestamp suffix
+ * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
+ * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
+ *
+ */
+public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
+  final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
+  final int SHIFT_TIMESTAMP_LENGTH = 0;
+  final int FLAG_SAME_KEY_LENGTH = 1 << 3;
+  final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
+  final int FLAG_SAME_TYPE = 1 << 5;
+  final int FLAG_SAME_VALUE = 1 << 6;
+
+  private static class FastDiffCompressionState extends CompressionState {
+    byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
+    int prevTimestampOffset;
+
+    @Override
+    protected void readTimestamp(ByteBuffer in) {
+      in.get(timestamp);
+    }
+
+    @Override
+    void copyFrom(CompressionState state) {
+      super.copyFrom(state);
+      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
+      System.arraycopy(state2.timestamp, 0, timestamp, 0,
+          KeyValue.TIMESTAMP_SIZE);
+      prevTimestampOffset = state2.prevTimestampOffset;
+    }
+
+    /**
+     * Copies the first key/value from the given stream, and initializes
+     * decompression state based on it. Assumes that we have already read key
+     * and value lengths. Does not set {@link #qualifierLength} (not used by
+     * decompression) or {@link #prevOffset} (set by the caller afterwards).
+     */
+    private void decompressFirstKV(ByteBuffer out, DataInputStream in)
+        throws IOException {
+      int kvPos = out.position();
+      out.putInt(keyLength);
+      out.putInt(valueLength);
+      prevTimestampOffset = out.position() + keyLength -
+          KeyValue.TIMESTAMP_TYPE_SIZE;
+      ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
+      rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
+      familyLength = out.get(kvPos + KeyValue.ROW_OFFSET +
+          KeyValue.ROW_LENGTH_SIZE + rowLength);
+      type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
+    }
+
+  }
+
+  private void compressSingleKeyValue(
+        FastDiffCompressionState previousState,
+        FastDiffCompressionState currentState,
+        OutputStream out, ByteBuffer in) throws IOException {
+    currentState.prevOffset = in.position();
+    int keyLength = in.getInt();
+    int valueOffset =
+        currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
+    int valueLength = in.getInt();
+    byte flag = 0;
+
+    if (previousState.isFirst()) {
+      // copy the whole key; there is no previous key to share a prefix with
+      out.write(flag);
+      ByteBufferUtils.putCompressedInt(out, keyLength);
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, 0);
+
+      currentState.readKey(in, keyLength, valueLength);
+
+      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
+    } else {
+      // find a common prefix and skip it
+      int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
+          previousState.prevOffset + KeyValue.ROW_OFFSET,
+          Math.min(keyLength, previousState.keyLength) -
+          KeyValue.TIMESTAMP_TYPE_SIZE);
+
+      currentState.readKey(in, keyLength, valueLength,
+          commonPrefix, previousState);
+
+      if (keyLength == previousState.keyLength) {
+        flag |= FLAG_SAME_KEY_LENGTH;
+      }
+      if (valueLength == previousState.valueLength) {
+        flag |= FLAG_SAME_VALUE_LENGTH;
+      }
+      if (currentState.type == previousState.type) {
+        flag |= FLAG_SAME_TYPE;
+      }
+
+      int commonTimestampPrefix = findCommonTimestampPrefix(
+          currentState, previousState);
+      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
+
+      // Check if current and previous values are the same. Compare value
+      // length first as an optimization.
+      if (valueLength == previousState.valueLength) {
+        int previousValueOffset = previousState.prevOffset
+            + previousState.keyLength + KeyValue.ROW_OFFSET;
+        if (ByteBufferUtils.arePartsEqual(in,
+                previousValueOffset, previousState.valueLength,
+                valueOffset, valueLength)) {
+          flag |= FLAG_SAME_VALUE;
+        }
+      }
+
+      out.write(flag);
+      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(out, keyLength);
+      }
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(out, valueLength);
+      }
+      ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+      ByteBufferUtils.skip(in, commonPrefix);
+      if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+        // Previous and current rows are different. Copy the differing part of
+        // the row, skip the column family, and copy the qualifier.
+        ByteBufferUtils.moveBufferToStream(out, in,
+            currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+        ByteBufferUtils.skip(in, currentState.familyLength +
+            KeyValue.FAMILY_LENGTH_SIZE);
+        ByteBufferUtils.moveBufferToStream(out, in,
+            currentState.qualifierLength);
+      } else {
+        // The common part includes the whole row. As the column family is the
+        // same across the whole file, it will automatically be included in the
+        // common prefix, so we need not special-case it here.
+        int restKeyLength = keyLength - commonPrefix -
+            KeyValue.TIMESTAMP_TYPE_SIZE;
+        ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
+      }
+      ByteBufferUtils.skip(in, commonTimestampPrefix);
+      ByteBufferUtils.moveBufferToStream(out, in,
+          KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
+
+      // Write the type if it is not the same as before.
+      if ((flag & FLAG_SAME_TYPE) == 0) {
+        out.write(currentState.type);
+      }
+
+      // Write the value if it is not the same as before.
+      if ((flag & FLAG_SAME_VALUE) == 0) {
+        ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
+      }
+
+      // Skip key type and value in the input buffer.
+      ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
+    }
+  }
+
+  private int findCommonTimestampPrefix(FastDiffCompressionState left,
+      FastDiffCompressionState right) {
+    int prefixTimestamp = 0;
+    while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
+        left.timestamp[prefixTimestamp]
+            == right.timestamp[prefixTimestamp]) {
+      prefixTimestamp++;
+    }
+    return prefixTimestamp; // has to be at most 7 bytes
+  }
+
+  private void uncompressSingleKeyValue(DataInputStream source,
+      ByteBuffer out, FastDiffCompressionState state)
+          throws IOException, EncoderBufferTooSmallException {
+    byte flag = source.readByte();
+    int prevKeyLength = state.keyLength;
+
+    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+      state.keyLength = ByteBufferUtils.readCompressedInt(source);
+    }
+    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+      state.valueLength = ByteBufferUtils.readCompressedInt(source);
+    }
+    int commonLength = ByteBufferUtils.readCompressedInt(source);
+
+    ByteBufferUtils.ensureSpace(out, state.keyLength + state.valueLength +
+        KeyValue.ROW_OFFSET);
+
+    int kvPos = out.position();
+
+    if (!state.isFirst()) {
+      // copy the prefix
+      int common;
+      int prevOffset;
+
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        out.putInt(state.keyLength);
+        out.putInt(state.valueLength);
+        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+        common = commonLength;
+      } else {
+        if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
+          prevOffset = state.prevOffset;
+          common = commonLength + KeyValue.ROW_OFFSET;
+        } else {
+          out.putInt(state.keyLength);
+          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
+          common = commonLength + KeyValue.KEY_LENGTH_SIZE;
+        }
+      }
+
+      ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
+
+      // copy the rest of the key from the buffer
+      int keyRestLength;
+      if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+        // omit the family part of the key, it is always the same
+        int rowWithSizeLength;
+        int rowRestLength;
+
+        // check length of row
+        if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
+          // not yet copied, do it now
+          ByteBufferUtils.copyFromStreamToBuffer(out, source,
+              KeyValue.ROW_LENGTH_SIZE - commonLength);
+
+          rowWithSizeLength = out.getShort(out.position() -
+              KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
+          rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
+        } else {
+          // already in kvBuffer, just read it
+          rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) +
+              KeyValue.ROW_LENGTH_SIZE;
+          rowRestLength = rowWithSizeLength - commonLength;
+        }
+
+        // copy the rest of row
+        ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
+
+        // copy the column family
+        ByteBufferUtils.copyFromBufferToBuffer(out, out,
+            state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
+                + state.rowLength, state.familyLength
+                + KeyValue.FAMILY_LENGTH_SIZE);
+        state.rowLength = (short) (rowWithSizeLength -
+            KeyValue.ROW_LENGTH_SIZE);
+
+        keyRestLength = state.keyLength - rowWithSizeLength -
+            state.familyLength -
+            (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
+      } else {
+        // the row (including its length prefix) is the same as in the
+        // previous KeyValue
+        keyRestLength = state.keyLength - commonLength -
+            KeyValue.TIMESTAMP_TYPE_SIZE;
+      }
+      // copy the rest of the key after the column family, i.e. the qualifier
+      ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
+
+      // copy timestamp
+      int prefixTimestamp =
+          (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
+      ByteBufferUtils.copyFromBufferToBuffer(out, out,
+          state.prevTimestampOffset, prefixTimestamp);
+      state.prevTimestampOffset = out.position() - prefixTimestamp;
+      ByteBufferUtils.copyFromStreamToBuffer(out, source,
+          KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
+
+      // copy the type and value
+      if ((flag & FLAG_SAME_TYPE) != 0) {
+        out.put(state.type);
+        if ((flag & FLAG_SAME_VALUE) != 0) {
+          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
+              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
+        } else {
+          ByteBufferUtils.copyFromStreamToBuffer(out, source,
+              state.valueLength);
+        }
+      } else {
+        if ((flag & FLAG_SAME_VALUE) != 0) {
+          ByteBufferUtils.copyFromStreamToBuffer(out, source,
+              KeyValue.TYPE_SIZE);
+          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
+              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
+        } else {
+          ByteBufferUtils.copyFromStreamToBuffer(out, source,
+              state.valueLength + KeyValue.TYPE_SIZE);
+        }
+        state.type = out.get(state.prevTimestampOffset +
+            KeyValue.TIMESTAMP_SIZE);
+      }
+    } else { // this is the first element
+      state.decompressFirstKV(out, source);
+    }
+
+    state.prevOffset = kvPos;
+  }
+
+  @Override
+  public void compressKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(out, in.limit());
+    FastDiffCompressionState previousState = new FastDiffCompressionState();
+    FastDiffCompressionState currentState = new FastDiffCompressionState();
+    while (in.hasRemaining()) {
+      compressSingleKeyValue(previousState, currentState,
+          out, in);
+      afterEncodingKeyValue(in, out, includesMemstoreTS);
+
+      // swap previousState <-> currentState
+      FastDiffCompressionState tmp = previousState;
+      previousState = currentState;
+      currentState = tmp;
+    }
+  }
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+          throws IOException {
+    int decompressedSize = source.readInt();
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+        allocHeaderLength);
+    buffer.position(allocHeaderLength);
+    FastDiffCompressionState state = new FastDiffCompressionState();
+    while (source.available() > skipLastBytes) {
+      uncompressSingleKeyValue(source, buffer, state);
+      afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+    }
+
+    if (source.available() != skipLastBytes) {
+      throw new IllegalStateException("Read too many bytes.");
+    }
+
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
+    int keyLength = ByteBufferUtils.readCompressedInt(block);
+    ByteBufferUtils.readCompressedInt(block); // valueLength
+    ByteBufferUtils.readCompressedInt(block); // commonLength
+    int pos = block.position();
+    block.reset();
+    return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+  }
+
+  @Override
+  public String toString() {
+    return FastDiffDeltaEncoder.class.getSimpleName();
+  }
+
+  protected static class FastDiffSeekerState extends SeekerState {
+    private byte[] prevTimestampAndType =
+        new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
+    private int rowLengthWithSize;
+    private int familyLengthWithSize;
+
+    @Override
+    protected void copyFromNext(SeekerState that) {
+      super.copyFromNext(that);
+      FastDiffSeekerState other = (FastDiffSeekerState) that;
+      System.arraycopy(other.prevTimestampAndType, 0,
+          prevTimestampAndType, 0,
+          KeyValue.TIMESTAMP_TYPE_SIZE);
+      rowLengthWithSize = other.rowLengthWithSize;
+      familyLengthWithSize = other.familyLengthWithSize;
+    }
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator) {
+      private void decode(boolean isFirst) {
+        byte flag = currentBuffer.get();
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          if (!isFirst) {
+            System.arraycopy(current.keyBuffer,
+                current.keyLength - current.prevTimestampAndType.length,
+                current.prevTimestampAndType, 0,
+                current.prevTimestampAndType.length);
+          }
+          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        }
+        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+          current.valueLength =
+              ByteBufferUtils.readCompressedInt(currentBuffer);
+        }
+        current.lastCommonPrefix =
+            ByteBufferUtils.readCompressedInt(currentBuffer);
+
+        current.ensureSpaceForKey();
+
+        if (isFirst) {
+          // copy everything
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              current.keyLength - current.prevTimestampAndType.length);
+          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+              Bytes.SIZEOF_SHORT;
+          current.familyLengthWithSize =
+              current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
+        } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
+          // length of row is different, copy everything except family
+
+          // copy the row size
+          int oldRowLengthWithSize = current.rowLengthWithSize;
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
+          current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+              Bytes.SIZEOF_SHORT;
+
+          // move the column family
+          System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
+              current.keyBuffer, current.rowLengthWithSize,
+              current.familyLengthWithSize);
+
+          // copy the rest of row
+          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
+              current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
+
+          // copy the qualifier
+          currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
+              + current.familyLengthWithSize, current.keyLength
+              - current.rowLengthWithSize - current.familyLengthWithSize
+              - current.prevTimestampAndType.length);
+        } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
+          // We have to copy part of row and qualifier, but the column family
+          // is in the right place.
+
+          // before column family (rest of row)
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              current.rowLengthWithSize - current.lastCommonPrefix);
+
+          // after column family (qualifier)
+          currentBuffer.get(current.keyBuffer, current.rowLengthWithSize
+              + current.familyLengthWithSize, current.keyLength
+              - current.rowLengthWithSize - current.familyLengthWithSize
+              - current.prevTimestampAndType.length);
+        } else {
+          // copy just the ending
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              current.keyLength - current.prevTimestampAndType.length
+                  - current.lastCommonPrefix);
+        }
+
+        // timestamp
+        int pos = current.keyLength - current.prevTimestampAndType.length;
+        int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
+          SHIFT_TIMESTAMP_LENGTH;
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer,
+              pos, commonTimestampPrefix);
+        }
+        pos += commonTimestampPrefix;
+        currentBuffer.get(current.keyBuffer, pos,
+            Bytes.SIZEOF_LONG - commonTimestampPrefix);
+        pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
+
+        // type
+        if ((flag & FLAG_SAME_TYPE) == 0) {
+          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
+        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          current.keyBuffer[pos] =
+              current.prevTimestampAndType[Bytes.SIZEOF_LONG];
+        }
+
+        // handle value
+        if ((flag & FLAG_SAME_VALUE) == 0) {
+          current.valueOffset = currentBuffer.position();
+          ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        }
+
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+        current.nextKvOffset = currentBuffer.position();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        decode(true);
+      }
+
+      @Override
+      protected void decodeNext() {
+        decode(false);
+      }
+
+      @Override
+      protected FastDiffSeekerState createSeekerState() {
+        return new FastDiffSeekerState();
+      }
+    };
+  }
+}

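To make the flag byte described in the format javadoc above easier to follow, here is a small standalone illustration (not part of the commit) of how its bits pack and unpack; the constants mirror those defined at the top of FastDiffDeltaEncoder.

    public class FastDiffFlagExample {
      public static void main(String[] args) {
        // Pack: 5 shared timestamp bytes; key length, value length and type
        // are the same as in the previous KeyValue; the value is not.
        int flag = 5;                // bits 0-2: common timestamp prefix length
        flag |= 1 << 3;              // FLAG_SAME_KEY_LENGTH
        flag |= 1 << 4;              // FLAG_SAME_VALUE_LENGTH
        flag |= 1 << 5;              // FLAG_SAME_TYPE
        // bit 6 (FLAG_SAME_VALUE) stays clear: the value follows in the stream.

        // Unpack, mirroring the decode() path above.
        System.out.println("timestamp prefix = " + (flag & 0x07) + " bytes");
        System.out.println("same key length  = " + ((flag & (1 << 3)) != 0));
        System.out.println("value inline     = " + ((flag & (1 << 6)) == 0));
      }
    }
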
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Compress the key by storing the size of the common prefix with the previous
+ * KeyValue and storing the rest of the key unchanged.
+ *
+ * Format:
+ * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
+ * 1-5 bytes: compressed value length (7-bit encoding)
+ * 1-3 bytes: compressed length of common key prefix
+ * ... bytes: rest of key (including timestamp)
+ * ... bytes: value
+ *
+ * In the worst case, a compressed KeyValue is three bytes longer than the
+ * original.
+ *
+ */
+public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
+
+  private int addKV(int prevKeyOffset, DataOutputStream out,
+      ByteBuffer in, int prevKeyLength) throws IOException {
+    int keyLength = in.getInt();
+    int valueLength = in.getInt();
+
+    if (prevKeyOffset == -1) {
+      // copy the whole key; there is no previous key to share a prefix with
+      ByteBufferUtils.putCompressedInt(out, keyLength);
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, 0);
+      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
+    } else {
+      // find a common prefix and skip it
+      int common = ByteBufferUtils.findCommonPrefix(
+          in, prevKeyOffset + KeyValue.ROW_OFFSET,
+          in.position(),
+          Math.min(prevKeyLength, keyLength));
+
+      ByteBufferUtils.putCompressedInt(out, keyLength - common);
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, common);
+
+      ByteBufferUtils.skip(in, common);
+      ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
+          + valueLength);
+    }
+
+    return keyLength;
+  }
+
+  @Override
+  public void compressKeyValues(DataOutputStream writeHere,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(writeHere, in.limit());
+    int prevOffset = -1;
+    int offset = 0;
+    int keyLength = 0;
+    while (in.hasRemaining()) {
+      offset = in.position();
+      keyLength = addKV(prevOffset, writeHere, in, keyLength);
+      afterEncodingKeyValue(in, writeHere, includesMemstoreTS);
+      prevOffset = offset;
+    }
+  }
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+          throws IOException {
+    int decompressedSize = source.readInt();
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+        allocHeaderLength);
+    buffer.position(allocHeaderLength);
+    int prevKeyOffset = 0;
+
+    while (source.available() > skipLastBytes) {
+      prevKeyOffset = uncompressKeyValue(source, buffer, prevKeyOffset);
+      afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+    }
+
+    if (source.available() != skipLastBytes) {
+      throw new IllegalStateException("Read too many bytes.");
+    }
+
+    buffer.limit(buffer.position());
+    return buffer;
+  }
+
+  private int uncompressKeyValue(DataInputStream source, ByteBuffer buffer,
+      int prevKeyOffset)
+          throws IOException, EncoderBufferTooSmallException {
+    int keyLength = ByteBufferUtils.readCompressedInt(source);
+    int valueLength = ByteBufferUtils.readCompressedInt(source);
+    int commonLength = ByteBufferUtils.readCompressedInt(source);
+    int keyOffset;
+    keyLength += commonLength;
+
+    ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
+        + KeyValue.ROW_OFFSET);
+
+    buffer.putInt(keyLength);
+    buffer.putInt(valueLength);
+
+    // copy the prefix
+    if (commonLength > 0) {
+      keyOffset = buffer.position();
+      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
+          commonLength);
+    } else {
+      keyOffset = buffer.position();
+    }
+
+    // copy rest of the key and value
+    int len = keyLength - commonLength + valueLength;
+    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
+    return keyOffset;
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    block.position(Bytes.SIZEOF_INT);
+    int keyLength = ByteBufferUtils.readCompressedInt(block);
+    ByteBufferUtils.readCompressedInt(block);
+    int commonLength = ByteBufferUtils.readCompressedInt(block);
+    if (commonLength != 0) {
+      throw new AssertionError("Nonzero common length in the first key in "
+          + "block: " + commonLength);
+    }
+    int pos = block.position();
+    block.reset();
+    return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+  }
+
+  @Override
+  public String toString() {
+    return PrefixKeyDeltaEncoder.class.getSimpleName();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker<SeekerState>(comparator) {
+      @Override
+      protected void decodeNext() {
+        current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.lastCommonPrefix =
+            ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.keyLength += current.lastCommonPrefix;
+        current.ensureSpaceForKey();
+        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+            current.keyLength - current.lastCommonPrefix);
+        current.valueOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+        current.nextKvOffset = currentBuffer.position();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        decodeNext();
+      }
+    };
+  }
+}

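A standalone illustration (not part of the commit) of the prefix idea this encoder relies on: only the bytes of the key that differ from the previous key are written, preceded by the three compressed lengths described in the class javadoc.

    import java.util.Arrays;

    public class PrefixDeltaExample {
      static int commonPrefix(byte[] a, byte[] b) {
        int limit = Math.min(a.length, b.length);
        int i = 0;
        while (i < limit && a[i] == b[i]) {
          i++;
        }
        return i;
      }

      public static void main(String[] args) {
        byte[] prevKey = "row0001-cf-qualifier".getBytes();
        byte[] key = "row0002-cf-qualifier".getBytes();
        int common = commonPrefix(prevKey, key);
        byte[] rest = Arrays.copyOfRange(key, common, key.length);
        // The encoder would emit: (keyLength - common), valueLength, common,
        // then the remaining key bytes and the value.
        System.out.println("common = " + common + ", rest = " + rest.length);
      }
    }
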
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Thu Jan 26 02:58:57 2012
@@ -1,4 +1,4 @@
-/*
+  /*
  * Copyright 2011 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.io.RawComparator;
 
@@ -60,6 +60,13 @@ public abstract class AbstractHFileReade
   /** Filled when we read in the trailer. */
   protected final Compression.Algorithm compressAlgo;
 
+  /**
+   * What kind of data block encoding should be used while reading, writing,
+   * and handling cache.
+   */
+  protected HFileDataBlockEncoder dataBlockEncoder =
+      NoOpDataBlockEncoder.INSTANCE;
+
   /** Last key in the file. Filled in when we read in the file info */
   protected byte [] lastKey = null;
 
@@ -275,8 +282,11 @@ public abstract class AbstractHFileReade
 
     protected int blockFetches;
 
-    public Scanner(final boolean cacheBlocks,
+    protected final HFile.Reader reader;
+
+    public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
+      this.reader = reader;
       this.cacheBlocks = cacheBlocks;
       this.pread = pread;
       this.isCompaction = isCompaction;
@@ -296,6 +306,26 @@ public abstract class AbstractHFileReade
       if (!isSeeked())
         throw new NotSeekedException();
     }
+
+    @Override
+    public int seekTo(byte[] key) throws IOException {
+      return seekTo(key, 0, key.length);
+    }
+    
+    @Override
+    public boolean seekBefore(byte[] key) throws IOException {
+      return seekBefore(key, 0, key.length);
+    }
+    
+    @Override
+    public int reseekTo(byte[] key) throws IOException {
+      return reseekTo(key, 0, key.length);
+    }
+
+    @Override
+    public HFile.Reader getReader() {
+      return reader;
+    }
   }
 
   /** For testing */
@@ -307,4 +337,9 @@ public abstract class AbstractHFileReade
     return path;
   }
 
+  @Override
+  public DataBlockEncoding getEncodingOnDisk() {
+    return dataBlockEncoder.getEncodingOnDisk();
+  }
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Thu Jan 26 02:58:57 2012
@@ -83,6 +83,12 @@ public abstract class AbstractHFileWrite
 
   /** The compression algorithm used. NONE if no compression. */
   protected final Compression.Algorithm compressAlgo;
+  
+  /**
+   * The data block encoding which will be used.
+   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
+   */
+  protected final HFileDataBlockEncoder blockEncoder;
 
   /** First key in a block. */
   protected byte[] firstKeyInBlock = null;
@@ -102,7 +108,9 @@ public abstract class AbstractHFileWrite
 
   public AbstractHFileWriter(CacheConfig cacheConf,
       FSDataOutputStream outputStream, Path path, int blockSize,
-      Compression.Algorithm compressAlgo, KeyComparator comparator) {
+      Compression.Algorithm compressAlgo,
+      HFileDataBlockEncoder dataBlockEncoder,
+      KeyComparator comparator) {
     super(null, path);
     this.outputStream = outputStream;
     this.path = path;
@@ -110,6 +118,8 @@ public abstract class AbstractHFileWrite
     this.blockSize = blockSize;
     this.compressAlgo = compressAlgo == null
         ? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo;
+    this.blockEncoder = dataBlockEncoder != null
+        ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
     this.comparator = comparator != null ? comparator
         : Bytes.BYTES_RAWCOMPARATOR;
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Thu Jan 26 02:58:57 2012
@@ -18,14 +18,28 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * Cache Key for use with implementations of {@link BlockCache}
  */
 public class BlockCacheKey implements HeapSize {
-  private String hfileName;
-  private long offset;
+  private final String hfileName;
+  private final long offset;
+  private final DataBlockEncoding encoding;
+
+  public BlockCacheKey(String file, long offset, DataBlockEncoding encoding,
+      BlockType blockType) {
+    this.hfileName = file;
+    this.offset = offset;
+    // We add encoding to the cache key only for data blocks. If the block type
+    // is unknown (this should never be the case in production), we just use
+    // the provided encoding, because it might be a data block.
+    this.encoding = (blockType == null || blockType.isData()) ? encoding :
+        DataBlockEncoding.NONE;
+  }
 
   /**
    * Construct a new BlockCacheKey
@@ -33,13 +47,13 @@ public class BlockCacheKey implements He
    * @param offset Offset of the block into the file
    */
   public BlockCacheKey(String file, long offset) {
-    this.hfileName = file;
-    this.offset = offset;
+    this(file, offset, DataBlockEncoding.NONE, null);
   }
 
   @Override
   public int hashCode() {
-    return hfileName.hashCode() * 127 + (int) (offset ^ (offset >>> 32));
+    return hfileName.hashCode() * 127 + (int) (offset ^ (offset >>> 32)) +
+        encoding.ordinal() * 17;
   }
 
   @Override
@@ -56,14 +70,18 @@ public class BlockCacheKey implements He
 
   @Override
   public String toString() {
-    return hfileName + "_" + offset;
+    return hfileName + "_" + offset
+        + (encoding == DataBlockEncoding.NONE ? "" : "_" + encoding);
   }
 
-  // Strings have two bytes per character due to default
-  // Java unicode encoding (hence the times 2).
+  /**
+   * Strings have two bytes per character due to default Java Unicode encoding
+   * (hence length times 2).
+   */
   @Override
   public long heapSize() {
-    return 2 * hfileName.length() + Bytes.SIZEOF_LONG;
+    return ClassSize.align(ClassSize.OBJECT + 2 * hfileName.length() +
+        Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE);
   }
 
   // can't avoid this unfortunately
@@ -73,4 +91,8 @@ public class BlockCacheKey implements He
   public String getHfileName() {
     return hfileName;
   }
+
+  public DataBlockEncoding getDataBlockEncoding() {
+    return encoding;
+  }
 }

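A small sketch (not part of the commit) showing that the same file and offset now yield distinct cache keys when the encoding differs; DataBlockEncoding.FAST_DIFF is assumed to be one of the members of the enum added elsewhere in this commit.

    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
    import org.apache.hadoop.hbase.io.hfile.BlockType;

    public class BlockCacheKeyExample {
      public static void main(String[] args) {
        BlockCacheKey plain = new BlockCacheKey("examplehfile", 1024L,
            DataBlockEncoding.NONE, BlockType.DATA);
        BlockCacheKey encoded = new BlockCacheKey("examplehfile", 1024L,
            DataBlockEncoding.FAST_DIFF, BlockType.ENCODED_DATA);
        // Different hash codes and string forms, so the block cache keeps the
        // encoded and unencoded versions of the block apart.
        System.out.println(plain.hashCode() + "  " + plain);
        System.out.println(encoded.hashCode() + "  " + encoded);
      }
    }
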
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java Thu Jan 26 02:58:57 2012
@@ -39,6 +39,14 @@ public enum BlockType {
   /** Data block, both versions */
   DATA("DATABLK*", BlockCategory.DATA),
 
+  /** An encoded data block (e.g. with prefix compression), version 2 */
+  ENCODED_DATA("DATABLKE", BlockCategory.DATA) {
+    @Override
+    public int getId() {
+      return DATA.ordinal();
+    }
+  },
+
   /** Version 2 leaf index block. Appears in the data block section */
   LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX),
 
@@ -103,6 +111,17 @@ public enum BlockType {
     assert magic.length == MAGIC_LENGTH;
   }
 
+  /**
+   * Use this instead of {@link #ordinal()}. They work exactly the same, except
+   * DATA and ENCODED_DATA get the same id using this method (overridden for
+   * {@link #ENCODED_DATA}).
+   * @return block type id from 0 to the number of block types - 1
+   */
+  public int getId() {
+    // Default implementation, can be overridden for individual enum members.
+    return ordinal();
+  }
+
   public void writeToStream(OutputStream out) throws IOException {
     out.write(magic);
   }
@@ -189,4 +208,11 @@ public enum BlockType {
     }
   }
 
+  /**
+   * @return whether this block type is an encoded or unencoded data block
+   */
+  public final boolean isData() {
+    return this == DATA || this == ENCODED_DATA;
+  }
+
 }

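A short sketch (not part of the commit) of the distinction the new getId() method introduces: ENCODED_DATA reports DATA's id even though its ordinal differs.

    import org.apache.hadoop.hbase.io.hfile.BlockType;

    public class BlockTypeIdExample {
      public static void main(String[] args) {
        // getId() folds ENCODED_DATA into DATA's slot; ordinal() does not.
        System.out.println(BlockType.DATA.getId()
            == BlockType.ENCODED_DATA.getId());             // true
        System.out.println(BlockType.DATA.ordinal()
            == BlockType.ENCODED_DATA.ordinal());           // false
        System.out.println(BlockType.ENCODED_DATA.isData()); // true
      }
    }
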
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Jan 26 02:58:57 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -151,13 +152,14 @@ public class HFile {
   public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;
 
   // For measuring latency of "sequential" reads and writes
-  static volatile AtomicInteger readOps = new AtomicInteger();
-  static volatile AtomicLong readTimeNano = new AtomicLong();
-  static volatile AtomicInteger writeOps = new AtomicInteger();
-  static volatile AtomicLong writeTimeNano = new AtomicLong();
+  static final AtomicInteger readOps = new AtomicInteger();
+  static final AtomicLong readTimeNano = new AtomicLong();
+  static final AtomicInteger writeOps = new AtomicInteger();
+  static final AtomicLong writeTimeNano = new AtomicLong();
+
   // For measuring latency of pread
-  static volatile AtomicInteger preadOps = new AtomicInteger();
-  static volatile AtomicLong preadTimeNano = new AtomicLong();
+  static final AtomicInteger preadOps = new AtomicInteger();
+  static final AtomicLong preadTimeNano = new AtomicLong();
 
   // for test purpose
   public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
@@ -243,6 +245,7 @@ public class HFile {
 
     public abstract Writer createWriter(FileSystem fs, Path path,
         int blockSize, Compression.Algorithm compress,
+        HFileDataBlockEncoder dataBlockEncoder,
         final KeyComparator comparator) throws IOException;
 
     public abstract Writer createWriter(FileSystem fs, Path path,
@@ -300,7 +303,8 @@ public class HFile {
   /** An abstraction used by the block index */
   public interface CachingBlockReader {
     HFileBlock readBlock(long offset, long onDiskBlockSize,
-        boolean cacheBlock, final boolean pread, final boolean isCompaction)
+        boolean cacheBlock, final boolean pread, final boolean isCompaction,
+        BlockType expectedBlockType)
         throws IOException;
   }
 
@@ -368,11 +372,14 @@ public class HFile {
 
     /** Close method with optional evictOnClose */
     void close(boolean evictOnClose) throws IOException;
+
+    DataBlockEncoding getEncodingOnDisk();
   }
 
   private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
-      long size, boolean closeIStream, CacheConfig cacheConf)
-  throws IOException {
+      long size, boolean closeIStream, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache)
+      throws IOException {
     FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
     switch (trailer.getVersion()) {
     case 1:
@@ -380,23 +387,34 @@ public class HFile {
           cacheConf);
     case 2:
       return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
-          cacheConf);
+          cacheConf, preferredEncodingInCache);
     default:
       throw new IOException("Cannot instantiate reader for HFile version " +
           trailer.getVersion());
     }
   }
 
-  public static Reader createReader(FileSystem fs, Path path,
-      CacheConfig cacheConf) throws IOException {
+  public static Reader createReaderWithEncoding(
+      FileSystem fs, Path path, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache) throws IOException {
+    final boolean closeIStream = true;
     return pickReaderVersion(path, fs.open(path),
-        fs.getFileStatus(path).getLen(), true, cacheConf);
+        fs.getFileStatus(path).getLen(), closeIStream, cacheConf,
+        preferredEncodingInCache);
   }
 
-  public static Reader createReader(Path path, FSDataInputStream fsdis,
-      long size, CacheConfig cacheConf)
+  public static Reader createReader(
+      FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
+    return createReaderWithEncoding(fs, path, cacheConf,
+        DataBlockEncoding.NONE);
+  }
+
+  public static Reader createReaderFromStream(Path path,
+      FSDataInputStream fsdis, long size, CacheConfig cacheConf)
       throws IOException {
-    return pickReaderVersion(path, fsdis, size, false, cacheConf);
+    final boolean closeIStream = false;
+    return pickReaderVersion(path, fsdis, size, closeIStream, cacheConf,
+        DataBlockEncoding.NONE);
   }
 
   /*
@@ -501,10 +519,6 @@ public class HFile {
     System.exit(prettyPrinter.run(args));
   }
 
-  public static BlockCacheKey getBlockCacheKey(String hfileName, long offset) {
-    return new BlockCacheKey(hfileName, offset);
-  }
-
   /**
    * Checks the given {@link HFile} format version, and throws an exception if
    * invalid. Note that if the version number comes from an input file and has

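A minimal sketch (not part of the commit) of calling the new createReaderWithEncoding entry point. The path is hypothetical, DataBlockEncoding.FAST_DIFF is assumed to exist in the enum added elsewhere in this commit, and CacheConfig is assumed to be constructible from a Configuration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class EncodedReaderExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/hbase/example/hfile");  // hypothetical file
        CacheConfig cacheConf = new CacheConfig(conf);

        // Prefer FAST_DIFF-encoded data blocks when caching blocks read from
        // this file.
        HFile.Reader reader = HFile.createReaderWithEncoding(fs, path,
            cacheConf, DataBlockEncoding.FAST_DIFF);
        System.out.println("encoding on disk: " + reader.getEncodingOnDisk());
        reader.close(false);
      }
    }
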
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Thu Jan 26 02:58:57 2012
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
+
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -30,13 +33,14 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-
-import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompoundBloomFilter;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
@@ -45,9 +49,6 @@ import org.apache.hadoop.io.compress.Dec
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
-import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
-
 /**
  * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
  * <ul>
@@ -75,10 +76,20 @@ import static org.apache.hadoop.hbase.io
  */
 public class HFileBlock extends SchemaConfigured implements Cacheable {
 
+  public static final boolean FILL_HEADER = true;
+  public static final boolean DONT_FILL_HEADER = false;
+
   /** The size of a version 2 {@link HFile} block header */
   public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
       + Bytes.SIZEOF_LONG;
 
+  /**
+   * The size of the block header when blockType is {@link BlockType#ENCODED_DATA}.
+   * This extends the normal header by adding the ID of the encoder.
+   */
+  public static final int ENCODED_HEADER_SIZE = HEADER_SIZE
+      + DataBlockEncoding.ID_SIZE;
+
   /** Just an array of bytes of the right size. */
   public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
 
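Spelled out, that arithmetic gives a 24-byte header (an 8-byte block-type magic record, two 4-byte size fields, and an 8-byte previous-block offset), plus a 2-byte encoder ID for ENCODED_DATA blocks. A small sketch of reading those fields back, assuming the field order used by putHeader() further down:

    import java.nio.ByteBuffer;

    /** Sketch: the v2 block header layout implied by HEADER_SIZE and ENCODED_HEADER_SIZE. */
    public class BlockHeaderSketch {
      static final int MAGIC_LENGTH = 8;                       // e.g. "DATABLK*"
      static final int HEADER_SIZE = MAGIC_LENGTH + 2 * 4 + 8; // 24 bytes
      static final int ENCODED_HEADER_SIZE = HEADER_SIZE + 2;  // + short encoder id = 26 bytes

      /** Reads the fixed header fields from a buffer positioned at a block boundary. */
      static void dumpHeader(ByteBuffer buf, boolean encodedData) {
        byte[] magic = new byte[MAGIC_LENGTH];
        buf.get(magic);                                        // block type magic record
        int onDiskSizeWithoutHeader = buf.getInt();
        int uncompressedSizeWithoutHeader = buf.getInt();
        long prevBlockOffset = buf.getLong();                  // previous block of the same type
        System.out.printf("%s onDisk=%d uncompressed=%d prev=%d%n",
            new String(magic), onDiskSizeWithoutHeader,
            uncompressedSizeWithoutHeader, prevBlockOffset);
        if (encodedData) {
          short encodingId = buf.getShort();                   // only present for ENCODED_DATA
          System.out.println("encoding id = " + encodingId);
        }
      }
    }
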
@@ -107,10 +118,11 @@ public class HFileBlock extends SchemaCo
       };
 
   private BlockType blockType;
-  private final int onDiskSizeWithoutHeader;
+  private int onDiskSizeWithoutHeader;
   private final int uncompressedSizeWithoutHeader;
   private final long prevBlockOffset;
   private ByteBuffer buf;
+  private boolean includesMemstoreTS;
 
   /**
    * The offset of this block in the file. Populated by the reader for
@@ -146,7 +158,7 @@ public class HFileBlock extends SchemaCo
    */
   public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
-      boolean fillHeader, long offset) {
+      boolean fillHeader, long offset, boolean includesMemstoreTS) {
     this.blockType = blockType;
     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@@ -155,6 +167,7 @@ public class HFileBlock extends SchemaCo
     if (fillHeader)
       overwriteHeader();
     this.offset = offset;
+    this.includesMemstoreTS = includesMemstoreTS;
   }
 
   /**
@@ -177,6 +190,15 @@ public class HFileBlock extends SchemaCo
     return blockType;
   }
 
+  /** @return the data block encoding ID that was used to encode this block */
+  public short getDataBlockEncodingId() {
+    if (blockType != BlockType.ENCODED_DATA) {
+      throw new IllegalArgumentException("Querying encoder ID of a block " +
+          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
+    }
+    return buf.getShort(HEADER_SIZE);
+  }
+
   /**
    * @return the on-disk size of the block with header size included
    */
@@ -421,7 +443,9 @@ public class HFileBlock extends SchemaCo
         // On-disk size, uncompressed size, and next block's on-disk size
         3 * Bytes.SIZEOF_INT +
         // This and previous block offset
-        2 * Bytes.SIZEOF_LONG
+        2 * Bytes.SIZEOF_LONG +
+        // "Include memstore timestamp" flag
+        Bytes.SIZEOF_BOOLEAN
     );
 
     if (buf != null) {
@@ -509,29 +533,30 @@ public class HFileBlock extends SchemaCo
     /** Compression algorithm for all blocks this instance writes. */
     private final Compression.Algorithm compressAlgo;
 
-    /**
-     * The stream we use to accumulate data in the on-disk format for each
-     * block (i.e. compressed data, or uncompressed if using no compression).
-     * We reset this stream at the end of each block and reuse it. The header
-     * is written as the first {@link #HEADER_SIZE} bytes into this stream.
-     */
-    private ByteArrayOutputStream baosOnDisk;
+    /** Data block encoder used for data blocks */
+    private final HFileDataBlockEncoder dataBlockEncoder;
 
     /**
-     * The stream we use to accumulate uncompressed block data for
-     * cache-on-write. Null when cache-on-write is turned off.
+     * The stream we use to accumulate data in uncompressed format for each
+     * block. We reset this stream at the end of each block and reuse it. The
+     * header is written as the first {@link #HEADER_SIZE} bytes into this
+     * stream.
      */
     private ByteArrayOutputStream baosInMemory;
 
     /** Compressor, which is also reused between consecutive blocks. */
     private Compressor compressor;
 
-    /** Current block type. Set in {@link #startWriting(BlockType)}. */
+    /**
+     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
+     * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
+     * to {@link BlockType#ENCODED_DATA}.
+     */
     private BlockType blockType;
 
     /**
      * A stream that we write uncompressed bytes to, which compresses them and
-     * writes them to {@link #baosOnDisk}.
+     * writes them to {@link #baosInMemory}.
      */
     private DataOutputStream userDataStream;
 
@@ -542,14 +567,8 @@ public class HFileBlock extends SchemaCo
     private byte[] onDiskBytesWithHeader;
 
     /**
-     * The total number of uncompressed bytes written into the current block,
-     * with header size not included. Valid in the READY state.
-     */
-    private int uncompressedSizeWithoutHeader;
-
-    /**
-     * Only used when we are using cache-on-write. Valid in the READY state.
-     * Contains the header and the uncompressed bytes, so the length is
+     * Valid in the READY state. Contains the header and the uncompressed (but
+     * potentially encoded, if this is a data block) bytes, so the length is
      * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}.
      */
     private byte[] uncompressedBytesWithHeader;
@@ -566,30 +585,31 @@ public class HFileBlock extends SchemaCo
      */
     private long[] prevOffsetByType;
 
-    /**
-     * Whether we are accumulating uncompressed bytes for the purpose of
-     * caching on write.
-     */
-    private boolean cacheOnWrite;
-
     /** The offset of the previous block of the same type */
     private long prevOffset;
 
+    /** Whether we are including memstore timestamp after every key/value */
+    private boolean includesMemstoreTS;
+
     /**
-     * @param compressionAlgorithm
-     *          compression algorithm to use
+     * @param compressionAlgorithm compression algorithm to use
+     * @param dataBlockEncoderAlgo data block encoding algorithm to use
      */
-    public Writer(Compression.Algorithm compressionAlgorithm) {
-      compressAlgo = compressionAlgorithm == null ? NONE
-          : compressionAlgorithm;
+    public Writer(Compression.Algorithm compressionAlgorithm,
+          HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
+      compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
+      this.dataBlockEncoder = dataBlockEncoder != null
+          ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
 
-      baosOnDisk = new ByteArrayOutputStream();
+      baosInMemory = new ByteArrayOutputStream();
       if (compressAlgo != NONE)
         compressor = compressionAlgorithm.getCompressor();
 
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
+
+      this.includesMemstoreTS = includesMemstoreTS;
     }
 
     /**
@@ -598,44 +618,24 @@ public class HFileBlock extends SchemaCo
      * @return the stream the user can write their data into
      * @throws IOException
      */
-    public DataOutputStream startWriting(BlockType newBlockType,
-        boolean cacheOnWrite) throws IOException {
+    public DataOutputStream startWriting(BlockType newBlockType)
+        throws IOException {
       if (state == State.BLOCK_READY && startOffset != -1) {
         // We had a previous block that was written to a stream at a specific
         // offset. Save that offset as the last offset of a block of that type.
-        prevOffsetByType[blockType.ordinal()] = startOffset;
+        prevOffsetByType[blockType.getId()] = startOffset;
       }
 
-      this.cacheOnWrite = cacheOnWrite;
-
       startOffset = -1;
       blockType = newBlockType;
 
-      baosOnDisk.reset();
-      baosOnDisk.write(DUMMY_HEADER);
+      baosInMemory.reset();
+      baosInMemory.write(DUMMY_HEADER);
 
       state = State.WRITING;
-      if (compressAlgo == NONE) {
-        // We do not need a compression stream or a second uncompressed stream
-        // for cache-on-write.
-        userDataStream = new DataOutputStream(baosOnDisk);
-      } else {
-        OutputStream compressingOutputStream =
-          compressAlgo.createCompressionStream(baosOnDisk, compressor, 0);
-
-        if (cacheOnWrite) {
-          // We save uncompressed data in a cache-on-write mode.
-          if (baosInMemory == null)
-            baosInMemory = new ByteArrayOutputStream();
-          baosInMemory.reset();
-          baosInMemory.write(DUMMY_HEADER);
-          userDataStream = new DataOutputStream(new DoubleOutputStream(
-              compressingOutputStream, baosInMemory));
-        } else {
-          userDataStream = new DataOutputStream(compressingOutputStream);
-        }
-      }
 
+      // We will compress it later in finishBlock()
+      userDataStream = new DataOutputStream(baosInMemory);
       return userDataStream;
     }
 
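Together with the reworked constructor above, the write path now collapses to: start a block, stream raw KeyValue bytes into the returned stream, and flush the block, with encoding and compression deferred to finishBlock(). A rough sketch of that calling sequence (the enclosing class and the GZ/no-encoding choices are illustrative; HFileWriterV2 is the real caller inside this package):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.hbase.KeyValue;

    // Sketch only: assumes we are inside org.apache.hadoop.hbase.io.hfile, where
    // HFileBlock.Writer, BlockType and NoOpDataBlockEncoder are visible.
    class BlockWriteSketch {
      static void writeOneDataBlock(FSDataOutputStream out, KeyValue kv)
          throws IOException {
        HFileBlock.Writer blockWriter = new HFileBlock.Writer(
            Compression.Algorithm.GZ,          // on-disk compression
            NoOpDataBlockEncoder.INSTANCE,     // no data block encoding
            true);                             // KVs carry a memstore timestamp
        DataOutputStream dos = blockWriter.startWriting(BlockType.DATA);
        dos.write(kv.getBuffer(), kv.getOffset(), kv.getLength());  // raw KV bytes
        // Flushing the block finalizes it (encode, then compress) before the
        // header and data are written to the output stream.
        blockWriter.writeHeaderAndData(out);
      }
    }
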
@@ -662,45 +662,108 @@ public class HFileBlock extends SchemaCo
       if (state == State.BLOCK_READY)
         return;
 
+      // This will set state to BLOCK_READY.
       finishBlock();
-      state = State.BLOCK_READY;
     }
 
     /**
      * An internal method that flushes the compressing stream (if using
      * compression), serializes the header, and takes care of the separate
-     * uncompressed stream for caching on write, if applicable. Block writer
-     * state transitions must be managed by the caller.
+     * uncompressed stream for caching on write, if applicable. Sets block
+     * write state to "block ready".
      */
     private void finishBlock() throws IOException {
       userDataStream.flush();
-      uncompressedSizeWithoutHeader = userDataStream.size();
 
-      onDiskBytesWithHeader = baosOnDisk.toByteArray();
-      prevOffset = prevOffsetByType[blockType.ordinal()];
-      putHeader(onDiskBytesWithHeader, 0);
+      // This does an array copy, so it is safe to cache this byte array.
+      uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      prevOffset = prevOffsetByType[blockType.getId()];
+
+      // We need to set state before we can package the block up for
+      // cache-on-write. In a way, the block is ready, but not yet encoded or
+      // compressed.
+      state = State.BLOCK_READY;
+      encodeDataBlockForDisk();
 
-      if (cacheOnWrite && compressAlgo != NONE) {
-        uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      doCompression();
+      putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
+          uncompressedBytesWithHeader.length);
+    }
 
-        if (uncompressedSizeWithoutHeader !=
+    /**
+     * Do compression if it is enabled, or re-use the uncompressed buffer if
+     * it is not. Fills in the compressed block's header if doing compression.
+     */
+    private void doCompression() throws IOException {
+      // do the compression
+      if (compressAlgo != NONE) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        baos.write(DUMMY_HEADER);
+
+        // compress the data
+        OutputStream compressingOutputStream =
+            compressAlgo.createCompressionStream(baos, compressor, 0);
+        compressingOutputStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
+            uncompressedBytesWithHeader.length - HEADER_SIZE);
+
+        // finish compression stream
+        compressingOutputStream.flush();
+
+        onDiskBytesWithHeader = baos.toByteArray();
+        putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
+            uncompressedBytesWithHeader.length);
+      } else {
+        onDiskBytesWithHeader = uncompressedBytesWithHeader;
+      }
+    }
+
+    /**
+     * Encodes this block if it is a data block and encoding is turned on in
+     * {@link #dataBlockEncoder}.
+     */
+    private void encodeDataBlockForDisk() throws IOException {
+      if (blockType != BlockType.DATA) {
+        return; // skip any non-data block
+      }
+
+      // do data block encoding, if data block encoder is set
+      ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
+          HEADER_SIZE, uncompressedBytesWithHeader.length -
+          HEADER_SIZE).slice();
+      Pair<ByteBuffer, BlockType> encodingResult =
+          dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
+              includesMemstoreTS);
+
+      BlockType encodedBlockType = encodingResult.getSecond();
+      if (encodedBlockType == BlockType.ENCODED_DATA) {
+        uncompressedBytesWithHeader = encodingResult.getFirst().array();
+        blockType = BlockType.ENCODED_DATA;
+      } else {
+        // There is no encoding configured. Do some extra sanity-checking.
+        if (encodedBlockType != BlockType.DATA) {
+          throw new IOException("Unexpected block type coming out of data " +
+              "block encoder: " + encodedBlockType);
+        }
+        if (userDataStream.size() !=
             uncompressedBytesWithHeader.length - HEADER_SIZE) {
           throw new IOException("Uncompressed size mismatch: "
-              + uncompressedSizeWithoutHeader + " vs. "
+              + userDataStream.size() + " vs. "
               + (uncompressedBytesWithHeader.length - HEADER_SIZE));
         }
-
-        // Write the header into the beginning of the uncompressed byte array.
-        putHeader(uncompressedBytesWithHeader, 0);
       }
     }
 
-    /** Put the header into the given byte array at the given offset. */
-    private void putHeader(byte[] dest, int offset) {
+    /**
+     * Put the header into the given byte array at the given offset.
+     * @param onDiskSize size of the block on disk
+     * @param uncompressedSize size of the block after decompression (but
+     *          before optional data block decoding)
+     */
+    private void putHeader(byte[] dest, int offset, int onDiskSize,
+        int uncompressedSize) {
       offset = blockType.put(dest, offset);
-      offset = Bytes.putInt(dest, offset, onDiskBytesWithHeader.length
-          - HEADER_SIZE);
-      offset = Bytes.putInt(dest, offset, uncompressedSizeWithoutHeader);
+      offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
+      offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
       Bytes.putLong(dest, offset, prevOffset);
     }
 
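The new doCompression() pattern (serialize the whole block uncompressed first, then compress the payload into a second buffer while reserving room for the header) can be reproduced in miniature with plain java.util.zip streams. In this sketch GZIPOutputStream stands in for compressAlgo.createCompressionStream(), and the 24-byte dummy header matches HEADER_SIZE:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class DoCompressionSketch {
      static final int HEADER_SIZE = 24;                    // v2 block header size
      static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];

      /** Mirrors Writer.doCompression(): header placeholder first, compressed payload after it. */
      static byte[] compressBlock(byte[] uncompressedBytesWithHeader) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(DUMMY_HEADER, 0, HEADER_SIZE);           // reserve space for the real header
        GZIPOutputStream gz = new GZIPOutputStream(baos);
        gz.write(uncompressedBytesWithHeader, HEADER_SIZE,
            uncompressedBytesWithHeader.length - HEADER_SIZE);
        gz.finish();                                        // flush compressed bytes into baos
        // The caller (putHeader) then overwrites the first 24 bytes with the real
        // on-disk and uncompressed sizes.
        return baos.toByteArray();
      }
    }
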
@@ -793,7 +856,7 @@ public class HFileBlock extends SchemaCo
      */
     public int getUncompressedSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
-      return uncompressedSizeWithoutHeader;
+      return uncompressedBytesWithHeader.length - HEADER_SIZE;
     }
 
     /**
@@ -801,7 +864,7 @@ public class HFileBlock extends SchemaCo
      */
     public int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
-      return uncompressedSizeWithoutHeader + HEADER_SIZE;
+      return uncompressedBytesWithHeader.length;
     }
 
     /** @return true if a block is being written  */
@@ -832,15 +895,6 @@ public class HFileBlock extends SchemaCo
     private byte[] getUncompressedDataWithHeader() {
       expectState(State.BLOCK_READY);
 
-      if (compressAlgo == NONE)
-        return onDiskBytesWithHeader;
-
-      if (!cacheOnWrite)
-        throw new IllegalStateException("Cache-on-write is turned off");
-
-      if (uncompressedBytesWithHeader == null)
-        throw new NullPointerException();
-
       return uncompressedBytesWithHeader;
     }
 
@@ -874,14 +928,15 @@ public class HFileBlock extends SchemaCo
      */
     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
         throws IOException {
-      bw.writeToBlock(startWriting(bw.getBlockType(), false));
+      bw.writeToBlock(startWriting(bw.getBlockType()));
       writeHeaderAndData(out);
     }
 
     public HFileBlock getBlockForCaching() {
-      return new HFileBlock(blockType, onDiskBytesWithHeader.length
-          - HEADER_SIZE, uncompressedSizeWithoutHeader, prevOffset,
-          getUncompressedBufferWithHeader(), false, startOffset);
+      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
+          getUncompressedSizeWithoutHeader(), prevOffset,
+          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
+          includesMemstoreTS);
     }
 
   }
@@ -1214,7 +1269,8 @@ public class HFileBlock extends SchemaCo
       // to the size of the data portion of the block without the magic record,
       // since the magic record gets moved to the header.
       HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
-          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, true, offset);
+          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
+          offset, MemStore.NO_PERSISTENT_TS);
       return b;
     }
   }
@@ -1232,6 +1288,13 @@ public class HFileBlock extends SchemaCo
   /** Reads version 2 blocks from the filesystem. */
   public static class FSReaderV2 extends AbstractFSReader {
 
+    /** Whether we include memstore timestamp in data blocks */
+    protected boolean includesMemstoreTS;
+
+    /** Data block encoding used to read from file */
+    protected HFileDataBlockEncoder dataBlockEncoder =
+        NoOpDataBlockEncoder.INSTANCE;
+
     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
         new ThreadLocal<PrefetchedHeader>() {
           @Override
@@ -1438,6 +1501,8 @@ public class HFileBlock extends SchemaCo
           }
         }
       }
+
+      b.includesMemstoreTS = includesMemstoreTS;
       b.offset = offset;
       return b;
     }
@@ -1451,6 +1516,13 @@ public class HFileBlock extends SchemaCo
           prefetchedHeader.header, 0, HEADER_SIZE);
     }
 
+    void setIncludesMemstoreTS(boolean enabled) {
+      includesMemstoreTS = enabled;
+    }
+
+    void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
+      this.dataBlockEncoder = encoder;
+    }
   }
 
   @Override
@@ -1518,5 +1590,16 @@ public class HFileBlock extends SchemaCo
     return true;
   }
 
+  public boolean doesIncludeMemstoreTS() {
+    return includesMemstoreTS;
+  }
+
+  public DataBlockEncoding getDataBlockEncoding() {
+    if (blockType == BlockType.ENCODED_DATA) {
+      return DataBlockEncoding.getEncodingById(getDataBlockEncodingId());
+    }
+    return DataBlockEncoding.NONE;
+  }
 
 }
+

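A short fragment showing how callers can interrogate a block once the FSReaderV2 above has produced it (the block variable and the println are illustrative):

    // Given an HFileBlock 'block' returned by the FSReaderV2 above:
    DataBlockEncoding encoding = block.getDataBlockEncoding();   // NONE for plain DATA blocks
    if (encoding != DataBlockEncoding.NONE) {
      // getDataBlockEncodingId() is only legal on ENCODED_DATA blocks.
      short id = block.getDataBlockEncodingId();
      System.out.println("block encoded as " + encoding + " (id " + id + ")");
    }
    boolean hasMemstoreTS = block.doesIncludeMemstoreTS();       // set by the reader
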
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1236031&r1=1236030&r2=1236031&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Thu Jan 26 02:58:57 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -200,8 +201,18 @@ public class HFileBlockIndex {
           // Call HFile's caching block reader API. We always cache index
           // blocks, otherwise we might get terrible performance.
           boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
-          block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize,
-              shouldCache, pread, isCompaction);
+          BlockType expectedBlockType;
+          if (lookupLevel < searchTreeLevel - 1) {
+            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
+          } else if (lookupLevel == searchTreeLevel - 1) {
+            expectedBlockType = BlockType.LEAF_INDEX;
+          } else {
+            // this also accounts for ENCODED_DATA
+            expectedBlockType = BlockType.DATA;
+          }
+          block = cachingBlockReader.readBlock(currentOffset,
+              currentOnDiskSize, shouldCache, pread, isCompaction,
+              expectedBlockType);
         }
 
         if (block == null) {
@@ -210,7 +221,8 @@ public class HFileBlockIndex {
         }
 
         // Found a data block, break the loop and check our level in the tree.
-        if (block.getBlockType().equals(BlockType.DATA)) {
+        if (block.getBlockType().equals(BlockType.DATA) ||
+            block.getBlockType().equals(BlockType.ENCODED_DATA)) {
           break;
         }
 
@@ -267,7 +279,8 @@ public class HFileBlockIndex {
 
         // Caching, using pread, assuming this is not a compaction.
         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
-            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false);
+            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false,
+            BlockType.LEAF_INDEX);
 
         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
         int numDataBlocks = b.getInt();
@@ -733,8 +746,8 @@ public class HFileBlockIndex {
       long rootLevelIndexPos = out.getPos();
 
       {
-        DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX,
-            false);
+        DataOutput blockStream =
+            blockWriter.startWriting(BlockType.ROOT_INDEX);
         rootChunk.writeRoot(blockStream);
         if (midKeyMetadata != null)
           blockStream.write(midKeyMetadata);
@@ -829,7 +842,7 @@ public class HFileBlockIndex {
         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
       long beginOffset = out.getPos();
       DataOutputStream dos = blockWriter.startWriting(
-          BlockType.INTERMEDIATE_INDEX, cacheOnWrite());
+          BlockType.INTERMEDIATE_INDEX);
       curChunk.writeNonRoot(dos);
       byte[] curFirstKey = curChunk.getBlockKey(0);
       blockWriter.writeHeaderAndData(out);
@@ -837,8 +850,9 @@ public class HFileBlockIndex {
       if (blockCache != null) {
         HFileBlock blockForCaching = blockWriter.getBlockForCaching();
         passSchemaMetricsTo(blockForCaching);
-        blockCache.cacheBlock(HFile.getBlockCacheKey(nameForCaching,
-            beginOffset), blockForCaching);
+        blockCache.cacheBlock(new BlockCacheKey(nameForCaching,
+            beginOffset, DataBlockEncoding.NONE, 
+            blockForCaching.getBlockType()), blockForCaching);
       }
 
       // Add intermediate index block size

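The expected-block-type decision introduced above depends only on how deep the lookup has descended relative to the index depth; pulled out into a standalone helper (inside the same org.apache.hadoop.hbase.io.hfile package, method name illustrative) it reads as follows:

    /** Which block type readBlock() should expect at a given depth of the index walk. */
    static BlockType expectedTypeAt(int lookupLevel, int searchTreeLevel) {
      if (lookupLevel < searchTreeLevel - 1) {
        return BlockType.INTERMEDIATE_INDEX;   // still inside the multi-level index
      } else if (lookupLevel == searchTreeLevel - 1) {
        return BlockType.LEAF_INDEX;           // last index level before the data
      }
      return BlockType.DATA;                   // readBlock() also accepts ENCODED_DATA here
    }
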
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1236031&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Thu Jan 26 02:58:57 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Controls what kind of data block encoding is used. If data block encoding is
+ * not set or the given block is not a data block (encoded or not), methods
+ * should just return the unmodified block.
+ */
+public interface HFileDataBlockEncoder {
+  /**
+   * Converts a block from the on-disk format to the in-cache format. Called in
+   * the following cases:
+   * <ul>
+   * <li>After an encoded or unencoded data block is read from disk, but before
+   * it is put into the cache.</li>
+   * <li>To convert brand-new blocks to the in-cache format when doing
+   * cache-on-write.</li>
+   * </ul>
+   * @param block a block in an on-disk format (read from HFile or freshly
+   *          generated).
+   * @return a non-null block encoded according to the settings.
+   */
+  public HFileBlock diskToCacheFormat(HFileBlock block,
+      boolean isCompaction);
+
+  /**
+   * Should be called before an encoded or unencoded data block is written to
+   * disk.
+   * @param in KeyValues next to each other
+   * @return a non-null on-heap buffer containing the contents of the
+   *         HFileBlock with unfilled header and block type
+   */
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+      ByteBuffer in, boolean includesMemstoreTS);
+
+  /**
+   * Decides whether we should use a scanner over encoded blocks.
+   * @param isCompaction whether we are in a compaction.
+   * @return Whether to use encoded scanner.
+   */
+  public boolean useEncodedScanner(boolean isCompaction);
+
+  /**
+   * Saves metadata in the StoreFile, which will be written to disk.
+   * @param storeFileWriter writer for a given StoreFile
+   * @exception IOException on disk problems
+   */
+  public void saveMetadata(StoreFile.Writer storeFileWriter)
+      throws IOException;
+
+  /** @return the on-disk data block encoding */
+  public DataBlockEncoding getEncodingOnDisk();
+
+  /** @return the preferred in-cache data block encoding for normal reads */
+  public DataBlockEncoding getEncodingInCache();
+
+  /**
+   * @return the effective in-cache data block encoding, taking into account
+   *         whether we are doing a compaction.
+   */
+  public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
+
+}
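
The commit wires NoOpDataBlockEncoder.INSTANCE in as the default implementation of this interface; its body is not part of this hunk, but a pass-through implementation would look roughly like the following sketch (the class name and details are assumed):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.util.Pair;

    /** Sketch of a pass-through encoder: every method leaves blocks untouched. */
    public class PassThroughDataBlockEncoder implements HFileDataBlockEncoder {
      @Override
      public HFileBlock diskToCacheFormat(HFileBlock block, boolean isCompaction) {
        return block;                                   // cache exactly what came off disk
      }
      @Override
      public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
          boolean includesMemstoreTS) {
        return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);  // no re-encoding
      }
      @Override
      public boolean useEncodedScanner(boolean isCompaction) {
        return false;
      }
      @Override
      public void saveMetadata(StoreFile.Writer storeFileWriter) throws IOException {
        // nothing to persist for the no-encoding case
      }
      @Override
      public DataBlockEncoding getEncodingOnDisk() {
        return DataBlockEncoding.NONE;
      }
      @Override
      public DataBlockEncoding getEncodingInCache() {
        return DataBlockEncoding.NONE;
      }
      @Override
      public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
        return DataBlockEncoding.NONE;
      }
    }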
