Subject: svn commit: r1223020 [2/5] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...
Date: Sat, 24 Dec 2011 21:20:41 -0000
To: commits@hbase.apache.org
From: tedyu@apache.org
Reply-To: dev@hbase.apache.org
Message-Id: <20111224212044.85A512388900@eris.apache.org>

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java?rev=1223020&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011 @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.RawComparator; + +/** + * Compress using: + * - store size of common prefix + * - save column family once, it is same within HFile + * - use integer compression for key, value and prefix (7-bit encoding) + * - use bits to avoid duplication key length, value length + * and type if it same as previous + * - store in 3 bits length of timestamp field + * - allow diff in timestamp instead of actual value + * + * Format: + * - 1 byte: flag + * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag) + * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag) + * - 1-5 bytes: prefix length + * - ... bytes: rest of the row (if prefix length is small enough) + * - ... bytes: qualifier (or suffix depending on prefix length) + * - 1-8 bytes: timestamp or diff + * - 1 byte: type (only if FLAG_SAME_TYPE is not set in the flag) + * - ... bytes: value + */ +public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { + static final int FLAG_SAME_KEY_LENGTH = 1; + static final int FLAG_SAME_VALUE_LENGTH = 1 << 1; + static final int FLAG_SAME_TYPE = 1 << 2; + static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3; + static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6); + static final int SHIFT_TIMESTAMP_LENGTH = 4; + static final int FLAG_TIMESTAMP_SIGN = 1 << 7; + + protected static class DiffCompressionState extends CompressionState { + long timestamp; + byte[] familyNameWithSize; + + @Override + protected void readTimestamp(ByteBuffer in) { + timestamp = in.getLong(); + } + + @Override + void copyFrom(CompressionState state) { + super.copyFrom(state); + DiffCompressionState state2 = (DiffCompressionState) state; + timestamp = state2.timestamp; + } + } + + private void compressSingleKeyValue(DiffCompressionState previousState, + DiffCompressionState currentState, DataOutputStream out, + ByteBuffer in) throws IOException { + byte flag = 0; + int kvPos = in.position(); + int keyLength = in.getInt(); + int valueLength = in.getInt(); + + long timestamp; + long diffTimestamp = 0; + int diffTimestampFitsInBytes = 0; + + int commonPrefix; + + int timestampFitsInBytes; + + if (previousState.isFirst()) { + currentState.readKey(in, keyLength, valueLength); + currentState.prevOffset = kvPos; + timestamp = currentState.timestamp; + if (timestamp < 0) { + flag |= FLAG_TIMESTAMP_SIGN; + timestamp = -timestamp; + } + timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); + + flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; + commonPrefix = 0; + + // put column family + in.mark(); + ByteBufferUtils.skip(in, currentState.rowLength + + KeyValue.ROW_LENGTH_SIZE); + ByteBufferUtils.copyToStream(out, in, currentState.familyLength + + KeyValue.FAMILY_LENGTH_SIZE); + in.reset(); + } else { + // find a common prefix and skip it + commonPrefix = + ByteBufferUtils.findCommonPrefix(in, in.position(), + previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength + - KeyValue.TIMESTAMP_TYPE_SIZE); + // don't compress timestamp and type using prefix + + currentState.readKey(in, keyLength, valueLength, + commonPrefix, previousState); + currentState.prevOffset = kvPos; + timestamp = currentState.timestamp; + boolean 
minusTimestamp = timestamp < 0; + if (minusTimestamp) { + timestamp = -timestamp; + } + timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); + + if (keyLength == previousState.keyLength) { + flag |= FLAG_SAME_KEY_LENGTH; + } + if (valueLength == previousState.valueLength) { + flag |= FLAG_SAME_VALUE_LENGTH; + } + if (currentState.type == previousState.type) { + flag |= FLAG_SAME_TYPE; + } + + // encode timestamp + diffTimestamp = previousState.timestamp - currentState.timestamp; + boolean minusDiffTimestamp = diffTimestamp < 0; + if (minusDiffTimestamp) { + diffTimestamp = -diffTimestamp; + } + diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp); + if (diffTimestampFitsInBytes < timestampFitsInBytes) { + flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; + flag |= FLAG_TIMESTAMP_IS_DIFF; + if (minusDiffTimestamp) { + flag |= FLAG_TIMESTAMP_SIGN; + } + } else { + flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; + if (minusTimestamp) { + flag |= FLAG_TIMESTAMP_SIGN; + } + } + } + + ByteBufferUtils.copyToStream(out, flag); + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(out, keyLength); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(out, valueLength); + } + + ByteBufferUtils.putCompressedInt(out, commonPrefix); + ByteBufferUtils.skip(in, commonPrefix); + + if (previousState.isFirst() || + commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) { + int restRowLength = + currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix; + ByteBufferUtils.copyToStream(out, in, restRowLength); + ByteBufferUtils.skip(in, currentState.familyLength + + KeyValue.FAMILY_LENGTH_SIZE); + ByteBufferUtils.copyToStream(out, in, currentState.qualifierLength); + } else { + ByteBufferUtils.copyToStream(out, in, + keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE); + } + + if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { + ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes); + } else { + ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes); + } + + if ((flag & FLAG_SAME_TYPE) == 0) { + ByteBufferUtils.copyToStream(out, currentState.type); + } + ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE); + + ByteBufferUtils.copyToStream(out, in, valueLength); + } + + private void uncompressSingleKeyValue(DataInputStream source, + ByteBuffer buffer, + DiffCompressionState state) + throws IOException, EncoderBufferTooSmallException { + // read the column family at the beginning + if (state.isFirst()) { + state.familyLength = source.readByte(); + state.familyNameWithSize = + new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE]; + state.familyNameWithSize[0] = state.familyLength; + source.read(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE, + state.familyLength); + } + + // read flag + byte flag = source.readByte(); + + // read key/value/common lengths + int keyLength; + int valueLength; + if ((flag & FLAG_SAME_KEY_LENGTH) != 0) { + keyLength = state.keyLength; + } else { + keyLength = ByteBufferUtils.readCompressedInt(source); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) { + valueLength = state.valueLength; + } else { + valueLength = ByteBufferUtils.readCompressedInt(source); + } + int commonPrefix = ByteBufferUtils.readCompressedInt(source); + + // create KeyValue buffer and fill it prefix + int keyOffset = buffer.position(); + ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength + + KeyValue.ROW_OFFSET); + 
buffer.putInt(keyLength); + buffer.putInt(valueLength); + + // copy common from previous key + if (commonPrefix > 0) { + ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset + + KeyValue.ROW_OFFSET, commonPrefix); + } + + // copy the rest of the key from the buffer + int keyRestLength; + if (state.isFirst() || commonPrefix < + state.rowLength + KeyValue.ROW_LENGTH_SIZE) { + // omit the family part of the key, it is always the same + short rowLength; + int rowRestLength; + + // check length of row + if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) { + // not yet copied, do it now + ByteBufferUtils.copyFromStream(source, buffer, + KeyValue.ROW_LENGTH_SIZE - commonPrefix); + ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE); + rowLength = buffer.getShort(); + rowRestLength = rowLength; + } else { + // already in buffer, just read it + rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET); + rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix; + } + + // copy the rest of row + ByteBufferUtils.copyFromStream(source, buffer, rowRestLength); + state.rowLength = rowLength; + + // copy the column family + buffer.put(state.familyNameWithSize); + + keyRestLength = keyLength - rowLength - + state.familyNameWithSize.length - + (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE); + } else { + // prevRowWithSizeLength is the same as on previous row + keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE; + } + // copy the rest of the key, after column family -> column qualifier + ByteBufferUtils.copyFromStream(source, buffer, keyRestLength); + + // handle timestamp + int timestampFitsInBytes = + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1; + long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes); + if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { + timestamp = -timestamp; + } + if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) { + timestamp = state.timestamp - timestamp; + } + buffer.putLong(timestamp); + + // copy the type field + byte type; + if ((flag & FLAG_SAME_TYPE) != 0) { + type = state.type; + } else { + type = source.readByte(); + } + buffer.put(type); + + // copy value part + ByteBufferUtils.copyFromStream(source, buffer, valueLength); + + state.keyLength = keyLength; + state.valueLength = valueLength; + state.prevOffset = keyOffset; + state.timestamp = timestamp; + state.type = type; + // state.qualifier is unused + } + + @Override + public void compressKeyValues(DataOutputStream out, + ByteBuffer in, boolean includesMemstoreTS) throws IOException { + in.rewind(); + ByteBufferUtils.putInt(out, in.limit()); + DiffCompressionState previousState = new DiffCompressionState(); + DiffCompressionState currentState = new DiffCompressionState(); + while (in.hasRemaining()) { + compressSingleKeyValue(previousState, currentState, + out, in); + afterEncodingKeyValue(in, out, includesMemstoreTS); + + // swap previousState <-> currentState + DiffCompressionState tmp = previousState; + previousState = currentState; + currentState = tmp; + } + } + + @Override + public ByteBuffer uncompressKeyValues(DataInputStream source, + int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + throws IOException { + int decompressedSize = source.readInt(); + ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + + allocHeaderLength); + buffer.position(allocHeaderLength); + DiffCompressionState state = new DiffCompressionState(); + while (source.available() > skipLastBytes) { + uncompressSingleKeyValue(source, buffer, 
state); + afterDecodingKeyValue(source, buffer, includesMemstoreTS); + } + + if (source.available() != skipLastBytes) { + throw new IllegalStateException("Read too much bytes."); + } + + return buffer; + } + + @Override + public ByteBuffer getFirstKeyInBlock(ByteBuffer block) { + block.mark(); + block.position(Bytes.SIZEOF_INT); + byte familyLength = block.get(); + ByteBufferUtils.skip(block, familyLength); + byte flag = block.get(); + int keyLength = ByteBufferUtils.readCompressedInt(block); + ByteBufferUtils.readCompressedInt(block); // valueLength + ByteBufferUtils.readCompressedInt(block); // commonLength + ByteBuffer result = ByteBuffer.allocate(keyLength); + + // copy row + int pos = result.arrayOffset(); + block.get(result.array(), pos, Bytes.SIZEOF_SHORT); + pos += Bytes.SIZEOF_SHORT; + short rowLength = result.getShort(); + block.get(result.array(), pos, rowLength); + pos += rowLength; + + // copy family + int savePosition = block.position(); + block.position(Bytes.SIZEOF_INT); + block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE); + pos += familyLength + Bytes.SIZEOF_BYTE; + + // copy qualifier + block.position(savePosition); + int qualifierLength = + keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE; + block.get(result.array(), pos, qualifierLength); + pos += qualifierLength; + + // copy the timestamp and type + int timestampFitInBytes = + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1; + long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes); + if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { + timestamp = -timestamp; + } + result.putLong(pos, timestamp); + pos += Bytes.SIZEOF_LONG; + block.get(result.array(), pos, Bytes.SIZEOF_BYTE); + + block.reset(); + return result; + } + + @Override + public String toString() { + return DiffKeyDeltaEncoder.class.getSimpleName(); + } + + @Override + public EncodedSeeker createSeeker(RawComparator comparator, + final boolean includesMemstoreTS) { + return new BufferedEncodedSeeker(comparator) { + private static final int TIMESTAMP_WITH_TYPE_LENGTH = + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE; + private byte[] familyNameWithSize; + private int rowLengthWithSize; + private long timestamp; + + private void decode(boolean isFirst) { + byte flag = currentBuffer.get(); + byte type = 0; + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + if (!isFirst) { + type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE]; + } + current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + current.valueLength = + ByteBufferUtils.readCompressedInt(currentBuffer); + } + current.lastCommonPrefix = + ByteBufferUtils.readCompressedInt(currentBuffer); + + current.ensureSpaceForKey(); + + if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) { + // length of row is different, copy everything except family + + // copy the row size + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + Bytes.SIZEOF_SHORT - current.lastCommonPrefix); + rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + + Bytes.SIZEOF_SHORT; + + // copy the rest of row + currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT, + rowLengthWithSize - Bytes.SIZEOF_SHORT); + + // copy the column family + System.arraycopy(familyNameWithSize, 0, + current.keyBuffer, rowLengthWithSize, familyNameWithSize.length); + + // copy the qualifier + currentBuffer.get(current.keyBuffer, + rowLengthWithSize + familyNameWithSize.length, + current.keyLength - rowLengthWithSize - + 
familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH); + } else if (current.lastCommonPrefix < rowLengthWithSize) { + // we have to copy part of row and qualifier, + // but column family is in right place + + // before column family (rest of row) + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + rowLengthWithSize - current.lastCommonPrefix); + + // after column family (qualifier) + currentBuffer.get(current.keyBuffer, + rowLengthWithSize + familyNameWithSize.length, + current.keyLength - rowLengthWithSize - + familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH); + } else { + // copy just the ending + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH - + current.lastCommonPrefix); + } + + + // timestamp + int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH; + int timestampFitInBytes = 1 + + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH); + long timestampOrDiff = + ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes); + if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { + timestampOrDiff = -timestampOrDiff; + } + if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp + timestamp = timestampOrDiff; + } else { // it is diff + timestamp = timestamp - timestampOrDiff; + } + Bytes.putLong(current.keyBuffer, pos, timestamp); + pos += Bytes.SIZEOF_LONG; + + // type + if ((flag & FLAG_SAME_TYPE) == 0) { + currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE); + } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + current.keyBuffer[pos] = type; + } + + current.valueOffset = currentBuffer.position(); + ByteBufferUtils.skip(currentBuffer, current.valueLength); + + if (includesMemstoreTS) { + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + } else { + current.memstoreTS = 0; + } + current.nextKvOffset = currentBuffer.position(); + } + + @Override + protected void decodeFirst() { + ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + + // read column family + byte familyNameLength = currentBuffer.get(); + familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE]; + familyNameWithSize[0] = familyNameLength; + currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE, + familyNameLength); + decode(true); + } + + @Override + protected void decodeNext() { + decode(false); + } + }; + } +} Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1223020&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Sat Dec 24 21:20:39 2011 @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.io.compress.Compressor; + +/** + * Encapsulates a data block compressed using a particular encoding algorithm. + * Useful for testing and benchmarking. + */ +public class EncodedDataBlock { + private static final int BUFFER_SIZE = 4 * 1024; + protected DataBlockEncoder dataBlockEncoder; + ByteArrayOutputStream uncompressedOutputStream; + ByteBuffer uncompressedBuffer; + private byte[] cacheCompressData; + private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + private boolean includesMemstoreTS; + + /** + * Create a buffer which will be encoded using dataBlockEncoder. + * @param dataBlockEncoder Algorithm used for compression. + */ + public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, + boolean includesMemstoreTS) { + this.dataBlockEncoder = dataBlockEncoder; + uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE); + } + + /** + * Add KeyValue and compress it. + * @param kv Item to be added and compressed. + */ + public void addKv(KeyValue kv) { + cacheCompressData = null; + uncompressedOutputStream.write( + kv.getBuffer(), kv.getOffset(), kv.getLength()); + } + + /** + * Provides access to compressed value. + * @return Forwards sequential iterator. + */ + public Iterator getIterator() { + final int uncompressedSize = uncompressedOutputStream.size(); + final ByteArrayInputStream bais = new ByteArrayInputStream( + getCompressedData()); + final DataInputStream dis = new DataInputStream(bais); + + + return new Iterator() { + private ByteBuffer decompressedData = null; + + @Override + public boolean hasNext() { + if (decompressedData == null) { + return uncompressedSize > 0; + } + return decompressedData.hasRemaining(); + } + + @Override + public KeyValue next() { + if (decompressedData == null) { + try { + decompressedData = dataBlockEncoder.uncompressKeyValues( + dis, includesMemstoreTS); + } catch (IOException e) { + throw new RuntimeException("Problem with data block encoder, " + + "most likely it requested more bytes than are available.", e); + } + decompressedData.rewind(); + } + + int offset = decompressedData.position(); + KeyValue kv = new KeyValue(decompressedData.array(), offset); + decompressedData.position(offset + kv.getLength()); + + return kv; + } + + @Override + public void remove() { + throw new NotImplementedException("remove() is not supported!"); + } + + @Override + public String toString() { + return "Iterator of: " + dataBlockEncoder.getClass().getName(); + } + + }; + } + + /** + * Find the size of minimal buffer that could store compressed data. + * @return Size in bytes of compressed data. 
+ */ + public int getSize() { + return getCompressedData().length; + } + + /** + * Find the size of compressed data assuming that buffer will be compressed + * using given algorithm. + * @param compressor Algorithm used for compression. + * @param buffer Array to be compressed. + * @param offset Offset to beginning of the data. + * @param length Length to be compressed. + * @return Size of compressed data in bytes. + */ + public static int checkCompressedSize(Compressor compressor, byte[] buffer, + int offset, int length) { + byte[] compressedBuffer = new byte[buffer.length]; + // in fact the buffer could be of any positive size + compressor.setInput(buffer, offset, length); + compressor.finish(); + int currentPos = 0; + while (!compressor.finished()) { + try { + // we don't care about compressed data, + // we just want to callculate number of bytes + currentPos += compressor.compress(compressedBuffer, 0, + compressedBuffer.length); + } catch (IOException e) { + throw new RuntimeException( + "For some reason compressor couldn't read data. " + + "It is likely a problem with " + + compressor.getClass().getName(), e); + } + } + return currentPos; + } + + /** + * Estimate size after second stage of compression (e.g. LZO). + * @param compressor Algorithm which will be used for compressions. + * @return Size after second stage of compression. + */ + public int checkCompressedSize(Compressor compressor) { + // compress + byte[] compressedBytes = getCompressedData(); + return checkCompressedSize(compressor, compressedBytes, 0, + compressedBytes.length); + } + + private byte[] getCompressedData() { + // is cached + if (cacheCompressData != null) { + return cacheCompressData; + } + cacheCompressData = doCompressData(); + + return cacheCompressData; + } + + private ByteBuffer getUncompressedBuffer() { + if (uncompressedBuffer == null || + uncompressedBuffer.limit() < uncompressedOutputStream.size()) { + uncompressedBuffer = ByteBuffer.wrap( + uncompressedOutputStream.toByteArray()); + } + return uncompressedBuffer; + } + + /** + * Do the compression. + * @return Compressed byte buffer. + */ + public byte[] doCompressData() { + compressedStream.reset(); + DataOutputStream dataOut = new DataOutputStream(compressedStream); + try { + this.dataBlockEncoder.compressKeyValues( + dataOut, getUncompressedBuffer(), includesMemstoreTS); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Bug in decoding part of algorithm %s. " + + "Probably it requested more bytes than are available.", + toString()), e); + } + return compressedStream.toByteArray(); + } + + @Override + public String toString() { + return dataBlockEncoder.toString(); + } + + /** + * Get uncompressed buffer. + * @return The buffer. + */ + public byte[] getRawKeyValues() { + return uncompressedOutputStream.toByteArray(); + } +} Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java?rev=1223020&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java Sat Dec 24 21:20:39 2011 @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +/** + * Internal error which indicates a bug in a data block encoding algorithm. + */ +public class EncoderBufferTooSmallException extends RuntimeException { + private static final long serialVersionUID = 4767495176134878737L; + + public EncoderBufferTooSmallException(String message) { + super(message); + } +} Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1223020&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Sat Dec 24 21:20:39 2011 @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.RawComparator; + +/** + * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster. 
+ * + * Compress using: + * - store size of common prefix + * - save column family once in the first KeyValue + * - use integer compression for key, value and prefix (128-bit encoding) + * - use bits to avoid duplication key length, value length + * and type if it same as previous + * - store in 3 bits length of prefix timestamp + * with previous KeyValue's timestamp + * - one bit which allow to omit value if it is the same + * + * Format: + * - 1 byte: flag + * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag) + * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag) + * - 1-5 bytes: prefix length + * - ... bytes: rest of the row (if prefix length is small enough) + * - ... bytes: qualifier (or suffix depending on prefix length) + * - 1-8 bytes: timestamp suffix + * - 1 byte: type (only if FLAG_SAME_TYPE is not set in the flag) + * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag) + * + */ +public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { + final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2); + final int SHIFT_TIMESTAMP_LENGTH = 0; + final int FLAG_SAME_KEY_LENGTH = 1 << 3; + final int FLAG_SAME_VALUE_LENGTH = 1 << 4; + final int FLAG_SAME_TYPE = 1 << 5; + final int FLAG_SAME_VALUE = 1 << 6; + + private static class FastDiffCompressionState extends CompressionState { + byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE]; + int prevTimestampOffset; + + @Override + protected void readTimestamp(ByteBuffer in) { + in.get(timestamp); + } + + @Override + void copyFrom(CompressionState state) { + super.copyFrom(state); + FastDiffCompressionState state2 = (FastDiffCompressionState) state; + System.arraycopy(state2.timestamp, 0, timestamp, 0, + KeyValue.TIMESTAMP_SIZE); + prevTimestampOffset = state2.prevTimestampOffset; + } + } + + private void compressSingleKeyValue( + FastDiffCompressionState previousState, + FastDiffCompressionState currentState, + OutputStream out, ByteBuffer in) throws IOException { + currentState.prevOffset = in.position(); + int keyLength = in.getInt(); + int valueOffset = currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET; + int valueLength = in.getInt(); + byte flag = 0; + + if (previousState.isFirst()) { + // copy the key, there is no common prefix with none + ByteBufferUtils.copyToStream(out, flag); + ByteBufferUtils.putCompressedInt(out, keyLength); + ByteBufferUtils.putCompressedInt(out, valueLength); + ByteBufferUtils.putCompressedInt(out, 0); + + currentState.readKey(in, keyLength, valueLength); + + ByteBufferUtils.copyToStream(out, in, keyLength + valueLength); + } else { + // find a common prefix and skip it + int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(), + previousState.prevOffset + KeyValue.ROW_OFFSET, + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE); + + currentState.readKey(in, keyLength, valueLength, + commonPrefix, previousState); + + if (keyLength == previousState.keyLength) { + flag |= FLAG_SAME_KEY_LENGTH; + } + if (valueLength == previousState.valueLength) { + flag |= FLAG_SAME_VALUE_LENGTH; + } + if (currentState.type == previousState.type) { + flag |= FLAG_SAME_TYPE; + } + + int prefixTimestamp = findCommonTimestampPrefix( + currentState, previousState); + flag |= (prefixTimestamp) << SHIFT_TIMESTAMP_LENGTH; + + if (ByteBufferUtils.arePartsEqual(in, + previousState.prevOffset + previousState.keyLength + KeyValue.ROW_OFFSET, + previousState.valueLength, valueOffset, valueLength)) { + flag |= FLAG_SAME_VALUE; + } + + 
ByteBufferUtils.copyToStream(out, flag); + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(out, keyLength); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(out, valueLength); + } + ByteBufferUtils.putCompressedInt(out, commonPrefix); + + ByteBufferUtils.skip(in, commonPrefix); + if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) { + ByteBufferUtils.copyToStream(out, in, + currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix); + ByteBufferUtils.skip(in, currentState.familyLength + + KeyValue.FAMILY_LENGTH_SIZE); + ByteBufferUtils.copyToStream(out, in, currentState.qualifierLength); + } else { + int restKeyLength = keyLength - commonPrefix - + KeyValue.TIMESTAMP_TYPE_SIZE; + ByteBufferUtils.copyToStream(out, in, restKeyLength); + } + ByteBufferUtils.skip(in, prefixTimestamp); + ByteBufferUtils.copyToStream(out, in, + KeyValue.TIMESTAMP_SIZE - prefixTimestamp); + + if ((flag & FLAG_SAME_TYPE) == 0) { + valueOffset -= KeyValue.TYPE_SIZE; + valueLength += KeyValue.TYPE_SIZE; + } + + ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength); + + if ((flag & FLAG_SAME_VALUE) == 0 ) { + ByteBufferUtils.copyToStream(out, in, valueOffset, valueLength); + } else { + if ((flag & FLAG_SAME_TYPE) == 0) { + ByteBufferUtils.copyToStream(out, currentState.type); + } + } + } + } + + private int findCommonTimestampPrefix(FastDiffCompressionState left, + FastDiffCompressionState right) { + int prefixTimestamp = 0; + while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) && + left.timestamp[prefixTimestamp] + == right.timestamp[prefixTimestamp]) { + prefixTimestamp++; + } + return prefixTimestamp; // has to be at most 7 bytes + } + + private void uncompressSingleKeyValue(DataInputStream source, + ByteBuffer buffer, FastDiffCompressionState state) + throws IOException, EncoderBufferTooSmallException { + byte flag = source.readByte(); + int prevKeyLength = state.keyLength; + + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + state.keyLength = ByteBufferUtils.readCompressedInt(source); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + state.valueLength = ByteBufferUtils.readCompressedInt(source); + } + int commonLength = ByteBufferUtils.readCompressedInt(source); + + ByteBufferUtils.ensureSpace(buffer, state.keyLength + state.valueLength + + KeyValue.ROW_OFFSET); + + int kvPos = buffer.position(); + + if (!state.isFirst()) { + // copy the prefix + int common; + int prevOffset; + + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + buffer.putInt(state.keyLength); + buffer.putInt(state.valueLength); + prevOffset = state.prevOffset + KeyValue.ROW_OFFSET; + common = commonLength; + } else { + if ((flag & FLAG_SAME_KEY_LENGTH) != 0) { + prevOffset = state.prevOffset; + common = commonLength + KeyValue.ROW_OFFSET; + } else { + buffer.putInt(state.keyLength); + prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE; + common = commonLength + KeyValue.KEY_LENGTH_SIZE; + } + } + + ByteBufferUtils.copyFromBuffer(buffer, buffer, prevOffset, common); + + // copy the rest of the key from the buffer + int keyRestLength; + if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) { + // omit the family part of the key, it is always the same + int rowWithSizeLength; + int rowRestLength; + + // check length of row + if (commonLength < KeyValue.ROW_LENGTH_SIZE) { + // not yet copied, do it now + ByteBufferUtils.copyFromStream(source, buffer, + KeyValue.ROW_LENGTH_SIZE - commonLength); + + rowWithSizeLength = 
buffer.getShort(buffer.position() - + KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE; + rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE; + } else { + // already in kvBuffer, just read it + rowWithSizeLength = buffer.getShort(kvPos + KeyValue.ROW_OFFSET) + + KeyValue.ROW_LENGTH_SIZE; + rowRestLength = rowWithSizeLength - commonLength; + } + + // copy the rest of row + ByteBufferUtils.copyFromStream(source, buffer, rowRestLength); + + // copy the column family + ByteBufferUtils.copyFromBuffer(buffer, buffer, + state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + + state.rowLength, state.familyLength + + KeyValue.FAMILY_LENGTH_SIZE); + state.rowLength = (short) (rowWithSizeLength - + KeyValue.ROW_LENGTH_SIZE); + + keyRestLength = state.keyLength - rowWithSizeLength - + state.familyLength - + (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE); + } else { + // prevRowWithSizeLength is the same as on previous row + keyRestLength = state.keyLength - commonLength - + KeyValue.TIMESTAMP_TYPE_SIZE; + } + // copy the rest of the key, after column family == column qualifier + ByteBufferUtils.copyFromStream(source, buffer, keyRestLength); + + // copy timestamp + int prefixTimestamp = + (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH; + ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevTimestampOffset, + prefixTimestamp); + state.prevTimestampOffset = buffer.position() - prefixTimestamp; + ByteBufferUtils.copyFromStream(source, buffer, KeyValue.TIMESTAMP_SIZE + - prefixTimestamp); + + // copy the type and value + if ((flag & FLAG_SAME_TYPE) != 0) { + buffer.put(state.type); + if ((flag & FLAG_SAME_VALUE) != 0) { + ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset + + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength); + } else { + ByteBufferUtils.copyFromStream(source, buffer, state.valueLength); + } + } else { + if ((flag & FLAG_SAME_VALUE) != 0) { + ByteBufferUtils.copyFromStream(source, buffer, KeyValue.TYPE_SIZE); + ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset + + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength); + } else { + ByteBufferUtils.copyFromStream(source, buffer, + state.valueLength + KeyValue.TYPE_SIZE); + } + state.type = buffer.get(state.prevTimestampOffset + + KeyValue.TIMESTAMP_SIZE); + } + } else { // is first element + buffer.putInt(state.keyLength); + buffer.putInt(state.valueLength); + + state.prevTimestampOffset = buffer.position() + state.keyLength - + KeyValue.TIMESTAMP_TYPE_SIZE; + ByteBufferUtils.copyFromStream(source, buffer, state.keyLength + + state.valueLength); + state.rowLength = buffer.getShort(kvPos + KeyValue.ROW_OFFSET); + state.familyLength = buffer.get(kvPos + KeyValue.ROW_OFFSET + + KeyValue.ROW_LENGTH_SIZE + state.rowLength); + state.type = buffer.get(state.prevTimestampOffset + + KeyValue.TIMESTAMP_SIZE); + } + + state.prevOffset = kvPos; + } + + @Override + public void compressKeyValues(DataOutputStream out, + ByteBuffer in, boolean includesMemstoreTS) throws IOException { + in.rewind(); + ByteBufferUtils.putInt(out, in.limit()); + FastDiffCompressionState previousState = new FastDiffCompressionState(); + FastDiffCompressionState currentState = new FastDiffCompressionState(); + while (in.hasRemaining()) { + compressSingleKeyValue(previousState, currentState, + out, in); + afterEncodingKeyValue(in, out, includesMemstoreTS); + + // swap previousState <-> currentState + FastDiffCompressionState tmp = previousState; + previousState = currentState; + 
currentState = tmp; + } + } + + @Override + public ByteBuffer uncompressKeyValues(DataInputStream source, + int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + throws IOException { + int decompressedSize = source.readInt(); + ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + + allocHeaderLength); + buffer.position(allocHeaderLength); + FastDiffCompressionState state = new FastDiffCompressionState(); + while (source.available() > skipLastBytes) { + uncompressSingleKeyValue(source, buffer, state); + afterDecodingKeyValue(source, buffer, includesMemstoreTS); + } + + if (source.available() != skipLastBytes) { + throw new IllegalStateException("Read too much bytes."); + } + + return buffer; + } + + @Override + public ByteBuffer getFirstKeyInBlock(ByteBuffer block) { + block.mark(); + block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); + int keyLength = ByteBufferUtils.readCompressedInt(block); + ByteBufferUtils.readCompressedInt(block); // valueLength + ByteBufferUtils.readCompressedInt(block); // commonLength + int pos = block.position(); + block.reset(); + return ByteBuffer.wrap(block.array(), pos, keyLength).slice(); + } + + @Override + public String toString() { + return FastDiffDeltaEncoder.class.getSimpleName(); + } + + @Override + public EncodedSeeker createSeeker(RawComparator comparator, + final boolean includesMemstoreTS) { + return new BufferedEncodedSeeker(comparator) { + private byte[] prevTimestampAndType = new byte[ + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE]; + private int rowLengthWithSize; + private int columnFamilyLengthWithSize; + + private void decode(boolean isFirst) { + byte flag = currentBuffer.get(); + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + if (!isFirst) { + System.arraycopy(current.keyBuffer, + current.keyLength - prevTimestampAndType.length, + prevTimestampAndType, 0, + prevTimestampAndType.length); + } + current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + current.valueLength = + ByteBufferUtils.readCompressedInt(currentBuffer); + } + current.lastCommonPrefix = + ByteBufferUtils.readCompressedInt(currentBuffer); + + current.ensureSpaceForKey(); + + if (isFirst) { + // copy everything + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + current.keyLength - prevTimestampAndType.length); + rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + + Bytes.SIZEOF_SHORT; + columnFamilyLengthWithSize = current.keyBuffer[rowLengthWithSize] + + Bytes.SIZEOF_BYTE; + } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) { + // length of row is different, copy everything except family + + // copy the row size + int oldRowLengthWithSize = rowLengthWithSize; + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + Bytes.SIZEOF_SHORT - current.lastCommonPrefix); + rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + + Bytes.SIZEOF_SHORT; + + // move the column family + System.arraycopy(current.keyBuffer, oldRowLengthWithSize, + current.keyBuffer, rowLengthWithSize, + columnFamilyLengthWithSize); + + // copy the rest of row + currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT, + rowLengthWithSize - Bytes.SIZEOF_SHORT); + + // copy the qualifier + currentBuffer.get(current.keyBuffer, + rowLengthWithSize + columnFamilyLengthWithSize, + current.keyLength - rowLengthWithSize - + columnFamilyLengthWithSize - prevTimestampAndType.length); + } else if (current.lastCommonPrefix < rowLengthWithSize) { + // we have to copy part of row and qualifier, + // 
but column family is in right place + + // before column family (rest of row) + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + rowLengthWithSize - current.lastCommonPrefix); + + // after column family (qualifier) + currentBuffer.get(current.keyBuffer, + rowLengthWithSize + columnFamilyLengthWithSize, + current.keyLength - rowLengthWithSize - + columnFamilyLengthWithSize - prevTimestampAndType.length); + } else { + // copy just the ending + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + current.keyLength - prevTimestampAndType.length - + current.lastCommonPrefix); + } + + // timestamp + int pos = current.keyLength - prevTimestampAndType.length; + int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>> + SHIFT_TIMESTAMP_LENGTH; + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + System.arraycopy(prevTimestampAndType, 0, current.keyBuffer, + pos, commonTimestampPrefix); + } + pos += commonTimestampPrefix; + currentBuffer.get(current.keyBuffer, pos, + Bytes.SIZEOF_LONG - commonTimestampPrefix); + pos += Bytes.SIZEOF_LONG - commonTimestampPrefix; + + // type + if ((flag & FLAG_SAME_TYPE) == 0) { + currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE); + } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + current.keyBuffer[pos] = prevTimestampAndType[Bytes.SIZEOF_LONG]; + } + + // handle value + if ((flag & FLAG_SAME_VALUE) == 0) { + current.valueOffset = currentBuffer.position(); + ByteBufferUtils.skip(currentBuffer, current.valueLength); + } + + if (includesMemstoreTS) { + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + } else { + current.memstoreTS = 0; + } + current.nextKvOffset = currentBuffer.position(); + } + + @Override + protected void decodeFirst() { + ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + decode(true); + } + + @Override + protected void decodeNext() { + decode(false); + } + }; + } +} Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1223020&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011 @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.RawComparator; + +/** + * Compress key by storing size of common prefix with previous KeyValue + * and storing raw size of rest. + * + * Format: + * 1-5 bytes: compressed key length minus prefix (7-bit encoding) + * 1-5 bytes: compressed value length (7-bit encoding) + * 1-3 bytes: compressed length of common key prefix + * ... bytes: rest of key (including timestamp) + * ... bytes: value + * + * In a worst case compressed KeyValue will be three bytes longer than original. + * + */ +public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { + + private int addKv(int offset, DataOutputStream out, + ByteBuffer in, int prevKeyLength) throws IOException { + int keyLength = in.getInt(); + int valueLength = in.getInt(); + + if (offset == -1) { + // copy the key, there is no common prefix with none + ByteBufferUtils.putCompressedInt(out, keyLength); + ByteBufferUtils.putCompressedInt(out, valueLength); + ByteBufferUtils.putCompressedInt(out, 0); + ByteBufferUtils.copyToStream(out, in, keyLength + valueLength); + } else { + // find a common prefix and skip it + int common = ByteBufferUtils.findCommonPrefix( + in, offset + KeyValue.ROW_OFFSET, + in.position(), + Math.min(prevKeyLength, keyLength)); + + ByteBufferUtils.putCompressedInt(out, keyLength - common); + ByteBufferUtils.putCompressedInt(out, valueLength); + ByteBufferUtils.putCompressedInt(out, common); + + ByteBufferUtils.skip(in, common); + ByteBufferUtils.copyToStream(out, in, keyLength - common + valueLength); + } + + return keyLength; + } + + @Override + public void compressKeyValues(DataOutputStream writeHere, + ByteBuffer in, boolean includesMemstoreTS) throws IOException { + in.rewind(); + ByteBufferUtils.putInt(writeHere, in.limit()); + int prevOffset = -1; + int offset = 0; + int keyLength = 0; + while (in.hasRemaining()) { + offset = in.position(); + keyLength = addKv(prevOffset, writeHere, in, keyLength); + afterEncodingKeyValue(in, writeHere, includesMemstoreTS); + prevOffset = offset; + } + } + + @Override + public ByteBuffer uncompressKeyValues(DataInputStream source, + int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + throws IOException { + int decompressedSize = source.readInt(); + ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + + allocHeaderLength); + buffer.position(allocHeaderLength); + int prevKeyOffset = 0; + + while (source.available() > skipLastBytes) { + prevKeyOffset = uncompressKeyValue(source, buffer, prevKeyOffset); + afterDecodingKeyValue(source, buffer, includesMemstoreTS); + } + + if (source.available() != skipLastBytes) { + throw new IllegalStateException("Read too many bytes."); + } + + buffer.limit(buffer.position()); + return buffer; + } + + private int uncompressKeyValue(DataInputStream source, ByteBuffer buffer, + int prevKeyOffset) + throws IOException, EncoderBufferTooSmallException { + int keyLength = ByteBufferUtils.readCompressedInt(source); + int valueLength = ByteBufferUtils.readCompressedInt(source); + int commonLength = ByteBufferUtils.readCompressedInt(source); + int keyOffset; + keyLength += commonLength; + + ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength + + KeyValue.ROW_OFFSET); + + 
buffer.putInt(keyLength); + buffer.putInt(valueLength); + + // copy the prefix + if (commonLength > 0) { + keyOffset = buffer.position(); + ByteBufferUtils.copyFromBuffer(buffer, buffer, prevKeyOffset, + commonLength); + } else { + keyOffset = buffer.position(); + } + + // copy rest of the key and value + int len = keyLength - commonLength + valueLength; + ByteBufferUtils.copyFromStream(source, buffer, len); + return keyOffset; + } + + @Override + public ByteBuffer getFirstKeyInBlock(ByteBuffer block) { + block.mark(); + block.position(Bytes.SIZEOF_INT); + int keyLength = ByteBufferUtils.readCompressedInt(block); + ByteBufferUtils.readCompressedInt(block); + int commonLength = ByteBufferUtils.readCompressedInt(block); + if (commonLength != 0) { + throw new AssertionError("Nonzero common length in the first key in " + + "block: " + commonLength); + } + int pos = block.position(); + block.reset(); + return ByteBuffer.wrap(block.array(), pos, keyLength).slice(); + } + + @Override + public String toString() { + return PrefixKeyDeltaEncoder.class.getSimpleName(); + } + + @Override + public EncodedSeeker createSeeker(RawComparator comparator, + final boolean includesMemstoreTS) { + return new BufferedEncodedSeeker(comparator) { + @Override + protected void decodeNext() { + current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer); + current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer); + current.lastCommonPrefix = + ByteBufferUtils.readCompressedInt(currentBuffer); + current.keyLength += current.lastCommonPrefix; + current.ensureSpaceForKey(); + currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, + current.keyLength - current.lastCommonPrefix); + current.valueOffset = currentBuffer.position(); + ByteBufferUtils.skip(currentBuffer, current.valueLength); + if (includesMemstoreTS) { + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + } else { + current.memstoreTS = 0; + } + current.nextKvOffset = currentBuffer.position(); + } + + @Override + protected void decodeFirst() { + ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + decodeNext(); + } + }; + } +} Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1223020&r1=1223019&r2=1223020&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Sat Dec 24 21:20:39 2011 @@ -1,4 +1,4 @@ -/* + /* * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataInputS import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.io.RawComparator; @@ -60,6 +59,12 @@ public abstract class AbstractHFileReade /** Filled when we read in the trailer. */ protected final Compression.Algorithm compressAlgo; + /** + * What kind of data block encoding should be used while reading, writing and + * handling cache. + */ + protected final HFileDataBlockEncoder blockEncoder; + /** Last key in the file. 
Filled in when we read in the file info */ protected byte [] lastKey = null; @@ -93,7 +98,8 @@ public abstract class AbstractHFileReade protected AbstractHFileReader(Path path, FixedFileTrailer trailer, final FSDataInputStream fsdis, final long fileSize, final boolean closeIStream, - final CacheConfig cacheConf) { + final CacheConfig cacheConf, + final HFileDataBlockEncoder dataBlockEncoder) { super(null, path); this.trailer = trailer; this.compressAlgo = trailer.getCompressionCodec(); @@ -101,6 +107,8 @@ public abstract class AbstractHFileReade this.fileSize = fileSize; this.istream = fsdis; this.closeIStream = closeIStream; + this.blockEncoder = dataBlockEncoder != null + ? dataBlockEncoder : new NoOpDataBlockEncoder(); this.path = path; this.name = path.getName(); } @@ -275,8 +283,11 @@ public abstract class AbstractHFileReade protected int blockFetches; - public Scanner(final boolean cacheBlocks, + protected final HFile.Reader reader; + + public Scanner(final HFile.Reader reader, final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { + this.reader = reader; this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; @@ -296,6 +307,26 @@ public abstract class AbstractHFileReade if (!isSeeked()) throw new NotSeekedException(); } + + @Override + public int seekTo(byte[] key) throws IOException { + return seekTo(key, 0, key.length); + } + + @Override + public boolean seekBefore(byte[] key) throws IOException { + return seekBefore(key, 0, key.length); + } + + @Override + public int reseekTo(byte[] key) throws IOException { + return reseekTo(key, 0, key.length); + } + + @Override + public HFile.Reader getReader() { + return reader; + } } /** For testing */ @@ -306,5 +337,4 @@ public abstract class AbstractHFileReade public Path getPath() { return path; } - } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1223020&r1=1223019&r2=1223020&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Sat Dec 24 21:20:39 2011 @@ -83,6 +83,9 @@ public abstract class AbstractHFileWrite /** The compression algorithm used. NONE if no compression. */ protected final Compression.Algorithm compressAlgo; + + /** The data block encoding which will be used. NONE if there is no encoding */ + protected final HFileDataBlockEncoder blockEncoder; /** First key in a block. */ protected byte[] firstKeyInBlock = null; @@ -102,7 +105,9 @@ public abstract class AbstractHFileWrite public AbstractHFileWriter(CacheConfig cacheConf, FSDataOutputStream outputStream, Path path, int blockSize, - Compression.Algorithm compressAlgo, KeyComparator comparator) { + Compression.Algorithm compressAlgo, + HFileDataBlockEncoder dataBlockEncoder, + KeyComparator comparator) { super(null, path); this.outputStream = outputStream; this.path = path; @@ -110,6 +115,8 @@ public abstract class AbstractHFileWrite this.blockSize = blockSize; this.compressAlgo = compressAlgo == null ? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo; + this.blockEncoder = dataBlockEncoder != null + ? dataBlockEncoder : new NoOpDataBlockEncoder(); this.comparator = comparator != null ? 
comparator : Bytes.BYTES_RAWCOMPARATOR; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java?rev=1223020&r1=1223019&r2=1223020&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java Sat Dec 24 21:20:39 2011 @@ -39,6 +39,14 @@ public enum BlockType { /** Data block, both versions */ DATA("DATABLK*", BlockCategory.DATA), + /** An encoded data block (e.g. with prefix compression), version 2 */ + ENCODED_DATA("DATABLKE", BlockCategory.DATA) { + @Override + public int getId() { + return DATA.ordinal(); + } + }, + /** Version 2 leaf index block. Appears in the data block section */ LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX), @@ -102,6 +110,15 @@ public enum BlockType { this.metricCat = metricCat; assert magic.length == MAGIC_LENGTH; } + + /** + * Use it instead of oridinal(). It works exactly the same, + * except DATA and ENCODED_DATA got same id. + * @return id between 0 and N + */ + public int getId() { + return ordinal(); + } public void writeToStream(OutputStream out) throws IOException { out.write(magic); Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1223020&r1=1223019&r2=1223020&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Sat Dec 24 21:20:39 2011 @@ -243,6 +243,7 @@ public class HFile { public abstract Writer createWriter(FileSystem fs, Path path, int blockSize, Compression.Algorithm compress, + HFileDataBlockEncoder dataBlockEncoder, final KeyComparator comparator) throws IOException; public abstract Writer createWriter(FileSystem fs, Path path, @@ -371,32 +372,47 @@ public class HFile { } private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis, - long size, boolean closeIStream, CacheConfig cacheConf) + long size, boolean closeIStream, CacheConfig cacheConf, + HFileDataBlockEncoder dataBlockEncoder) throws IOException { FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size); switch (trailer.getVersion()) { case 1: return new HFileReaderV1(path, trailer, fsdis, size, closeIStream, - cacheConf); + cacheConf, dataBlockEncoder); case 2: return new HFileReaderV2(path, trailer, fsdis, size, closeIStream, - cacheConf); + cacheConf, dataBlockEncoder); default: throw new IOException("Cannot instantiate reader for HFile version " + trailer.getVersion()); } } + public static Reader createReader( + FileSystem fs, Path path, CacheConfig cacheConf) throws IOException { + return createReader(fs, path, cacheConf, + new NoOpDataBlockEncoder()); + } + + public static Reader createReader(Path path, FSDataInputStream fsdis, + long size, CacheConfig cacheConf) throws IOException { + return createReader(path, fsdis, size, cacheConf, + new NoOpDataBlockEncoder()); + } + public static Reader createReader(FileSystem fs, Path path, - CacheConfig cacheConf) throws IOException { + CacheConfig cacheConf, HFileDataBlockEncoder dataBlockEncoder) + throws IOException { return pickReaderVersion(path, 
fs.open(path), - fs.getFileStatus(path).getLen(), true, cacheConf); + fs.getFileStatus(path).getLen(), true, cacheConf, dataBlockEncoder); } public static Reader createReader(Path path, FSDataInputStream fsdis, - long size, CacheConfig cacheConf) - throws IOException { - return pickReaderVersion(path, fsdis, size, false, cacheConf); + long size, CacheConfig cacheConf, + HFileDataBlockEncoder dataBlockEncoder) throws IOException { + return pickReaderVersion(path, fsdis, size, false, cacheConf, + dataBlockEncoder); } /*
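
As a rough illustration of how the EncodedDataBlock helper introduced in this commit might be driven, here is a minimal, hypothetical sketch that pushes a batch of KeyValues through one of the new encoders and compares raw size against delta-encoded size. It assumes PrefixKeyDeltaEncoder satisfies the DataBlockEncoder type that EncodedDataBlock's constructor takes (that interface and BufferedDataBlockEncoder are added in other parts of r1223020) and that the five-argument KeyValue constructor is available; this is a usage sketch, not code from this commit.

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
    import org.apache.hadoop.hbase.io.encoding.PrefixKeyDeltaEncoder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class EncodedDataBlockSketch {
      public static void main(String[] args) {
        // Hypothetical driver: PrefixKeyDeltaEncoder is assumed to be usable as
        // the DataBlockEncoder that EncodedDataBlock expects.
        EncodedDataBlock block =
            new EncodedDataBlock(new PrefixKeyDeltaEncoder(), false);

        byte[] family = Bytes.toBytes("f");
        byte[] qualifier = Bytes.toBytes("q");
        for (int i = 0; i < 1000; i++) {
          // Rows sharing a long common prefix are the workload the new
          // prefix/diff encoders are meant to shrink.
          byte[] row = Bytes.toBytes(String.format("row-%08d", i));
          block.addKv(new KeyValue(row, family, qualifier, 1L,
              Bytes.toBytes("value-" + i)));
        }

        int rawSize = block.getRawKeyValues().length;  // unencoded bytes
        int encodedSize = block.getSize();             // after delta encoding
        System.out.println("raw=" + rawSize + ", encoded=" + encodedSize);
      }
    }

With keys like these, the encoded size should come out noticeably smaller than the raw size; checkCompressedSize(Compressor) could then be layered on top to estimate the effect of a second-stage codec such as LZO, which is the benchmarking use case the class javadoc describes.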