hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1153645 [2/3] - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase: io/ io/hfile/ util/
Date Wed, 03 Aug 2011 20:25:30 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,1299 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Provides functionality to write ({@link BlockIndexWriter}) and read
+ * ({@link BlockIndexReader}) single-level and multi-level block indexes.
+ *
+ * Examples of how to use the block index writer can be found in
+ * {@link CompoundBloomFilterWriter} and {@link HFileWriterV2}. Examples of how
+ * to use the reader can be found in {@link HFileReaderV2} and
+ * {@link TestHFileBlockIndex}.
+ */
+public class HFileBlockIndex {
+
+  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
+
+  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
+
+  /**
+   * The maximum size guideline for index blocks (both leaf, intermediate, and
+   * root). If not specified, {@link #DEFAULT_MAX_CHUNK_SIZE} is used.
+   */
+  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
+
+  /**
+   * The number of bytes stored in each "secondary index" entry in addition to
+   * key bytes in the non-root index block format. The first long is the file
+   * offset of the deeper-level block the entry points to, and the int that
+   * follows is that block's on-disk size without including header.
+   */
+  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
+
+  /**
+   * Error message when trying to use inline block API in single-level mode.
+   */
+  private static final String INLINE_BLOCKS_NOT_ALLOWED =
+      "Inline blocks are not allowed in the single-level-only mode";
+
+  /**
+   * Configuration key to cache leaf- and intermediate-level index blocks on
+   * write.
+   */
+  public static final String CACHE_INDEX_BLOCKS_ON_WRITE_KEY =
+      "hfile.block.index.cacheonwrite";
+
+  /**
+   * The size of a meta-data record used for finding the mid-key in a
+   * multi-level index. Consists of the middle leaf-level index block offset
+   * (long), its on-disk size without header included (int), and the mid-key
+   * entry's zero-based index in that leaf index block.
+   */
+  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
+      2 * Bytes.SIZEOF_INT;
+
+  /**
+   * The reader will always hold the root level index in the memory. Index
+   * blocks at all other levels will be cached in the LRU cache in practice,
+   * although this API does not enforce that.
+   *
+   * All non-root (leaf and intermediate) index blocks contain what we call a
+   * "secondary index": an array of offsets to the entries within the block.
+   * This allows us to do binary search for the entry corresponding to the
+   * given key without having to deserialize the block.
+   */
+  public static class BlockIndexReader implements HeapSize {
+    /** Needed doing lookup on blocks. */
+    private final RawComparator<byte[]> comparator;
+
+    // Root-level data.
+    private byte[][] blockKeys;
+    private long[] blockOffsets;
+    private int[] blockDataSizes;
+    private int rootByteSize = 0;
+    private int rootCount = 0;
+
+    // Mid-key metadata.
+    private long midLeafBlockOffset = -1;
+    private int midLeafBlockOnDiskSize = -1;
+    private int midKeyEntry = -1;
+
+    /** Pre-computed mid-key */
+    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
+
+    /**
+     * The number of levels in the block index tree. One if there is only root
+     * level, two for root and leaf levels, etc.
+     */
+    private int searchTreeLevel;
+
+    /** A way to read {@link HFile} blocks at a given offset */
+    private HFileBlock.BasicReader blockReader;
+
+    public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel,
+        final HFileBlock.BasicReader blockReader) {
+      this(c, treeLevel);
+      this.blockReader = blockReader;
+    }
+
+    public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel)
+    {
+      comparator = c;
+      searchTreeLevel = treeLevel;
+    }
+
+    /**
+     * @return true if the block index is empty.
+     */
+    public boolean isEmpty() {
+      return blockKeys.length == 0;
+    }
+
+    /**
+     * Verifies that the block index is non-empty and throws an
+     * {@link IllegalStateException} otherwise.
+     */
+    public void ensureNonEmpty() {
+      if (blockKeys.length == 0) {
+        throw new IllegalStateException("Block index is empty or not loaded");
+      }
+    }
+
+    /**
+     * Return the data block which contains this key. This function will only
+     * be called when the HFile version is larger than 1.
+     *
+     * @param key the key we are looking for
+     * @param keyOffset the offset of the key in its byte array
+     * @param keyLength the length of the key
+     * @param currentBlock the current block, to avoid re-reading the same
+     *          block
+     * @return reader a basic way to load blocks
+     * @throws IOException
+     */
+    public HFileBlock seekToDataBlock(final byte[] key, int keyOffset,
+        int keyLength, HFileBlock currentBlock)
+        throws IOException {
+      int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
+      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
+        return null;
+      }
+
+      // Read the next-level (intermediate or leaf) index block.
+      long currentOffset = blockOffsets[rootLevelIndex];
+      int currentOnDiskSize = blockDataSizes[rootLevelIndex];
+
+      int lookupLevel = 1; // How many levels deep we are in our lookup.
+      HFileBlock block = blockReader.readBlockData(currentOffset,
+          currentOnDiskSize, -1, true);
+      if (block == null) {
+        throw new IOException("Failed to read block at offset " +
+            currentOffset + ", onDiskSize=" + currentOnDiskSize);
+      }
+      while (!block.getBlockType().equals(BlockType.DATA)) {
+        // Read the block. It may be intermediate level block, leaf level block
+        // or data block. In any case, we expect non-root index block format.
+
+        // We don't allow more than searchTreeLevel iterations of this loop.
+        if (++lookupLevel > searchTreeLevel) {
+          throw new IOException("Search Tree Level overflow: lookupLevel: "+
+              lookupLevel + " searchTreeLevel: " + searchTreeLevel);
+        }
+
+        // read to the byte buffer
+        ByteBuffer buffer = block.getBufferWithoutHeader();
+        if (!locateNonRootIndexEntry(buffer, key, keyOffset, keyLength,
+            comparator)) {
+          throw new IOException("The key "
+              + Bytes.toStringBinary(key, keyOffset, keyLength)
+              + " is before the" + " first key of the non-root index block "
+              + block);
+        }
+
+        currentOffset = buffer.getLong();
+        currentOnDiskSize = buffer.getInt();
+
+        // Located a deeper-level block, now read it.
+        if (currentBlock != null && currentBlock.getOffset() == currentOffset)
+        {
+          // Avoid reading the same block.
+          block = currentBlock;
+        } else {
+          block = blockReader.readBlockData(currentOffset, currentOnDiskSize,
+              -1, true);
+        }
+      }
+
+      if (lookupLevel != searchTreeLevel) {
+        throw new IOException("Reached a data block at level " + lookupLevel +
+            " but the number of levels is " + searchTreeLevel);
+      }
+
+      return block;
+    }
+
+    /**
+     * An approximation to the {@link HFile}'s mid-key. Operates on block
+     * boundaries, and does not go inside blocks. In other words, returns the
+     * first key of the middle block of the file.
+     *
+     * @return the first key of the middle block
+     */
+    public byte[] midkey() throws IOException {
+      if (rootCount == 0)
+        throw new IOException("HFile empty");
+
+      byte[] midKey = this.midKey.get();
+      if (midKey != null)
+        return midKey;
+
+      if (midLeafBlockOffset >= 0) {
+        if (blockReader == null) {
+          throw new IOException("Have to read the middle leaf block but " +
+              "no block reader available");
+        }
+        HFileBlock midLeafBlock = blockReader.readBlockData(
+            midLeafBlockOffset, midLeafBlockOnDiskSize, -1, true);
+        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
+        int numDataBlocks = b.getInt();
+        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
+        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
+            keyRelOffset;
+        int keyOffset = b.arrayOffset() +
+            Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset +
+            SECONDARY_INDEX_ENTRY_OVERHEAD;
+        midKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + keyLen);
+      } else {
+        // The middle of the root-level index.
+        midKey = blockKeys[(rootCount - 1) / 2];
+      }
+
+      this.midKey.set(midKey);
+      return midKey;
+    }
+
+    /**
+     * @param i from 0 to {@link #getRootBlockCount() - 1}
+     */
+    public byte[] getRootBlockKey(int i) {
+      return blockKeys[i];
+    }
+
+    /**
+     * @param i from 0 to {@link #getRootBlockCount() - 1}
+     */
+    public long getRootBlockOffset(int i) {
+      return blockOffsets[i];
+    }
+
+    /**
+     * @param i zero-based index of a root-level block
+     * @return the on-disk size of the root-level block for version 2, or the
+     *         uncompressed size for version 1
+     */
+    public int getRootBlockDataSize(int i) {
+      return blockDataSizes[i];
+    }
+
+    /**
+     * @return the number of root-level blocks in this block index
+     */
+    public int getRootBlockCount() {
+      return rootCount;
+    }
+
+    /**
+     * Finds the root-level index block containing the given key.
+     *
+     * @param key
+     *          Key to find
+     * @return Offset of block containing <code>key</code> (between 0 and the
+     *         number of blocks - 1) or -1 if this file does not contain the
+     *         request.
+     */
+    public int rootBlockContainingKey(final byte[] key, int offset,
+        int length) {
+      int pos = Bytes.binarySearch(blockKeys, key, offset, length,
+          comparator);
+      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
+      // binarySearch's javadoc.
+
+      if (pos >= 0) {
+        // This means this is an exact match with an element of blockKeys.
+        assert pos < blockKeys.length;
+        return pos;
+      }
+
+      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
+      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
+      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
+      // key < blockKeys[0], meaning the file does not contain the given key.
+
+      int i = -pos - 1;
+      assert 0 <= i && i <= blockKeys.length;
+      return i - 1;
+    }
+
+    /**
+     * Adds a new entry in the root block index. Only used when reading.
+     *
+     * @param key Last key in the block
+     * @param offset file offset where the block is stored
+     * @param dataSize the uncompressed data size
+     */
+    private void add(final byte[] key, final long offset, final int dataSize) {
+      blockOffsets[rootCount] = offset;
+      blockKeys[rootCount] = key;
+      blockDataSizes[rootCount] = dataSize;
+
+      rootCount++;
+      rootByteSize += SECONDARY_INDEX_ENTRY_OVERHEAD + key.length;
+    }
+
+    /**
+     * Performs a binary search over a non-root level index block. Utilizes the
+     * secondary index, which records the offsets of (offset, onDiskSize,
+     * firstKey) tuples of all entries.
+     *
+     * @param key the key we are searching for offsets to individual entries in
+     *          the blockIndex buffer
+     * @param keyOffset the offset of the key in its byte array
+     * @param keyLength the length of the key
+     * @param nonRootIndex the non-root index block buffer, starting with the
+     *          secondary index. The position is ignored.
+     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
+     *         keys[i + 1], if keys is the array of all keys being searched, or
+     *         -1 otherwise
+     * @throws IOException
+     */
+    static int binarySearchNonRootIndex(byte[] key, int keyOffset,
+        int keyLength, ByteBuffer nonRootIndex,
+        RawComparator<byte[]> comparator) {
+
+      int numEntries = nonRootIndex.getInt(0);
+      int low = 0;
+      int high = numEntries - 1;
+      int mid = 0;
+
+      // Entries start after the number of entries and the secondary index.
+      // The secondary index takes numEntries + 1 ints.
+      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+
+      // If we imagine that keys[-1] = -Infinity and
+      // keys[numEntries] = Infinity, then we are maintaining an invariant that
+      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
+
+      while (low <= high) {
+        mid = (low + high) >>> 1;
+
+        // Midkey's offset relative to the end of secondary index
+        int midKeyRelOffset = nonRootIndex.getInt(
+            Bytes.SIZEOF_INT * (mid + 1));
+
+        // The offset of the middle key in the blockIndex buffer
+        int midKeyOffset = entriesOffset       // Skip secondary index
+            + midKeyRelOffset                  // Skip all entries until mid
+            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
+
+        // We subtract the two consecutive secondary index elements, which
+        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
+        // then need to subtract the overhead of offset and onDiskSize.
+        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
+            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
+
+        // we have to compare in this order, because the comparator order
+        // has special logic when the 'left side' is a special key.
+        int cmp = comparator.compare(key, keyOffset, keyLength,
+            nonRootIndex.array(), nonRootIndex.arrayOffset() + midKeyOffset,
+            midLength);
+
+        // key lives above the midpoint
+        if (cmp > 0)
+          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
+        // key lives below the midpoint
+        else if (cmp < 0)
+          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
+        else
+          return mid; // exact match
+      }
+
+      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
+      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
+      // condition, low >= high + 1. Therefore, low = high + 1.
+
+      if (low != high + 1) {
+        throw new IllegalStateException("Binary search broken: low=" + low
+            + " " + "instead of " + (high + 1));
+      }
+
+      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
+      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
+      int i = low - 1;
+
+      // Some extra validation on the result.
+      if (i < -1 || i >= numEntries) {
+        throw new IllegalStateException("Binary search broken: result is " +
+            i + " but expected to be between -1 and (numEntries - 1) = " +
+            (numEntries - 1));
+      }
+
+      return i;
+    }
+
+    /**
+     * Search for one key using the secondary index in a non-root block. In case
+     * of success, positions the provided buffer at the entry of interest, where
+     * the file offset and the on-disk-size can be read.
+     *
+     * @param nonRootBlock a non-root block without header. Initial position
+     *          does not matter.
+     * @param key the byte array containing the key
+     * @param keyOffset the offset of the key in its byte array
+     * @param keyLength the length of the key
+     * @return true in the case the index entry containing the given key was
+     *         found, false in the case the given key is before the first key
+     *
+     */
+    static boolean locateNonRootIndexEntry(ByteBuffer nonRootBlock, byte[] key,
+        int keyOffset, int keyLength, RawComparator<byte[]> comparator) {
+      int entryIndex = binarySearchNonRootIndex(key, keyOffset, keyLength,
+          nonRootBlock, comparator);
+
+      if (entryIndex == -1) {
+        return false;
+      }
+
+      int numEntries = nonRootBlock.getInt(0);
+
+      // The end of secondary index and the beginning of entries themselves.
+      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
+
+      // The offset of the entry we are interested in relative to the end of
+      // the secondary index.
+      int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT
+          * (1 + entryIndex));
+
+      nonRootBlock.position(entriesOffset + entryRelOffset);
+      return true;
+    }
+
+    /**
+     * Read in the root-level index from the given input stream. Must match
+     * what was written into the root level by
+     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
+     * offset that function returned.
+     *
+     * @param in the buffered input stream or wrapped byte input stream
+     * @param numEntries the number of root-level index entries
+     * @throws IOException
+     */
+    public void readRootIndex(DataInput in, final int numEntries)
+        throws IOException {
+      blockOffsets = new long[numEntries];
+      blockKeys = new byte[numEntries][];
+      blockDataSizes = new int[numEntries];
+
+      // If index size is zero, no index was written.
+      if (numEntries > 0) {
+        for (int i = 0; i < numEntries; ++i) {
+          long offset = in.readLong();
+          int dataSize = in.readInt();
+          byte[] key = Bytes.readByteArray(in);
+          add(key, offset, dataSize);
+        }
+      }
+    }
+
+    /**
+     * Read the root-level metadata of a multi-level block index. Based on
+     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
+     * necessary to compute the mid-key in a multi-level index.
+     *
+     * @param in the buffered or byte input stream to read from
+     * @param numEntries the number of root-level index entries
+     * @throws IOException
+     */
+    public void readMultiLevelIndexRoot(DataInputStream in,
+        final int numEntries) throws IOException {
+      readRootIndex(in, numEntries);
+      if (in.available() < MID_KEY_METADATA_SIZE) {
+        // No mid-key metadata available.
+        return;
+      }
+
+      midLeafBlockOffset = in.readLong();
+      midLeafBlockOnDiskSize = in.readInt();
+      midKeyEntry = in.readInt();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("size=" + rootCount).append("\n");
+      for (int i = 0; i < rootCount; i++) {
+        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
+            .append("\n  offset=").append(blockOffsets[i])
+            .append(", dataSize=" + blockDataSizes[i]).append("\n");
+      }
+      return sb.toString();
+    }
+
+    @Override
+    public long heapSize() {
+      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
+          3 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
+
+      // Mid-key metadata.
+      heapSize += MID_KEY_METADATA_SIZE;
+
+      // Calculating the size of blockKeys
+      if (blockKeys != null) {
+        // Adding array + references overhead
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
+            * ClassSize.REFERENCE);
+
+        // Adding bytes
+        for (byte[] key : blockKeys) {
+          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
+        }
+      }
+
+      if (blockOffsets != null) {
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
+            * Bytes.SIZEOF_LONG);
+      }
+
+      if (blockDataSizes != null) {
+        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
+            * Bytes.SIZEOF_INT);
+      }
+
+      return ClassSize.align(heapSize);
+    }
+
+  }
+
+  /**
+   * Writes the block index into the output stream. Generate the tree from
+   * bottom up. The leaf level is written to disk as a sequence of inline
+   * blocks, if it is larger than a certain number of bytes. If the leaf level
+   * is not large enough, we write all entries to the root level instead.
+   *
+   * After all leaf blocks have been written, we end up with an index
+   * referencing the resulting leaf index blocks. If that index is larger than
+   * the allowed root index size, the writer will break it up into
+   * reasonable-size intermediate-level index block chunks write those chunks
+   * out, and create another index referencing those chunks. This will be
+   * repeated until the remaining index is small enough to become the root
+   * index. However, in most practical cases we will only have leaf-level
+   * blocks and the root index, or just the root index.
+   */
+  public static class BlockIndexWriter implements InlineBlockWriter {
+    /**
+     * While the index is being written, this represents the current block
+     * index referencing all leaf blocks, with one exception. If the file is
+     * being closed and there are not enough blocks to complete even a single
+     * leaf block, no leaf blocks get written and this contains the entire
+     * block index. After all levels of the index were written by
+     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
+     * root-level index.
+     */
+    private BlockIndexChunk rootChunk = new BlockIndexChunk();
+
+    /**
+     * Current leaf-level chunk. New entries referencing data blocks get added
+     * to this chunk until it grows large enough to be written to disk.
+     */
+    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
+
+    /**
+     * The number of block index levels. This is one if there is only root
+     * level (even empty), two if there a leaf level and root level, and is
+     * higher if there are intermediate levels. This is only final after
+     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
+     * initial value accounts for the root level, and will be increased to two
+     * as soon as we find out there is a leaf-level in
+     * {@link #blockWritten(long, int)}.
+     */
+    private int numLevels = 1;
+
+    private HFileBlock.Writer blockWriter;
+    private byte[] firstKey = null;
+
+    /**
+     * The total number of leaf-level entries, i.e. entries referenced by
+     * leaf-level blocks. For the data block index this is equal to the number
+     * of data blocks.
+     */
+    private long totalNumEntries;
+
+    /** Total compressed size of all index blocks. */
+    private long totalBlockOnDiskSize;
+
+    /** Total uncompressed size of all index blocks. */
+    private long totalBlockUncompressedSize;
+
+    /** The maximum size guideline of all multi-level index blocks. */
+    private int maxChunkSize;
+
+    /** Whether we require this block index to always be single-level. */
+    private boolean singleLevelOnly;
+
+    /** Block cache, or null if cache-on-write is disabled */
+    private BlockCache blockCache;
+
+    /** Name to use for computing cache keys */
+    private String nameForCaching;
+
+    /** Creates a single-level block index writer */
+    public BlockIndexWriter() {
+      this(null, null, null);
+      singleLevelOnly = true;
+    }
+
+    /**
+     * Creates a multi-level block index writer.
+     *
+     * @param blockWriter the block writer to use to write index blocks
+     * @param blockCache if this is not null, index blocks will be cached
+     *    on write into this block cache.
+     */
+    public BlockIndexWriter(HFileBlock.Writer blockWriter,
+        BlockCache blockCache, String nameForCaching) {
+      if ((blockCache == null) != (nameForCaching == null)) {
+        throw new IllegalArgumentException("Block cache and file name for " +
+            "caching must be both specified or both null");
+      }
+
+      this.blockWriter = blockWriter;
+      this.blockCache = blockCache;
+      this.nameForCaching = nameForCaching;
+      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+      if (maxChunkSize <= 0) {
+        throw new IllegalArgumentException("Invald maximum index block size");
+      }
+      this.maxChunkSize = maxChunkSize;
+    }
+
+    /**
+     * Writes the root level and intermediate levels of the block index into
+     * the output stream, generating the tree from bottom up. Assumes that the
+     * leaf level has been inline-written to the disk if there is enough data
+     * for more than one leaf block. We iterate by breaking the current level
+     * of the block index, starting with the index of all leaf-level blocks,
+     * into chunks small enough to be written to disk, and generate its parent
+     * level, until we end up with a level small enough to become the root
+     * level.
+     *
+     * If the leaf level is not large enough, there is no inline block index
+     * anymore, so we only write that level of block index to disk as the root
+     * level.
+     *
+     * @param out FSDataOutputStream
+     * @return position at which we entered the root-level index.
+     * @throws IOException
+     */
+    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
+      if (curInlineChunk.getNumEntries() != 0) {
+        throw new IOException("Trying to write a multi-level block index, " +
+            "but are " + curInlineChunk.getNumEntries() + " entries in the " +
+            "last inline chunk.");
+      }
+
+      // We need to get mid-key metadata before we create intermediate
+      // indexes and overwrite the root chunk.
+      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
+          : null;
+
+      while (rootChunk.getRootSize() > maxChunkSize) {
+        rootChunk = writeIntermediateLevel(out, rootChunk);
+        numLevels += 1;
+      }
+
+      // write the root level
+      long rootLevelIndexPos = out.getPos();
+
+      {
+        DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX,
+            false);
+        rootChunk.writeRoot(blockStream);
+        if (midKeyMetadata != null)
+          blockStream.write(midKeyMetadata);
+        blockWriter.writeHeaderAndData(out);
+      }
+
+      // Add root index block size
+      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
+      totalBlockUncompressedSize +=
+          blockWriter.getUncompressedSizeWithoutHeader();
+
+      LOG.info("Wrote a " + numLevels + "-level index with root level at pos "
+          + out.getPos() + ", " + rootChunk.getNumEntries()
+          + " root-level entries, " + totalNumEntries + " total entries, "
+          + totalBlockOnDiskSize + " bytes total on-disk size, "
+          + totalBlockUncompressedSize + " bytes total uncompressed size.");
+
+      return rootLevelIndexPos;
+    }
+
+    /**
+     * Writes the block index data as a single level only. Does not do any
+     * block framing.
+     *
+     * @param out the buffered output stream to write the index to. Typically a
+     *          stream writing into an {@link HFile} block.
+     * @param description a short description of the index being written. Used
+     *          in a log message.
+     * @throws IOException
+     */
+    public void writeSingleLevelIndex(DataOutput out, String description)
+        throws IOException {
+      expectNumLevels(1);
+
+      if (!singleLevelOnly)
+        throw new IOException("Single-level mode is turned off");
+
+      if (rootChunk.getNumEntries() > 0)
+        throw new IOException("Root-level entries already added in " +
+            "single-level mode");
+
+      rootChunk = curInlineChunk;
+      curInlineChunk = new BlockIndexChunk();
+
+      LOG.info("Wrote a single-level " + description + " index with "
+          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
+          + " bytes");
+      rootChunk.writeRoot(out);
+    }
+
+    /**
+     * Split the current level of the block index into intermediate index
+     * blocks of permitted size and write those blocks to disk. Return the next
+     * level of the block index referencing those intermediate-level blocks.
+     *
+     * @param out
+     * @param currentLevel the current level of the block index, such as the a
+     *          chunk referencing all leaf-level index blocks
+     * @return the parent level block index, which becomes the root index after
+     *         a few (usually zero) iterations
+     * @throws IOException
+     */
+    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
+        BlockIndexChunk currentLevel) throws IOException {
+      // Entries referencing intermediate-level blocks we are about to create.
+      BlockIndexChunk parent = new BlockIndexChunk();
+
+      // The current intermediate-level block index chunk.
+      BlockIndexChunk curChunk = new BlockIndexChunk();
+
+      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
+        curChunk.add(currentLevel.getBlockKey(i),
+            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
+
+        if (curChunk.getRootSize() >= maxChunkSize)
+          writeIntermediateBlock(out, parent, curChunk);
+      }
+
+      if (curChunk.getNumEntries() > 0) {
+        writeIntermediateBlock(out, parent, curChunk);
+      }
+
+      return parent;
+    }
+
+    private void writeIntermediateBlock(FSDataOutputStream out,
+        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
+      long beginOffset = out.getPos();
+      DataOutputStream dos = blockWriter.startWriting(
+          BlockType.INTERMEDIATE_INDEX, cacheOnWrite());
+      curChunk.writeNonRoot(dos);
+      byte[] curFirstKey = curChunk.getBlockKey(0);
+      blockWriter.writeHeaderAndData(out);
+
+      if (blockCache != null) {
+        blockCache.cacheBlock(HFile.getBlockCacheKey(nameForCaching,
+            beginOffset), blockWriter.getBlockForCaching());
+      }
+
+      // Add intermediate index block size
+      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
+      totalBlockUncompressedSize +=
+          blockWriter.getUncompressedSizeWithoutHeader();
+
+      // OFFSET is the beginning offset the chunk of block index entries.
+      // SIZE is the total byte size of the chunk of block index entries
+      // + the secondary index size
+      // FIRST_KEY is the first key in the chunk of block index
+      // entries.
+      parent.add(curFirstKey, beginOffset,
+          blockWriter.getOnDiskSizeWithHeader());
+
+      // clear current block index chunk
+      curChunk.clear();
+      curFirstKey = null;
+    }
+
+    /**
+     * @return how many block index entries there are in the root level
+     */
+    public final int getNumRootEntries() {
+      return rootChunk.getNumEntries();
+    }
+
+    /**
+     * @return the number of levels in this block index.
+     */
+    public int getNumLevels() {
+      return numLevels;
+    }
+
+    private void expectNumLevels(int expectedNumLevels) {
+      if (numLevels != expectedNumLevels) {
+        throw new IllegalStateException("Number of block index levels is "
+            + numLevels + "but is expected to be " + expectedNumLevels);
+      }
+    }
+
+    /**
+     * Whether there is an inline block ready to be written. In general, we
+     * write an leaf-level index block as an inline block as soon as its size
+     * as serialized in the non-root format reaches a certain threshold.
+     */
+    @Override
+    public boolean shouldWriteBlock(boolean closing) {
+      if (singleLevelOnly)
+        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+      if (curInlineChunk.getNumEntries() == 0)
+        return false;
+
+      // We do have some entries in the current inline chunk.
+      if (closing) {
+        if (rootChunk.getNumEntries() == 0) {
+          // We did not add any leaf-level blocks yet. Instead of creating a
+          // leaf level with one block, move these entries to the root level.
+
+          expectNumLevels(1);
+          rootChunk = curInlineChunk;
+          curInlineChunk = new BlockIndexChunk();
+          return false;
+        }
+
+        return true;
+      } else {
+        return curInlineChunk.getNonRootSize() >= maxChunkSize;
+      }
+    }
+
+    /**
+     * Write out the current inline index block. Inline blocks are non-root
+     * blocks, so the non-root index format is used.
+     *
+     * @param out
+     * @param position The beginning offset of the inline block in the file not
+     *          include the header.
+     */
+    @Override
+    public void writeInlineBlock(DataOutput out) throws IOException {
+      if (singleLevelOnly)
+        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+      // Write the inline block index to the output stream in the non-root
+      // index block format.
+      curInlineChunk.writeNonRoot(out);
+
+      // Save the first key of the inline block so that we can add it to the
+      // parent-level index.
+      firstKey = curInlineChunk.getBlockKey(0);
+
+      // Start a new inline index block
+      curInlineChunk.clear();
+    }
+
+    /**
+     * Called after an inline block has been written so that we can add an
+     * entry referring to that block to the parent-level index.
+     */
+    @Override
+    public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
+    {
+      // Add leaf index block size
+      totalBlockOnDiskSize += onDiskSize;
+      totalBlockUncompressedSize += uncompressedSize;
+
+      if (singleLevelOnly)
+        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
+
+      if (firstKey == null) {
+        throw new IllegalStateException("Trying to add second-level index " +
+            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
+            "but the first key was not set in writeInlineBlock");
+      }
+
+      if (rootChunk.getNumEntries() == 0) {
+        // We are writing the first leaf block, so increase index level.
+        expectNumLevels(1);
+        numLevels = 2;
+      }
+
+      // Add another entry to the second-level index. Include the number of
+      // entries in all previous leaf-level chunks for mid-key calculation.
+      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
+      firstKey = null;
+    }
+
+    @Override
+    public BlockType getInlineBlockType() {
+      return BlockType.LEAF_INDEX;
+    }
+
+    /**
+     * Add one index entry to the current leaf-level block. When the leaf-level
+     * block gets large enough, it will be flushed to disk as an inline block.
+     *
+     * @param firstKey the first key of the data block
+     * @param blockOffset the offset of the data block
+     * @param blockDataSize the on-disk size of the data block ({@link HFile}
+     *          format version 2), or the uncompressed size of the data block (
+     *          {@link HFile} format version 1).
+     */
+    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize)
+    {
+      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
+      ++totalNumEntries;
+    }
+
+    /**
+     * @throws IOException if we happened to write a multi-level index.
+     */
+    public void ensureSingleLevel() throws IOException {
+      if (numLevels > 1) {
+        throw new IOException ("Wrote a " + numLevels + "-level index with " +
+            rootChunk.getNumEntries() + " root-level entries, but " +
+            "this is expected to be a single-level block index.");
+      }
+    }
+
+    /**
+     * @return true if we are using cache-on-write. This is configured by the
+     *         caller of the constructor by either passing a valid block cache
+     *         or null.
+     */
+    @Override
+    public boolean cacheOnWrite() {
+      return blockCache != null;
+    }
+
+    /**
+     * The total uncompressed size of the root index block, intermediate-level
+     * index blocks, and leaf-level index blocks.
+     *
+     * @return the total uncompressed size of all index blocks
+     */
+    public long getTotalUncompressedSize() {
+      return totalBlockUncompressedSize;
+    }
+
+  }
+
+  /**
+   * A single chunk of the block index in the process of writing. The data in
+   * this chunk can become a leaf-level, intermediate-level, or root index
+   * block.
+   */
+  static class BlockIndexChunk {
+
+    /** First keys of the key range corresponding to each index entry. */
+    private final List<byte[]> blockKeys = new ArrayList<byte[]>();
+
+    /** Block offset in backing stream. */
+    private final List<Long> blockOffsets = new ArrayList<Long>();
+
+    /** On-disk data sizes of lower-level data or index blocks. */
+    private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
+
+    /**
+     * The cumulative number of sub-entries, i.e. entries on deeper-level block
+     * index entries. numSubEntriesAt[i] is the number of sub-entries in the
+     * blocks corresponding to this chunk's entries #0 through #i inclusively.
+     */
+    private final List<Long> numSubEntriesAt = new ArrayList<Long>();
+
+    /**
+     * The offset of the next entry to be added, relative to the end of the
+     * "secondary index" in the "non-root" format representation of this index
+     * chunk. This is the next value to be added to the secondary index.
+     */
+    private int curTotalNonRootEntrySize = 0;
+
+    /**
+     * The accumulated size of this chunk if stored in the root index format.
+     */
+    private int curTotalRootSize = 0;
+
+    /**
+     * The "secondary index" used for binary search over variable-length
+     * records in a "non-root" format block. These offsets are relative to the
+     * end of this secondary index.
+     */
+    private final List<Integer> secondaryIndexOffsetMarks =
+        new ArrayList<Integer>();
+
+    /**
+     * Adds a new entry to this block index chunk.
+     *
+     * @param firstKey the first key in the block pointed to by this entry
+     * @param blockOffset the offset of the next-level block pointed to by this
+     *          entry
+     * @param onDiskDataSize the on-disk data of the block pointed to by this
+     *          entry, including header size
+     * @param curTotalNumSubEntries if this chunk is the root index chunk under
+     *          construction, this specifies the current total number of
+     *          sub-entries in all leaf-level chunks, including the one
+     *          corresponding to the second-level entry being added.
+     */
+    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
+        long curTotalNumSubEntries) {
+      // Record the offset for the secondary index
+      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
+      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
+          + firstKey.length;
+
+      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
+          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
+
+      blockKeys.add(firstKey);
+      blockOffsets.add(blockOffset);
+      onDiskDataSizes.add(onDiskDataSize);
+
+      if (curTotalNumSubEntries != -1) {
+        numSubEntriesAt.add(curTotalNumSubEntries);
+
+        // Make sure the parallel arrays are in sync.
+        if (numSubEntriesAt.size() != blockKeys.size()) {
+          throw new IllegalStateException("Only have key/value count " +
+              "stats for " + numSubEntriesAt.size() + " block index " +
+              "entries out of " + blockKeys.size());
+        }
+      }
+    }
+
+    /**
+     * The same as {@link #add(byte[], long, int, long)} but does not take the
+     * key/value into account. Used for single-level indexes.
+     *
+     * @see {@link #add(byte[], long, int, long)}
+     */
+    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
+      add(firstKey, blockOffset, onDiskDataSize, -1);
+    }
+
+    public void clear() {
+      blockKeys.clear();
+      blockOffsets.clear();
+      onDiskDataSizes.clear();
+      secondaryIndexOffsetMarks.clear();
+      numSubEntriesAt.clear();
+      curTotalNonRootEntrySize = 0;
+      curTotalRootSize = 0;
+    }
+
+    /**
+     * Finds the entry corresponding to the deeper-level index block containing
+     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
+     * ordering of sub-entries.
+     *
+     * <p>
+     * <i> Implementation note. </i> We are looking for i such that
+     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
+     * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
+     * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
+     * sub-entries. i is by definition the insertion point of k in
+     * numSubEntriesAt.
+     *
+     * @param k sub-entry index, from 0 to the total number sub-entries - 1
+     * @return the 0-based index of the entry corresponding to the given
+     *         sub-entry
+     */
+    public int getEntryBySubEntry(long k) {
+      // We define mid-key as the key corresponding to k'th sub-entry
+      // (0-based).
+
+      int i = Collections.binarySearch(numSubEntriesAt, k);
+
+      // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
+      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
+      // is in the (i + 1)'th chunk.
+      if (i >= 0)
+        return i + 1;
+
+      // Inexact match. Return the insertion point.
+      return -i - 1;
+    }
+
+    /**
+     * Used when writing the root block index of a multi-level block index.
+     * Serializes additional information allowing to efficiently identify the
+     * mid-key.
+     *
+     * @return a few serialized fields for finding the mid-key
+     * @throws IOException if could not create metadata for computing mid-key
+     */
+    public byte[] getMidKeyMetadata() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(
+          MID_KEY_METADATA_SIZE);
+      DataOutputStream baosDos = new DataOutputStream(baos);
+      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
+      if (totalNumSubEntries == 0) {
+        throw new IOException("No leaf-level entries, mid-key unavailable");
+      }
+      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
+      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
+
+      baosDos.writeLong(blockOffsets.get(midKeyEntry));
+      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
+
+      long numSubEntriesBefore = midKeyEntry > 0
+          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
+      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
+      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
+      {
+        throw new IOException("Could not identify mid-key index within the "
+            + "leaf-level block containing mid-key: out of range ("
+            + subEntryWithinEntry + ", numSubEntriesBefore="
+            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
+            + ")");
+      }
+
+      baosDos.writeInt((int) subEntryWithinEntry);
+
+      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
+        throw new IOException("Could not write mid-key metadata: size=" +
+            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
+      }
+
+      // Close just to be good citizens, although this has no effect.
+      baos.close();
+
+      return baos.toByteArray();
+    }
+
+    /**
+     * Writes the block index chunk in the non-root index block format. This
+     * format contains the number of entries, an index of integer offsets
+     * for quick binary search on variable-length records, and tuples of
+     * block offset, on-disk block size, and the first key for each entry.
+     *
+     * @param out
+     * @throws IOException
+     */
+    void writeNonRoot(DataOutput out) throws IOException {
+      // The number of entries in the block.
+      out.writeInt(blockKeys.size());
+
+      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
+        throw new IOException("Corrupted block index chunk writer: " +
+            blockKeys.size() + " entries but " +
+            secondaryIndexOffsetMarks.size() + " secondary index items");
+      }
+
+      // For each entry, write a "secondary index" of relative offsets to the
+      // entries from the end of the secondary index. This works, because at
+      // read time we read the number of entries and know where the secondary
+      // index ends.
+      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
+        out.writeInt(currentSecondaryIndex);
+
+      // We include one other element in the secondary index to calculate the
+      // size of each entry more easily by subtracting secondary index elements.
+      out.writeInt(curTotalNonRootEntrySize);
+
+      for (int i = 0; i < blockKeys.size(); ++i) {
+        out.writeLong(blockOffsets.get(i));
+        out.writeInt(onDiskDataSizes.get(i));
+        out.write(blockKeys.get(i));
+      }
+    }
+
+    /**
+     * @return the size of this chunk if stored in the non-root index block
+     *         format
+     */
+    int getNonRootSize() {
+      return Bytes.SIZEOF_INT                          // Number of entries
+          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
+          + curTotalNonRootEntrySize;                  // All entries
+    }
+
+    /**
+     * Writes this chunk into the given output stream in the root block index
+     * format. This format is similar to the {@link HFile} version 1 block
+     * index format, except that we store on-disk size of the block instead of
+     * its uncompressed size.
+     *
+     * @param out the data output stream to write the block index to. Typically
+     *          a stream writing into an {@link HFile} block.
+     * @throws IOException
+     */
+    void writeRoot(DataOutput out) throws IOException {
+      for (int i = 0; i < blockKeys.size(); ++i) {
+        out.writeLong(blockOffsets.get(i));
+        out.writeInt(onDiskDataSizes.get(i));
+        Bytes.writeByteArray(out, blockKeys.get(i));
+      }
+    }
+
+    /**
+     * @return the size of this chunk if stored in the root index block format
+     */
+    int getRootSize() {
+      return curTotalRootSize;
+    }
+
+    /**
+     * @return the number of entries in this block index chunk
+     */
+    public int getNumEntries() {
+      return blockKeys.size();
+    }
+
+    public byte[] getBlockKey(int i) {
+      return blockKeys.get(i);
+    }
+
+    public long getBlockOffset(int i) {
+      return blockOffsets.get(i);
+    }
+
+    public int getOnDiskDataSize(int i) {
+      return onDiskDataSizes.get(i);
+    }
+
+    public long getCumulativeNumKV(int i) {
+      if (i < 0)
+        return 0;
+      return numSubEntriesAt.get(i);
+    }
+
+  }
+
+  /**
+   * @return true if the given configuration specifies that we should
+   *         cache-on-write index blocks
+   */
+  public static boolean shouldCacheOnWrite(Configuration conf) {
+    return conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false);
+  }
+
+  public static int getMaxChunkSize(Configuration conf) {
+    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,308 @@
+
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Implements pretty-printing functionality for {@link HFile}s.
+ */
+public class HFilePrettyPrinter {
+
+  private static final Log LOG = LogFactory.getLog(HFilePrettyPrinter.class);
+
+  private Options options = new Options();
+
+  private boolean verbose;
+  private boolean printValue;
+  private boolean printKey;
+  private boolean shouldPrintMeta;
+  private boolean printBlocks;
+  private boolean checkRow;
+  private boolean checkFamily;
+
+  private Configuration conf;
+
+  private List<Path> files = new ArrayList<Path>();
+  private int count;
+
+  private static final String FOUR_SPACES = "    ";
+
+  public HFilePrettyPrinter() {
+    options.addOption("v", "verbose", false,
+        "Verbose output; emits file and meta data delimiters");
+    options.addOption("p", "printkv", false, "Print key/value pairs");
+    options.addOption("e", "printkey", false, "Print keys");
+    options.addOption("m", "printmeta", false, "Print meta data of file");
+    options.addOption("b", "printblocks", false, "Print block index meta data");
+    options.addOption("k", "checkrow", false,
+        "Enable row order check; looks for out-of-order keys");
+    options.addOption("a", "checkfamily", false, "Enable family check");
+    options.addOption("f", "file", true,
+        "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
+    options.addOption("r", "region", true,
+        "Region to scan. Pass region name; e.g. '.META.,,1'");
+  }
+
+  public boolean parseOptions(String args[]) throws ParseException,
+      IOException {
+    if (args.length == 0) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("HFile", options, true);
+      return false;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    verbose = cmd.hasOption("v");
+    printValue = cmd.hasOption("p");
+    printKey = cmd.hasOption("e") || printValue;
+    shouldPrintMeta = cmd.hasOption("m");
+    printBlocks = cmd.hasOption("b");
+    checkRow = cmd.hasOption("k");
+    checkFamily = cmd.hasOption("a");
+
+    if (cmd.hasOption("f")) {
+      files.add(new Path(cmd.getOptionValue("f")));
+    }
+
+    if (cmd.hasOption("r")) {
+      String regionName = cmd.getOptionValue("r");
+      byte[] rn = Bytes.toBytes(regionName);
+      byte[][] hri = HRegionInfo.parseRegionName(rn);
+      Path rootDir = FSUtils.getRootDir(conf);
+      Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
+      String enc = HRegionInfo.encodeRegionName(rn);
+      Path regionDir = new Path(tableDir, enc);
+      if (verbose)
+        System.out.println("region dir -> " + regionDir);
+      List<Path> regionFiles = HFile.getStoreFiles(FileSystem.get(conf),
+          regionDir);
+      if (verbose)
+        System.out.println("Number of region files found -> "
+            + regionFiles.size());
+      if (verbose) {
+        int i = 1;
+        for (Path p : regionFiles) {
+          if (verbose)
+            System.out.println("Found file[" + i++ + "] -> " + p);
+        }
+      }
+      files.addAll(regionFiles);
+    }
+
+    return true;
+  }
+
+  /**
+   * Runs the command-line pretty-printer, and returns the desired command
+   * exit code (zero for success, non-zero for failure).
+   */
+  public int run(String[] args) {
+    conf = HBaseConfiguration.create();
+    conf.set("fs.defaultFS",
+        conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+    conf.set("fs.default.name",
+        conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+    try {
+      if (!parseOptions(args))
+        return 1;
+    } catch (IOException ex) {
+      LOG.error("Error parsing command-line options", ex);
+      return 1;
+    } catch (ParseException ex) {
+      LOG.error("Error parsing command-line options", ex);
+      return 1;
+    }
+
+    // iterate over all files found
+    for (Path fileName : files) {
+      try {
+        processFile(fileName);
+      } catch (IOException ex) {
+        LOG.error("Error reading " + fileName, ex);
+      }
+    }
+
+    if (verbose || printKey) {
+      System.out.println("Scanned kv count -> " + count);
+    }
+
+    return 0;
+  }
+
+  private void processFile(Path file) throws IOException {
+    if (verbose)
+      System.out.println("Scanning -> " + file);
+    FileSystem fs = file.getFileSystem(conf);
+    if (!fs.exists(file)) {
+      System.err.println("ERROR, file doesnt exist: " + file);
+    }
+
+    HFile.Reader reader = HFile.createReader(fs, file, null, false, false);
+
+    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+
+    if (verbose || printKey || checkRow || checkFamily) {
+
+      // scan over file and read key/value's and check if requested
+      HFileScanner scanner = reader.getScanner(false, false, false);
+      scanner.seekTo();
+      scanKeysValues(file, count, scanner);
+    }
+
+    // print meta data
+    if (shouldPrintMeta) {
+      printMeta(reader, fileInfo);
+    }
+
+    if (printBlocks) {
+      System.out.println("Block Index:");
+      System.out.println(reader.getDataBlockIndexReader());
+    }
+
+    reader.close();
+  }
+
+  private void scanKeysValues(Path file, int count, HFileScanner scanner)
+      throws IOException {
+    KeyValue pkv = null;
+    do {
+      KeyValue kv = scanner.getKeyValue();
+      // dump key value
+      if (printKey) {
+        System.out.print("K: " + kv);
+        if (printValue) {
+          System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
+        }
+        System.out.println();
+      }
+      // check if rows are in order
+      if (checkRow && pkv != null) {
+        if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
+          System.err.println("WARNING, previous row is greater then"
+              + " current row\n\tfilename -> " + file + "\n\tprevious -> "
+              + Bytes.toStringBinary(pkv.getKey()) + "\n\tcurrent  -> "
+              + Bytes.toStringBinary(kv.getKey()));
+        }
+      }
+      // check if families are consistent
+      if (checkFamily) {
+        String fam = Bytes.toString(kv.getFamily());
+        if (!file.toString().contains(fam)) {
+          System.err.println("WARNING, filename does not match kv family,"
+              + "\n\tfilename -> " + file + "\n\tkeyvalue -> "
+              + Bytes.toStringBinary(kv.getKey()));
+        }
+        if (pkv != null
+            && !Bytes.equals(pkv.getFamily(), kv.getFamily())) {
+          System.err.println("WARNING, previous kv has different family"
+              + " compared to current key\n\tfilename -> " + file
+              + "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey())
+              + "\n\tcurrent  -> " + Bytes.toStringBinary(kv.getKey()));
+        }
+      }
+      pkv = kv;
+      ++count;
+    } while (scanner.next());
+  }
+
+  /**
+   * Format a string of the form "k1=v1, k2=v2, ..." into separate lines
+   * with a four-space indentation.
+   */
+  private static String asSeparateLines(String keyValueStr) {
+    return keyValueStr.replaceAll(", ([a-zA-Z]+=)",
+                                  ",\n" + FOUR_SPACES + "$1");
+  }
+
+  private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo)
+      throws IOException {
+    System.out.println("Block index size as per heapsize: "
+        + reader.indexSize());
+    System.out.println(asSeparateLines(reader.toString()));
+    System.out.println("Trailer:\n    "
+        + asSeparateLines(reader.getTrailer().toString()));
+    System.out.println("Fileinfo:");
+    for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
+      System.out.print(FOUR_SPACES + Bytes.toString(e.getKey()) + " = ");
+      if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY")) == 0) {
+        long seqid = Bytes.toLong(e.getValue());
+        System.out.println(seqid);
+      } else if (Bytes.compareTo(e.getKey(), Bytes.toBytes("TIMERANGE")) == 0) {
+        TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+        Writables.copyWritable(e.getValue(), timeRangeTracker);
+        System.out.println(timeRangeTracker.getMinimumTimestamp() + "...."
+            + timeRangeTracker.getMaximumTimestamp());
+      } else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0
+          || Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
+        System.out.println(Bytes.toInt(e.getValue()));
+      } else {
+        System.out.println(Bytes.toStringBinary(e.getValue()));
+      }
+    }
+
+    System.out.println("Mid-key: " + Bytes.toStringBinary(reader.midkey()));
+
+    // Printing bloom information
+    DataInput bloomMeta = reader.getBloomFilterMetadata();
+    BloomFilter bloomFilter = null;
+    if (bloomMeta != null)
+      bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
+
+    System.out.println("Bloom filter:");
+    if (bloomFilter != null) {
+      System.out.println(FOUR_SPACES + bloomFilter.toString().replaceAll(
+          ByteBloomFilter.STATS_RECORD_SEP, "\n" + FOUR_SPACES));
+    } else {
+      System.out.println(FOUR_SPACES + "Not present");
+    }
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A way to write "inline" blocks into an {@link HFile}. Inline blocks are
+ * interspersed with data blocks. For example, Bloom filter chunks and
+ * leaf-level blocks of a multi-level block index are stored as inline blocks.
+ */
+public interface InlineBlockWriter {
+
+  /**
+   * Determines whether there is a new block to be written out.
+   *
+   * @param closing
+   *          whether the file is being closed, in which case we need to write
+   *          out all available data and not wait to accumulate another block
+   */
+  boolean shouldWriteBlock(boolean closing);
+
+  /**
+   * Writes the block to the provided stream. Must not write any magic records.
+   * Called only if {@link #shouldWriteBlock(boolean)} returned true.
+   *
+   * @param out
+   *          a stream (usually a compressing stream) to write the block to
+   */
+  void writeInlineBlock(DataOutput out) throws IOException;
+
+  /**
+   * Called after a block has been written, and its offset, raw size, and
+   * compressed size have been determined. Can be used to add an entry to a
+   * block index. If this type of inline blocks needs a block index, the inline
+   * block writer is responsible for maintaining it.
+   *
+   * @param offset the offset of the block in the stream
+   * @param onDiskSize the on-disk size of the block
+   * @param uncompressedSize the uncompressed size of the block
+   * @param rawSize
+   */
+  void blockWritten(long offset, int onDiskSize, int uncompressedSize);
+
+  /**
+   * The type of blocks this block writer produces.
+   */
+  BlockType getInlineBlockType();
+
+  /**
+   * @return true if inline blocks produced by this writer should be cached
+   */
+  boolean cacheOnWrite();
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Common methods Bloom filter methods required at read and write time.
+ */
+public interface BloomFilterBase {
+
+  /**
+   * @return The number of keys added to the bloom
+   */
+  long getKeyCount();
+
+  /**
+   * @return The max number of keys that can be inserted
+   *         to maintain the desired error rate
+   */
+  long getMaxKeys();
+
+  /**
+   * @return Size of the bloom, in bytes
+   */
+  long getByteSize();
+
+  /**
+   * Create a key for a row-column Bloom filter.
+   */
+  byte[] createBloomKey(byte[] rowBuf, int rowOffset, int rowLen,
+      byte[] qualBuf, int qualOffset, int qualLen);
+
+  /**
+   * @return Bloom key comparator
+   */
+  RawComparator<byte[]> getComparator();
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+
+/**
+ * Handles Bloom filter initialization based on configuration and serialized
+ * metadata in the reader and writer of {@link StoreFile}.
+ */
+public final class BloomFilterFactory {
+
+  private static final Log LOG =
+      LogFactory.getLog(BloomFilterFactory.class.getName());
+
+  /** This class should not be instantiated. */
+  private BloomFilterFactory() {}
+
+  /**
+   * Specifies the target error rate to use when selecting the number of keys
+   * per Bloom filter.
+   */
+  public static final String IO_STOREFILE_BLOOM_ERROR_RATE =
+      "io.storefile.bloom.error.rate";
+
+  /**
+   * Maximum folding factor allowed. The Bloom filter will be shrunk by
+   * the factor of up to 2 ** this times if we oversize it initially.
+   */
+  public static final String IO_STOREFILE_BLOOM_MAX_FOLD =
+      "io.storefile.bloom.max.fold";
+
+  /**
+   * For default (single-block) Bloom filters this specifies the maximum number
+   * of keys.
+   */
+  public static final String IO_STOREFILE_BLOOM_MAX_KEYS =
+      "io.storefile.bloom.max.keys";
+
+  /** Master switch to enable Bloom filters */
+  public static final String IO_STOREFILE_BLOOM_ENABLED =
+      "io.storefile.bloom.enabled";
+
+  /**
+   * Target Bloom block size. Bloom filter blocks of approximately this size
+   * are interleaved with data blocks.
+   */
+  public static final String IO_STOREFILE_BLOOM_BLOCK_SIZE =
+      "io.storefile.bloom.block.size";
+
+  /** Whether to cache compound Bloom filter blocks on write */
+  public static final String IO_STOREFILE_BLOOM_CACHE_ON_WRITE =
+      "io.storefile.bloom.cacheonwrite";
+
+  /** Maximum number of times a Bloom filter can be "folded" if oversized */
+  private static final int MAX_ALLOWED_FOLD_FACTOR = 7;
+
+  /**
+   * Instantiates the correct Bloom filter class based on the version provided
+   * in the meta block data.
+   *
+   * @param meta the byte array holding the Bloom filter's metadata, including
+   *          version information
+   * @param reader the {@link HFile} reader to use to lazily load Bloom filter
+   *          blocks
+   * @return an instance of the correct type of Bloom filter
+   * @throws IllegalArgumentException
+   */
+  public static BloomFilter
+      createFromMeta(DataInput meta, HFile.Reader reader)
+      throws IllegalArgumentException, IOException {
+    int version = meta.readInt();
+    switch (version) {
+      case ByteBloomFilter.VERSION:
+        // This is only possible in a version 1 HFile. We are ignoring the
+        // passed comparator because raw byte comparators are always used
+        // in version 1 Bloom filters.
+        return new ByteBloomFilter(meta);
+
+      case CompoundBloomFilterBase.VERSION:
+        return new CompoundBloomFilter(meta, reader);
+
+      default:
+        throw new IllegalArgumentException(
+          "Bad bloom filter format version " + version
+        );
+    }
+  }
+
+  /**
+   * @return true if Bloom filters are enabled in the given configuration
+   */
+  public static boolean isBloomEnabled(Configuration conf) {
+    return conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true);
+  }
+
+  public static float getErrorRate(Configuration conf) {
+    return conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
+  }
+
+  /**
+   * Creates a new Bloom filter at the time of
+   * {@link org.apache.hadoop.hbase.regionserver.StoreFile} writing.
+   *
+   * @param conf
+   * @param bloomType
+   * @param maxKeys an estimate of the number of keys we expect to insert.
+   *        Irrelevant if compound Bloom filters are enabled.
+   * @param writer the HFile writer
+   * @param comparator the comparator to use for compound Bloom filters. This
+   *        has no effect if creating single-chunk version 1 Bloom filters.
+   * @return the new Bloom filter, or null in case Bloom filters are disabled
+   *         or when failed to create one.
+   */
+  public static BloomFilterWriter createBloomAtWrite(Configuration conf,
+      BloomType bloomType, int maxKeys, HFile.Writer writer) {
+    if (!isBloomEnabled(conf)) {
+      LOG.info("Bloom filters are disabled by configuration for "
+          + writer.getPath()
+          + (conf == null ? " (configuration is null)" : ""));
+      return null;
+    } else if (bloomType == BloomType.NONE) {
+      LOG.info("Bloom filter is turned off for the column family");
+      return null;
+    }
+
+    float err = getErrorRate(conf);
+
+    // In case of row/column Bloom filter lookups, each lookup is an OR if two
+    // separate lookups. Therefore, if each lookup's false positive rate is p,
+    // the resulting false positive rate is err = 1 - (1 - p)^2, and
+    // p = 1 - sqrt(1 - err).
+    if (bloomType == BloomType.ROWCOL) {
+      err = (float) (1 - Math.sqrt(1 - err));
+    }
+
+    int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD,
+        MAX_ALLOWED_FOLD_FACTOR);
+
+    if (HFile.getFormatVersion(conf) > HFile.MIN_FORMAT_VERSION) {
+      // In case of compound Bloom filters we ignore the maxKeys hint.
+      CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
+          getBloomBlockSize(conf), err, Hash.getHashType(conf), maxFold,
+          cacheChunksOnWrite(conf), bloomType == BloomType.ROWCOL
+              ? KeyValue.KEY_COMPARATOR : Bytes.BYTES_RAWCOMPARATOR);
+      writer.addInlineBlockWriter(bloomWriter);
+      return bloomWriter;
+    } else {
+      // A single-block Bloom filter. Only used when testing HFile format
+      // version 1.
+      int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS,
+          128 * 1000 * 1000);
+
+      if (maxKeys <= 0) {
+        LOG.warn("Invalid maximum number of keys specified: " + maxKeys
+            + ", not using Bloom filter");
+        return null;
+      } else if (maxKeys < tooBig) {
+        BloomFilterWriter bloom = new ByteBloomFilter((int) maxKeys, err,
+            Hash.getHashType(conf), maxFold);
+        bloom.allocBloom();
+        return bloom;
+      } else {
+        LOG.debug("Skipping bloom filter because max keysize too large: "
+            + maxKeys);
+      }
+    }
+    return null;
+  }
+
+  /** @return the compound Bloom filter block size from the configuration */
+  public static int getBloomBlockSize(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
+  }
+
+  /** @return whether to cache compound Bloom filter chunks on write */
+  public static boolean cacheChunksOnWrite(Configuration conf) {
+    return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
+  }
+
+};

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * A Bloom filter implementation built on top of {@link ByteBloomFilter},
+ * encapsulating a set of fixed-size Bloom filters written out at the time of
+ * {@link org.apache.hadoop.hbase.io.hfile.HFile} generation into the data
+ * block stream, and loaded on demand at query time. This class only provides
+ * reading capabilities.
+ */
+public class CompoundBloomFilter extends CompoundBloomFilterBase
+    implements BloomFilter {
+
+  /** Used to load chunks on demand */
+  private HFile.Reader reader;
+
+  private HFileBlockIndex.BlockIndexReader index;
+
+  private int hashCount;
+  private Hash hash;
+
+  private long[] numQueriesPerChunk;
+  private long[] numPositivesPerChunk;
+
+  /**
+   * De-serialization for compound Bloom filter metadata. Must be consistent
+   * with what {@link CompoundBloomFilterWriter} does.
+   *
+   * @param meta serialized Bloom filter metadata without any magic blocks
+   * @throws IOException
+   */
+  public CompoundBloomFilter(DataInput meta, HFile.Reader reader)
+      throws IOException {
+    this.reader = reader;
+
+    totalByteSize = meta.readLong();
+    hashCount = meta.readInt();
+    hashType = meta.readInt();
+    totalKeyCount = meta.readLong();
+    totalMaxKeys = meta.readLong();
+    numChunks = meta.readInt();
+    comparator = FixedFileTrailer.createComparator(
+        Bytes.toString(Bytes.readByteArray(meta)));
+
+    hash = Hash.getInstance(hashType);
+    if (hash == null) {
+      throw new IllegalArgumentException("Invalid hash type: " + hashType);
+    }
+
+    index = new HFileBlockIndex.BlockIndexReader(comparator, 1);
+    index.readRootIndex(meta, numChunks);
+  }
+
+  @Override
+  public boolean contains(byte[] key, int keyOffset, int keyLength,
+      ByteBuffer bloom) {
+    // We try to store the result in this variable so we can update stats for
+    // testing, but when an error happens, we log a message and return.
+    boolean result;
+
+    int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
+    if (block < 0) {
+      result = false; // This key is not in the file.
+    } else {
+      HFileBlock bloomBlock;
+      try {
+        // We cache the block and use a positional read.
+        bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
+            index.getRootBlockDataSize(block), true, true, false);
+      } catch (IOException ex) {
+        // The Bloom filter is broken, turn it off.
+        throw new IllegalArgumentException(
+            "Failed to load Bloom block for key "
+                + Bytes.toStringBinary(key, keyOffset, keyLength), ex);
+      }
+
+      ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
+      result = ByteBloomFilter.contains(key, keyOffset, keyLength,
+          bloomBuf.array(), bloomBuf.arrayOffset() + HFileBlock.HEADER_SIZE,
+          bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
+    }
+
+    if (numQueriesPerChunk != null && block >= 0) {
+      // Update statistics. Only used in unit tests.
+      ++numQueriesPerChunk[block];
+      if (result)
+        ++numPositivesPerChunk[block];
+    }
+
+    return result;
+  }
+
+  public boolean supportsAutoLoading() {
+    return true;
+  }
+
+  public int getNumChunks() {
+    return numChunks;
+  }
+
+  @Override
+  public RawComparator<byte[]> getComparator() {
+    return comparator;
+  }
+
+  public void enableTestingStats() {
+    numQueriesPerChunk = new long[numChunks];
+    numPositivesPerChunk = new long[numChunks];
+  }
+
+  public String formatTestingStats() {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < numChunks; ++i) {
+      sb.append("chunk #");
+      sb.append(i);
+      sb.append(": queries=");
+      sb.append(numQueriesPerChunk[i]);
+      sb.append(", positives=");
+      sb.append(numPositivesPerChunk[i]);
+      sb.append(", positiveRatio=");
+      sb.append(numPositivesPerChunk[i] * 1.0 / numQueriesPerChunk[i]);
+      sb.append(";\n");
+    }
+    return sb.toString();
+  }
+
+  public long getNumQueriesForTesting(int chunk) {
+    return numQueriesPerChunk[chunk];
+  }
+
+  public long getNumPositivesForTesting(int chunk) {
+    return numPositivesPerChunk[chunk];
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(ByteBloomFilter.formatStats(this));
+    sb.append(ByteBloomFilter.STATS_RECORD_SEP + 
+        "Number of chunks: " + numChunks);
+    sb.append(ByteBloomFilter.STATS_RECORD_SEP + 
+        "Comparator: " + comparator.getClass().getSimpleName());
+    return sb.toString();
+  }
+
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.RawComparator;
+
+public class CompoundBloomFilterBase implements BloomFilterBase {
+
+  /**
+   * At read time, the total number of chunks. At write time, the number of
+   * chunks created so far. The first chunk has an ID of 0, and the current
+   * chunk has the ID of numChunks - 1.
+   */
+  protected int numChunks;
+
+  /**
+   * The Bloom filter version. There used to be a DynamicByteBloomFilter which
+   * had version 2.
+   */
+  public static final int VERSION = 3;
+
+  /** Target error rate for configuring the filter and for information */
+  protected float errorRate;
+
+  /** The total number of keys in all chunks */
+  protected long totalKeyCount;
+  protected long totalByteSize;
+  protected long totalMaxKeys;
+
+  /** Hash function type to use, as defined in {@link Hash} */
+  protected int hashType;
+  
+  /** Comparator used to compare Bloom filter keys */
+  protected RawComparator<byte[]> comparator;
+
+  @Override
+  public long getMaxKeys() {
+    return totalMaxKeys;
+  }
+
+  @Override
+  public long getKeyCount() {
+    return totalKeyCount;
+  }
+
+  @Override
+  public long getByteSize() {
+    return totalByteSize;
+  }
+
+  private static final byte[] DUMMY = new byte[0];
+
+  /**
+   * Prepare an ordered pair of row and qualifier to be compared using
+   * {@link KeyValue.KeyComparator}. This is only used for row-column Bloom
+   * filters.
+   */
+  @Override
+  public byte[] createBloomKey(byte[] row, int roffset, int rlength,
+      byte[] qualifier, int qoffset, int qlength) {
+    if (qualifier == null)
+      qualifier = DUMMY;
+
+    // Make sure this does not specify a timestamp so that the default maximum
+    // (most recent) timestamp is used.
+    KeyValue kv = KeyValue.createFirstOnRow(row, roffset, rlength, DUMMY, 0, 0,
+        qualifier, qoffset, qlength);
+    return kv.getKey();
+  }
+
+  @Override
+  public RawComparator<byte[]> getComparator() {
+    return comparator;
+  }
+
+}



Mime
View raw message