hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1181555 [3/4] - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache/hadoop/hbase/io/hfile/
Date Tue, 11 Oct 2011 02:19:30 GMT
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,307 @@
+
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Implements pretty-printing functionality for {@link HFile}s.
+ */
+public class HFilePrettyPrinter {
+
+  private static final Log LOG = LogFactory.getLog(HFilePrettyPrinter.class);
+
+  private Options options = new Options();
+
+  private boolean verbose;
+  private boolean printValue;
+  private boolean printKey;
+  private boolean shouldPrintMeta;
+  private boolean printBlocks;
+  private boolean checkRow;
+  private boolean checkFamily;
+
+  private Configuration conf;
+
+  private List<Path> files = new ArrayList<Path>();
+  private int count;
+
+  private static final String FOUR_SPACES = "    ";
+
+  public HFilePrettyPrinter() {
+    options.addOption("v", "verbose", false,
+        "Verbose output; emits file and meta data delimiters");
+    options.addOption("p", "printkv", false, "Print key/value pairs");
+    options.addOption("e", "printkey", false, "Print keys");
+    options.addOption("m", "printmeta", false, "Print meta data of file");
+    options.addOption("b", "printblocks", false, "Print block index meta data");
+    options.addOption("k", "checkrow", false,
+        "Enable row order check; looks for out-of-order keys");
+    options.addOption("a", "checkfamily", false, "Enable family check");
+    options.addOption("f", "file", true,
+        "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
+    options.addOption("r", "region", true,
+        "Region to scan. Pass region name; e.g. '.META.,,1'");
+  }
+
+  public boolean parseOptions(String args[]) throws ParseException,
+      IOException {
+    if (args.length == 0) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("HFile", options, true);
+      return false;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    verbose = cmd.hasOption("v");
+    printValue = cmd.hasOption("p");
+    printKey = cmd.hasOption("e") || printValue;
+    shouldPrintMeta = cmd.hasOption("m");
+    printBlocks = cmd.hasOption("b");
+    checkRow = cmd.hasOption("k");
+    checkFamily = cmd.hasOption("a");
+
+    if (cmd.hasOption("f")) {
+      files.add(new Path(cmd.getOptionValue("f")));
+    }
+
+    if (cmd.hasOption("r")) {
+      String regionName = cmd.getOptionValue("r");
+      byte[] rn = Bytes.toBytes(regionName);
+      byte[][] hri = HRegionInfo.parseRegionName(rn);
+      Path rootDir = FSUtils.getRootDir(conf);
+      Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
+      String enc = HRegionInfo.encodeRegionName(rn);
+      Path regionDir = new Path(tableDir, enc);
+      if (verbose)
+        System.out.println("region dir -> " + regionDir);
+      List<Path> regionFiles = HFile.getStoreFiles(FileSystem.get(conf),
+          regionDir);
+      if (verbose)
+        System.out.println("Number of region files found -> "
+            + regionFiles.size());
+      if (verbose) {
+        int i = 1;
+        for (Path p : regionFiles) {
+          if (verbose)
+            System.out.println("Found file[" + i++ + "] -> " + p);
+        }
+      }
+      files.addAll(regionFiles);
+    }
+
+    return true;
+  }
+
+  /**
+   * Runs the command-line pretty-printer, and returns the desired command
+   * exit code (zero for success, non-zero for failure).
+   */
+  public int run(String[] args) {
+    conf = HBaseConfiguration.create();
+    conf.set("fs.defaultFS",
+        conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+
+    try {
+      if (!parseOptions(args))
+        return 1;
+    } catch (IOException ex) {
+      LOG.error("Error parsing command-line options", ex);
+      return 1;
+    } catch (ParseException ex) {
+      LOG.error("Error parsing command-line options", ex);
+      return 1;
+    }
+
+    // iterate over all files found
+    for (Path fileName : files) {
+      try {
+        processFile(fileName);
+      } catch (IOException ex) {
+        LOG.error("Error reading " + fileName, ex);
+      }
+    }
+
+    if (verbose || printKey) {
+      System.out.println("Scanned kv count -> " + count);
+    }
+
+    return 0;
+  }
+
+  private void processFile(Path file) throws IOException {
+    if (verbose)
+      System.out.println("Scanning -> " + file);
+    FileSystem fs = file.getFileSystem(conf);
+    if (!fs.exists(file)) {
+      System.err.println("ERROR, file doesnt exist: " + file);
+    }
+
+    HFile.Reader reader = HFile.createReader(fs, file, null, false, false);
+
+    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+
+    if (printKey || checkRow || checkFamily) {
+
+      // scan over file and read key/value's and check if requested
+      HFileScanner scanner = reader.getScanner(false, false, false);
+      scanner.seekTo();
+      scanKeysValues(file, count, scanner);
+    }
+
+    // print meta data
+    if (shouldPrintMeta) {
+      printMeta(reader, fileInfo);
+    }
+
+    if (printBlocks) {
+      System.out.println("Block Index:");
+      System.out.println(reader.getDataBlockIndexReader());
+    }
+
+    reader.close();
+  }
+
+  private void scanKeysValues(Path file, int count, HFileScanner scanner)
+      throws IOException {
+    KeyValue pkv = null;
+    do {
+      KeyValue kv = scanner.getKeyValue();
+      // dump key value
+      if (printKey) {
+        System.out.print("K: " + kv);
+        if (printValue) {
+          System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
+        }
+        System.out.println();
+      }
+      // check if rows are in order
+      if (checkRow && pkv != null) {
+        if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
+          System.err.println("WARNING, previous row is greater then"
+              + " current row\n\tfilename -> " + file + "\n\tprevious -> "
+              + Bytes.toStringBinary(pkv.getKey()) + "\n\tcurrent  -> "
+              + Bytes.toStringBinary(kv.getKey()));
+        }
+      }
+      // check if families are consistent
+      if (checkFamily) {
+        String fam = Bytes.toString(kv.getFamily());
+        if (!file.toString().contains(fam)) {
+          System.err.println("WARNING, filename does not match kv family,"
+              + "\n\tfilename -> " + file + "\n\tkeyvalue -> "
+              + Bytes.toStringBinary(kv.getKey()));
+        }
+        if (pkv != null
+            && Bytes.compareTo(pkv.getFamily(), kv.getFamily()) != 0) {
+          System.err.println("WARNING, previous kv has different family"
+              + " compared to current key\n\tfilename -> " + file
+              + "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey())
+              + "\n\tcurrent  -> " + Bytes.toStringBinary(kv.getKey()));
+        }
+      }
+      pkv = kv;
+      ++count;
+    } while (scanner.next());
+  }
+
+  /**
+   * Format a string of the form "k1=v1, k2=v2, ..." into separate lines
+   * with a four-space indentation.
+   */
+  private static String asSeparateLines(String keyValueStr) {
+    return keyValueStr.replaceAll(", ([a-zA-Z]+=)",
+                                  ",\n" + FOUR_SPACES + "$1");
+  }
+
+  private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo)
+      throws IOException {
+    System.out.println("Block index size as per heapsize: "
+        + reader.indexSize());
+    System.out.println(asSeparateLines(reader.toString()));
+    System.out.println("Trailer:\n    "
+        + asSeparateLines(reader.getTrailer().toString()));
+    System.out.println("Fileinfo:");
+    for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
+      System.out.print(FOUR_SPACES + Bytes.toString(e.getKey()) + " = ");
+      if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY")) == 0) {
+        long seqid = Bytes.toLong(e.getValue());
+        System.out.println(seqid);
+      } else if (Bytes.compareTo(e.getKey(), Bytes.toBytes("TIMERANGE")) == 0) {
+        TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+        Writables.copyWritable(e.getValue(), timeRangeTracker);
+        System.out.println(timeRangeTracker.getMinimumTimestamp() + "...."
+            + timeRangeTracker.getMaximumTimestamp());
+      } else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0
+          || Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
+        System.out.println(Bytes.toInt(e.getValue()));
+      } else {
+        System.out.println(Bytes.toStringBinary(e.getValue()));
+      }
+    }
+
+    System.out.println("Mid-key: " + Bytes.toStringBinary(reader.midkey()));
+
+    // Printing bloom information
+    DataInput bloomMeta = reader.getBloomFilterMetadata();
+    BloomFilter bloomFilter = null;
+    if (bloomMeta != null)
+      bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
+
+    System.out.println("Bloom filter:");
+    if (bloomFilter != null) {
+      System.out.println(FOUR_SPACES + bloomFilter.toString().replaceAll(
+          ByteBloomFilter.STATS_RECORD_SEP, "\n" + FOUR_SPACES));
+    } else {
+      System.out.println(FOUR_SPACES + "Not present");
+    }
+  }
+
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,694 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * {@link HFile} reader for version 1.
+ */
+public class HFileReaderV1 extends AbstractHFileReader {
+  private static final Log LOG = LogFactory.getLog(HFileReaderV1.class);
+
+  private volatile boolean fileInfoLoaded = false;
+
+  /**
+   * Opens a HFile.  You must load the index before you can
+   * use it by calling {@link #loadFileInfo()}.
+   *
+   * @param fsdis input stream.  Caller is responsible for closing the passed
+   * stream.
+   * @param size Length of the stream.
+   * @param blockCache block cache. Pass null if none.
+   * @param inMemory whether blocks should be marked as in-memory in cache
+   * @param evictOnClose whether blocks in cache should be evicted on close
+   * @throws IOException
+   */
+  public HFileReaderV1(Path path, FixedFileTrailer trailer,
+      final FSDataInputStream fsdis, final long size,
+      final boolean closeIStream,
+      final BlockCache blockCache, final boolean inMemory,
+      final boolean evictOnClose) {
+    super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
+        evictOnClose);
+
+    trailer.expectVersion(1);
+    fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
+  }
+
+  private byte[] readAllIndex(final FSDataInputStream in,
+      final long indexOffset, final int indexSize) throws IOException {
+    byte[] allIndex = new byte[indexSize];
+    in.seek(indexOffset);
+    IOUtils.readFully(in, allIndex, 0, allIndex.length);
+
+    return allIndex;
+  }
+
+  /**
+   * Read in the index and file info.
+   *
+   * @return A map of fileinfo data.
+   * @see {@link Writer#appendFileInfo(byte[], byte[])}.
+   * @throws IOException
+   */
+  @Override
+  public FileInfo loadFileInfo() throws IOException {
+    if (fileInfoLoaded)
+      return fileInfo;
+
+    // Read in the fileinfo and get what we need from it.
+    istream.seek(trailer.getFileInfoOffset());
+    fileInfo = new FileInfo();
+    fileInfo.readFields(istream);
+    lastKey = fileInfo.get(FileInfo.LASTKEY);
+    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
+    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+
+    // Comparator is stored in the file info in version 1.
+    String clazzName = Bytes.toString(fileInfo.get(FileInfo.COMPARATOR));
+    comparator = getComparator(clazzName);
+
+    dataBlockIndexReader =
+        new HFileBlockIndex.BlockIndexReader(comparator, 1);
+    metaBlockIndexReader =
+        new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
+
+    int sizeToLoadOnOpen = (int) (fileSize - trailer.getLoadOnOpenDataOffset() -
+        trailer.getTrailerSize());
+    byte[] dataAndMetaIndex = readAllIndex(istream,
+        trailer.getLoadOnOpenDataOffset(), sizeToLoadOnOpen);
+
+    ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
+    DataInputStream dis = new DataInputStream(bis);
+
+    // Read in the data index.
+    if (trailer.getDataIndexCount() > 0)
+      BlockType.INDEX_V1.readAndCheck(dis);
+    dataBlockIndexReader.readRootIndex(dis, trailer.getDataIndexCount());
+
+    // Read in the metadata index.
+    if (trailer.getMetaIndexCount() > 0)
+      BlockType.INDEX_V1.readAndCheck(dis);
+    metaBlockIndexReader.readRootIndex(dis, trailer.getMetaIndexCount());
+
+    fileInfoLoaded = true;
+    return fileInfo;
+  }
+
+  /**
+   * Creates comparator from the given class name.
+   *
+   * @param clazzName the comparator class name read from the trailer
+   * @return an instance of the comparator to use
+   * @throws IOException in case comparator class name is invalid
+   */
+  @SuppressWarnings("unchecked")
+  private RawComparator<byte[]> getComparator(final String clazzName)
+  throws IOException {
+    if (clazzName == null || clazzName.length() == 0) {
+      return null;
+    }
+    try {
+      return (RawComparator<byte[]>)Class.forName(clazzName).newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient.
+   *
+   * @param cacheBlocks True if we should cache blocks read in by this scanner.
+   * @param pread Use positional read rather than seek+read if true (pread is
+   *          better for random reads, seek+read is better scanning).
+   * @param isCompaction is scanner being used for a compaction?
+   * @return Scanner on this file.
+   */
+  @Override
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+                                final boolean isCompaction) {
+    return new ScannerV1(this, cacheBlocks, pread, isCompaction);
+  }
+
+  /**
+   * @param key Key to search.
+   * @return Block number of the block containing the key or -1 if not in this
+   * file.
+   */
+  protected int blockContainingKey(final byte[] key, int offset, int length) {
+    if (dataBlockIndexReader.isEmpty()) {
+      throw new RuntimeException("Block index not loaded");
+    }
+    return dataBlockIndexReader.rootBlockContainingKey(key, offset, length);
+  }
+
+  /**
+   * @param metaBlockName
+   * @param cacheBlock Add block to cache, if found
+   * @return Block wrapped in a ByteBuffer
+   * @throws IOException
+   */
+  @Override
+  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
+      throws IOException {
+    if (trailer.getMetaIndexCount() == 0) {
+      return null; // there are no meta blocks
+    }
+    if (metaBlockIndexReader == null) {
+      throw new IOException("Meta index not loaded");
+    }
+
+    byte[] nameBytes = Bytes.toBytes(metaBlockName);
+    int block = metaBlockIndexReader.rootBlockContainingKey(nameBytes, 0,
+        nameBytes.length);
+    if (block == -1)
+      return null;
+    long offset = metaBlockIndexReader.getRootBlockOffset(block);
+    long nextOffset;
+    if (block == metaBlockIndexReader.getRootBlockCount() - 1) {
+      nextOffset = trailer.getFileInfoOffset();
+    } else {
+      nextOffset = metaBlockIndexReader.getRootBlockOffset(block + 1);
+    }
+
+    long now = System.currentTimeMillis();
+
+    String cacheKey = HFile.getBlockCacheKey(name, offset);
+
+    // Per meta key from any given file, synchronize reads for said block
+    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+      metaLoads++;
+      HRegion.incrNumericMetric(this.fsMetaBlockReadCntMetric, 1);
+      // Check cache for block.  If found return.
+      if (blockCache != null) {
+        HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+        if (cachedBlock != null) {
+          cacheHits++;
+          HRegion.incrNumericMetric(this.fsMetaBlockReadCacheHitCntMetric, 1);
+          return cachedBlock.getBufferWithoutHeader();
+        }
+        // Cache Miss, please load.
+      }
+
+      HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
+          nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
+          true);
+      hfileBlock.expectType(BlockType.META);
+
+      long delta = System.currentTimeMillis() - now;
+      HRegion.incrTimeVaryingMetric(this.fsReadTimeMetric, delta);
+      HFile.readTime += delta;
+      HFile.readOps++;
+
+      // Cache the block
+      if (cacheBlock && blockCache != null) {
+        blockCache.cacheBlock(cacheKey, hfileBlock, inMemory);
+      }
+
+      return hfileBlock.getBufferWithoutHeader();
+    }
+  }
+
+  /**
+   * Read in a file block.
+   * @param block Index of block to read.
+   * @param pread Use positional read instead of seek+read (positional is
+   * better doing random reads whereas seek+read is better scanning).
+   * @param isCompaction is this block being read as part of a compaction
+   * @return Block wrapped in a ByteBuffer.
+   * @throws IOException
+   */
+  ByteBuffer readBlockBuffer(int block, boolean cacheBlock,
+      final boolean pread, final boolean isCompaction) throws IOException {
+    if (dataBlockIndexReader == null) {
+      throw new IOException("Block index not loaded");
+    }
+    if (block < 0 || block >= dataBlockIndexReader.getRootBlockCount()) {
+      throw new IOException("Requested block is out of range: " + block +
+        ", max: " + dataBlockIndexReader.getRootBlockCount());
+    }
+
+    long offset = dataBlockIndexReader.getRootBlockOffset(block);
+    String cacheKey = HFile.getBlockCacheKey(name, offset);
+
+    // For any given block from any given file, synchronize reads for said
+    // block.
+    // Without a cache, this synchronizing is needless overhead, but really
+    // the other choice is to duplicate work (which the cache would prevent you
+    // from doing).
+    synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
+      blockLoads++;
+
+      if (isCompaction) {
+        HRegion.incrNumericMetric(this.compactionBlockReadCntMetric, 1);
+      } else {
+        HRegion.incrNumericMetric(this.fsBlockReadCntMetric, 1);
+      }
+
+      // Check cache for block.  If found return.
+      if (blockCache != null) {
+        HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+        if (cachedBlock != null) {
+          cacheHits++;
+
+          if (isCompaction) {
+            HRegion.incrNumericMetric(
+                this.compactionBlockReadCacheHitCntMetric, 1);
+          } else {
+            HRegion.incrNumericMetric(
+                this.fsBlockReadCacheHitCntMetric, 1);
+          }
+
+          return cachedBlock.getBufferWithoutHeader();
+        }
+        // Carry on, please load.
+      }
+
+      // Load block from filesystem.
+      long now = System.currentTimeMillis();
+      long nextOffset;
+
+      if (block == dataBlockIndexReader.getRootBlockCount() - 1) {
+        // last block!  The end of data block is first meta block if there is
+        // one or if there isn't, the fileinfo offset.
+        nextOffset = (metaBlockIndexReader.getRootBlockCount() == 0) ?
+            this.trailer.getFileInfoOffset() :
+            metaBlockIndexReader.getRootBlockOffset(0);
+      } else {
+        nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
+      }
+
+      HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
+          - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
+      hfileBlock.expectType(BlockType.DATA);
+      ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
+
+      long delta = System.currentTimeMillis() - now;
+      HFile.readTime += delta;
+      HFile.readOps++;
+      if (isCompaction) {
+        HRegion.incrTimeVaryingMetric(this.compactionReadTimeMetric, delta);
+      } else {
+        HRegion.incrTimeVaryingMetric(this.fsReadTimeMetric, delta);
+      }
+
+      // Cache the block
+      if (cacheBlock && blockCache != null) {
+        blockCache.cacheBlock(cacheKey, hfileBlock, inMemory);
+      }
+
+      return buf;
+    }
+  }
+
+  /**
+   * @return Last key in the file.  May be null if file has no entries.
+   * Note that this is not the last rowkey, but rather the byte form of
+   * the last KeyValue.
+   */
+  public byte[] getLastKey() {
+    if (!fileInfoLoaded) {
+      throw new RuntimeException("Load file info first");
+    }
+    return dataBlockIndexReader.isEmpty() ? null : lastKey;
+  }
+
+  /**
+   * @return Midkey for this file. We work with block boundaries only so
+   *         returned midkey is an approximation only.
+   *
+   * @throws IOException
+   */
+  @Override
+  public byte[] midkey() throws IOException {
+    if (!isFileInfoLoaded() || dataBlockIndexReader.isEmpty()) {
+      return null;
+    }
+    return dataBlockIndexReader.midkey();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (evictOnClose && this.blockCache != null) {
+      int numEvicted = 0;
+      for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
+        if (blockCache.evictBlock(HFile.getBlockCacheKey(name,
+            dataBlockIndexReader.getRootBlockOffset(i))))
+          numEvicted++;
+      }
+      LOG.debug("On close of file " + name + " evicted " + numEvicted
+          + " block(s) of " + dataBlockIndexReader.getRootBlockCount()
+          + " total blocks");
+    }
+    if (this.closeIStream && this.istream != null) {
+      this.istream.close();
+      this.istream = null;
+    }
+  }
+
+  /**
+   * Implementation of {@link HFileScanner} interface.
+   */
+  protected static class ScannerV1 extends AbstractHFileReader.Scanner {
+    private final HFileReaderV1 reader;
+    private int currBlock;
+
+    public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      this.reader = reader;
+      this.cacheBlocks = cacheBlocks;
+      this.pread = pread;
+      this.isCompaction = isCompaction;
+    }
+
+    @Override
+    public KeyValue getKeyValue() {
+      if (blockBuffer == null) {
+        return null;
+      }
+      return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position() - 8);
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      if (blockBuffer == null || currKeyLen == 0) {
+        throw new RuntimeException(
+            "you need to seekTo() before calling getKey()");
+      }
+      ByteBuffer keyBuff = blockBuffer.slice();
+      keyBuff.limit(currKeyLen);
+      keyBuff.rewind();
+      // Do keyBuff.asReadOnly()?
+      return keyBuff;
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      if (blockBuffer == null || currKeyLen == 0) {
+        throw new RuntimeException(
+            "you need to seekTo() before calling getValue()");
+      }
+
+      // TODO: Could this be done with one ByteBuffer rather than create two?
+      ByteBuffer valueBuff = blockBuffer.slice();
+      valueBuff.position(currKeyLen);
+      valueBuff = valueBuff.slice();
+      valueBuff.limit(currValueLen);
+      valueBuff.rewind();
+      return valueBuff;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      // LOG.deug("rem:" + block.remaining() + " p:" + block.position() +
+      // " kl: " + currKeyLen + " kv: " + currValueLen);
+      if (blockBuffer == null) {
+        throw new IOException("Next called on non-seeked scanner");
+      }
+
+      try {
+        blockBuffer.position(blockBuffer.position() + currKeyLen
+            + currValueLen);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Current pos = " + blockBuffer.position() +
+                  "; currKeyLen = " + currKeyLen +
+                  "; currValLen = " + currValueLen +
+                  "; block limit = " + blockBuffer.limit() +
+                  "; HFile name = " + reader.getName() +
+                  "; currBlock id = " + currBlock);
+        throw e;
+      }
+      if (blockBuffer.remaining() <= 0) {
+        // LOG.debug("Fetch next block");
+        currBlock++;
+        if (currBlock >= reader.getDataBlockIndexReader().getRootBlockCount()) {
+          // damn we are at the end
+          currBlock = 0;
+          blockBuffer = null;
+          return false;
+        }
+        blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
+            isCompaction);
+        currKeyLen = blockBuffer.getInt();
+        currValueLen = blockBuffer.getInt();
+        blockFetches++;
+        return true;
+      }
+
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      return true;
+    }
+
+    @Override
+    public int seekTo(byte[] key) throws IOException {
+      return seekTo(key, 0, key.length);
+    }
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      int b = reader.blockContainingKey(key, offset, length);
+      if (b < 0) return -1; // falls before the beginning of the file! :-(
+      // Avoid re-reading the same block (that'd be dumb).
+      loadBlock(b, true);
+      return blockSeek(key, offset, length, false);
+    }
+
+    @Override
+    public int reseekTo(byte[] key) throws IOException {
+      return reseekTo(key, 0, key.length);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length)
+        throws IOException {
+      if (blockBuffer != null && currKeyLen != 0) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+
+      int b = reader.blockContainingKey(key, offset, length);
+      if (b < 0) {
+        return -1;
+      }
+      loadBlock(b, false);
+      return blockSeek(key, offset, length, false);
+    }
+
+    /**
+     * Within a loaded block, seek looking for the first key
+     * that is smaller than (or equal to?) the key we are interested in.
+     *
+     * A note on the seekBefore - if you have seekBefore = true, AND the
+     * first key in the block = key, then you'll get thrown exceptions.
+     * @param key to find
+     * @param seekBefore find the key before the exact match.
+     * @return
+     */
+    private int blockSeek(byte[] key, int offset, int length,
+        boolean seekBefore) {
+      int klen, vlen;
+      int lastLen = 0;
+      do {
+        klen = blockBuffer.getInt();
+        vlen = blockBuffer.getInt();
+        int comp = reader.getComparator().compare(key, offset, length,
+            blockBuffer.array(),
+            blockBuffer.arrayOffset() + blockBuffer.position(), klen);
+        if (comp == 0) {
+          if (seekBefore) {
+            blockBuffer.position(blockBuffer.position() - lastLen - 16);
+            currKeyLen = blockBuffer.getInt();
+            currValueLen = blockBuffer.getInt();
+            return 1; // non exact match.
+          }
+          currKeyLen = klen;
+          currValueLen = vlen;
+          return 0; // indicate exact match
+        }
+        if (comp < 0) {
+          // go back one key:
+          blockBuffer.position(blockBuffer.position() - lastLen - 16);
+          currKeyLen = blockBuffer.getInt();
+          currValueLen = blockBuffer.getInt();
+          return 1;
+        }
+        blockBuffer.position(blockBuffer.position() + klen + vlen);
+        lastLen = klen + vlen;
+      } while (blockBuffer.remaining() > 0);
+
+      // ok we are at the end, so go back a littleeeeee....
+      // The 8 in the below is intentionally different to the 16s in the above
+      // Do the math you you'll figure it.
+      blockBuffer.position(blockBuffer.position() - lastLen - 8);
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      return 1; // didn't exactly find it.
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key) throws IOException {
+      return seekBefore(key, 0, key.length);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+    throws IOException {
+      int b = reader.blockContainingKey(key, offset, length);
+      if (b < 0)
+        return false; // key is before the start of the file.
+
+      // Question: does this block begin with 'key'?
+      byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
+      if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
+          key, offset, length) == 0) {
+        // Ok the key we're interested in is the first of the block, so go back
+        // by one.
+        if (b == 0) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+        b--;
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlock(b, true);
+      blockSeek(key, offset, length, true);
+      return true;
+    }
+
+    @Override
+    public String getKeyString() {
+      return Bytes.toStringBinary(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
+    }
+
+    @Override
+    public String getValueString() {
+      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() +
+        blockBuffer.position() + currKeyLen, currValueLen);
+    }
+
+    @Override
+    public Reader getReader() {
+      return reader;
+    }
+
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader.getDataBlockIndexReader().isEmpty()) {
+        return false;
+      }
+      if (blockBuffer != null && currBlock == 0) {
+        blockBuffer.rewind();
+        currKeyLen = blockBuffer.getInt();
+        currValueLen = blockBuffer.getInt();
+        return true;
+      }
+      currBlock = 0;
+      blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
+          isCompaction);
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      blockFetches++;
+      return true;
+    }
+
+    private void loadBlock(int bloc, boolean rewind) throws IOException {
+      if (blockBuffer == null) {
+        blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
+            isCompaction);
+        currBlock = bloc;
+        blockFetches++;
+      } else {
+        if (bloc != currBlock) {
+          blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
+              isCompaction);
+          currBlock = bloc;
+          blockFetches++;
+        } else {
+          // we are already in the same block, just rewind to seek again.
+          if (rewind) {
+            blockBuffer.rewind();
+          }
+          else {
+            // Go back by (size of rowlength + size of valuelength) = 8 bytes
+            blockBuffer.position(blockBuffer.position()-8);
+          }
+        }
+      }
+    }
+
+  }
+
+  @Override
+  public HFileBlock readBlock(long offset, int onDiskBlockSize,
+      boolean cacheBlock, boolean pread, boolean isCompaction) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DataInput getBloomFilterMetadata() throws IOException {
+    ByteBuffer buf = getMetaBlock(HFileWriterV1.BLOOM_FILTER_META_KEY, false);
+    if (buf == null)
+      return null;
+    ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),
+        buf.arrayOffset(), buf.limit());
+    return new DataInputStream(bais);
+  }
+
+  @Override
+  public boolean isFileInfoLoaded() {
+    return fileInfoLoaded;
+  }
+
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,721 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.IdLock;
+
+/**
+ * {@link HFile} reader for version 2.
+ */
+public class HFileReaderV2 extends AbstractHFileReader implements
+    HFileBlock.BasicReader {
+
+  private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
+
+  /**
+   * The size of a (key length, value length) tuple that prefixes each entry in
+   * a data block.
+   */
+  private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+  /**
+   * A "sparse lock" implementation allowing to lock on a particular block
+   * identified by offset. The purpose of this is to avoid two clients loading
+   * the same block, and have all but one client wait to get the block from the
+   * cache.
+   */
+  private IdLock offsetLock = new IdLock();
+
+  /**
+   * Blocks read from the load-on-open section, excluding data root index, meta
+   * index, and file info.
+   */
+  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
+
+  /**
+   * Opens a HFile. You must load the index before you can use it by calling
+   * {@link #loadFileInfo()}.
+   *
+   * @param fsdis input stream. Caller is responsible for closing the passed
+   *          stream.
+   * @param size Length of the stream.
+   * @param blockCache block cache. Pass null if none.
+   * @param inMemory whether blocks should be marked as in-memory in cache
+   * @param evictOnClose whether blocks in cache should be evicted on close
+   * @throws IOException
+   */
+  public HFileReaderV2(Path path, FixedFileTrailer trailer,
+      final FSDataInputStream fsdis, final long size,
+      final boolean closeIStream, final BlockCache blockCache,
+      final boolean inMemory, final boolean evictOnClose) throws IOException {
+    super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
+        evictOnClose);
+
+    trailer.expectVersion(2);
+    fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
+        fileSize);
+
+    // Comparator class name is stored in the trailer in version 2.
+    comparator = trailer.createComparator();
+    dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+        trailer.getNumDataIndexLevels(), this);
+    metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+        Bytes.BYTES_RAWCOMPARATOR, 1);
+
+    // Parse load-on-open data.
+
+    HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
+        trailer.getLoadOnOpenDataOffset(),
+        fileSize - trailer.getTrailerSize());
+
+    // Data index. We also read statistics about the block index written after
+    // the root level.
+    dataBlockIndexReader.readMultiLevelIndexRoot(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getDataIndexCount());
+
+    // Meta index.
+    metaBlockIndexReader.readRootIndex(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+
+    // File info
+    fileInfo = new FileInfo();
+    fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
+    lastKey = fileInfo.get(FileInfo.LASTKEY);
+    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
+    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+
+    // Store all other load-on-open blocks for further consumption.
+    HFileBlock b;
+    while ((b = blockIter.nextBlock()) != null) {
+      loadOnOpenBlocks.add(b);
+    }
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient.
+   *
+   * @param cacheBlocks True if we should cache blocks read in by this scanner.
+   * @param pread Use positional read rather than seek+read if true (pread is
+   *          better for random reads, seek+read is better scanning).
+   * @param isCompaction is scanner being used for a compaction?
+   * @return Scanner on this file.
+   */
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+      final boolean isCompaction) {
+    return new ScannerV2(this, cacheBlocks, pread, isCompaction);
+  }
+
+  /**
+   * @param metaBlockName
+   * @param cacheBlock Add block to cache, if found
+   * @return block wrapped in a ByteBuffer, with header skipped
+   * @throws IOException
+   */
+  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
+      throws IOException {
+    if (trailer.getMetaIndexCount() == 0) {
+      return null; // there are no meta blocks
+    }
+    if (metaBlockIndexReader == null) {
+      throw new IOException("Meta index not loaded");
+    }
+
+    byte[] mbname = Bytes.toBytes(metaBlockName);
+    int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
+        mbname.length);
+    if (block == -1)
+      return null;
+    long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
+    long now = System.currentTimeMillis();
+
+    // Per meta key from any given file, synchronize reads for said block. This
+    // is OK to do for meta blocks because the meta block index is always
+    // single-level.
+    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+      metaLoads++;
+      HRegion.incrNumericMetric(fsMetaBlockReadCntMetric, 1);
+
+      // Check cache for block. If found return.
+      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+      String cacheKey = HFile.getBlockCacheKey(name, metaBlockOffset);
+
+      if (blockCache != null) {
+        HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+        if (cachedBlock != null) {
+          // Return a distinct 'shallow copy' of the block,
+          // so pos does not get messed by the scanner
+          cacheHits++;
+          HRegion.incrNumericMetric(fsMetaBlockReadCacheHitCntMetric, 1);
+          return cachedBlock.getBufferWithoutHeader();
+        }
+        // Cache Miss, please load.
+      }
+
+      HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
+          blockSize, -1, true);
+
+      long delta = System.currentTimeMillis() - now;
+      HRegion.incrTimeVaryingMetric(fsReadTimeMetric, delta);
+      HFile.readTime += delta;
+      HFile.readOps++;
+
+      // Cache the block
+      if (cacheBlock && blockCache != null) {
+        blockCache.cacheBlock(cacheKey, metaBlock, inMemory);
+      }
+
+      return metaBlock.getBufferWithoutHeader();
+    }
+  }
+
+  @Override
+  public HFileBlock readBlockData(long offset, long onDiskSize,
+      int uncompressedSize, boolean pread) throws IOException {
+    if (onDiskSize >= Integer.MAX_VALUE) {
+      throw new IOException("Invalid on-disk size: " + onDiskSize);
+    }
+    return readBlock(offset, (int) onDiskSize, true, pread, false);
+  }
+
+  /**
+   * Read in a file block.
+   *
+   * @param dataBlockOffset offset to read.
+   * @param onDiskSize size of the block
+   * @param pread Use positional read instead of seek+read (positional is better
+   *          doing random reads whereas seek+read is better scanning).
+   * @param isCompaction is this block being read as part of a compaction
+   * @return Block wrapped in a ByteBuffer.
+   * @throws IOException
+   */
+  public HFileBlock readBlock(long dataBlockOffset, int onDiskBlockSize,
+      boolean cacheBlock, final boolean pread, final boolean isCompaction)
+      throws IOException {
+    if (dataBlockIndexReader == null) {
+      throw new IOException("Block index not loaded");
+    }
+    if (dataBlockOffset < 0
+        || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
+      throw new IOException("Requested block is out of range: "
+          + dataBlockOffset + ", lastDataBlockOffset: "
+          + trailer.getLastDataBlockOffset());
+    }
+    // For any given block from any given file, synchronize reads for said
+    // block.
+    // Without a cache, this synchronizing is needless overhead, but really
+    // the other choice is to duplicate work (which the cache would prevent you
+    // from doing).
+
+    String cacheKey = HFile.getBlockCacheKey(name, dataBlockOffset);
+    IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
+    try {
+      blockLoads++;
+
+      if (isCompaction) {
+        HRegion.incrNumericMetric(compactionBlockReadCntMetric, 1);
+      } else {
+        HRegion.incrNumericMetric(fsBlockReadCntMetric, 1);
+      }
+
+      // Check cache for block. If found return.
+      if (blockCache != null) {
+        HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey);
+        if (cachedBlock != null) {
+          cacheHits++;
+
+          if (isCompaction) {
+            HRegion.incrNumericMetric(
+                compactionBlockReadCacheHitCntMetric, 1);
+          } else {
+            HRegion.incrNumericMetric(fsBlockReadCacheHitCntMetric, 1);
+          }
+          return cachedBlock;
+        }
+        // Carry on, please load.
+      }
+
+      // Load block from filesystem.
+      long now = System.currentTimeMillis();
+      HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset,
+          onDiskBlockSize, -1, true);
+
+      long delta = System.currentTimeMillis() - now;
+      HFile.readTime += delta;
+      HFile.readOps++;
+      if (isCompaction) {
+        HRegion.incrTimeVaryingMetric(compactionReadTimeMetric, delta);
+      } else {
+        HRegion.incrTimeVaryingMetric(fsReadTimeMetric, delta);
+      }
+
+      // Cache the block
+      if (cacheBlock && blockCache != null) {
+        blockCache.cacheBlock(cacheKey, dataBlock, inMemory);
+      }
+
+      return dataBlock;
+    } finally {
+      offsetLock.releaseLockEntry(lockEntry);
+    }
+  }
+
+  /**
+   * @return Last key in the file. May be null if file has no entries. Note that
+   *         this is not the last row key, but rather the byte form of the last
+   *         KeyValue.
+   */
+  @Override
+  public byte[] getLastKey() {
+    return dataBlockIndexReader.isEmpty() ? null : lastKey;
+  }
+
+  /**
+   * @return Midkey for this file. We work with block boundaries only so
+   *         returned midkey is an approximation only.
+   * @throws IOException
+   */
+  @Override
+  public byte[] midkey() throws IOException {
+    return dataBlockIndexReader.midkey();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (evictOnClose && blockCache != null) {
+      int numEvicted = blockCache.evictBlocksByPrefix(name
+          + HFile.CACHE_KEY_SEPARATOR);
+      LOG.debug("On close of file " + name + " evicted " + numEvicted
+          + " block(s)");
+    }
+    if (closeIStream && istream != null) {
+      istream.close();
+      istream = null;
+    }
+  }
+
+  /**
+   * Implementation of {@link HFileScanner} interface.
+   */
+  protected static class ScannerV2 extends AbstractHFileReader.Scanner {
+    private HFileBlock block;
+
+    public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      this.reader = r;
+      this.cacheBlocks = cacheBlocks;
+      this.pread = pread;
+      this.isCompaction = isCompaction;
+    }
+
+    @Override
+    public KeyValue getKeyValue() {
+      if (!isSeeked())
+        return null;
+
+      return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position());
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      assertSeeked();
+      return ByteBuffer.wrap(
+          blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      assertSeeked();
+      return ByteBuffer.wrap(
+          blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
+    }
+
+    private void setNonSeekedState() {
+      block = null;
+      blockBuffer = null;
+      currKeyLen = 0;
+      currValueLen = 0;
+    }
+
+    /**
+     * Go to the next key/value in the block section. Loads the next block if
+     * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
+     * be called.
+     *
+     * @return true if successfully navigated to the next key/value
+     */
+    @Override
+    public boolean next() throws IOException {
+      assertSeeked();
+
+      try {
+        blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
+            + currKeyLen + currValueLen);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Current pos = " + blockBuffer.position()
+            + "; currKeyLen = " + currKeyLen + "; currValLen = "
+            + currValueLen + "; block limit = " + blockBuffer.limit()
+            + "; HFile name = " + reader.getName()
+            + "; currBlock currBlockOffset = " + block.getOffset());
+        throw e;
+      }
+
+      if (blockBuffer.remaining() <= 0) {
+        long lastDataBlockOffset =
+            reader.getTrailer().getLastDataBlockOffset();
+
+        if (block.getOffset() >= lastDataBlockOffset) {
+          setNonSeekedState();
+          return false;
+        }
+
+        // read the next block
+        HFileBlock nextBlock = readNextDataBlock();
+        if (nextBlock == null) {
+          setNonSeekedState();
+          return false;
+        }
+
+        updateCurrBlock(nextBlock);
+        return true;
+      }
+
+      // We are still in the same block.
+      readKeyValueLen();
+      return true;
+    }
+
+    /**
+     * Scans blocks in the "scanned" section of the {@link HFile} until the next
+     * data block is found.
+     *
+     * @return the next block, or null if there are no more data blocks
+     * @throws IOException
+     */
+    private HFileBlock readNextDataBlock() throws IOException {
+      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+      if (block == null)
+        return null;
+
+      HFileBlock curBlock = block;
+
+      do {
+        if (curBlock.getOffset() >= lastDataBlockOffset)
+          return null;
+
+        if (curBlock.getOffset() < 0) {
+          throw new IOException("Invalid block file offset: " + block);
+        }
+        curBlock = reader.readBlock(curBlock.getOffset()
+            + curBlock.getOnDiskSizeWithHeader(),
+            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+            false);
+      } while (!curBlock.getBlockType().equals(BlockType.DATA));
+
+      return curBlock;
+    }
+
+    /**
+     * Positions this scanner at the start of the file.
+     *
+     * @return false if empty file; i.e. a call to next would return false and
+     *         the current key and value are undefined.
+     * @throws IOException
+     */
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader == null) {
+        return false;
+      }
+
+      if (reader.getTrailer().getEntryCount() == 0) {
+        // No data blocks.
+        return false;
+      }
+
+      long firstDataBlockOffset =
+          reader.getTrailer().getFirstDataBlockOffset();
+      if (block != null && block.getOffset() == firstDataBlockOffset) {
+        blockBuffer.rewind();
+        readKeyValueLen();
+        return true;
+      }
+
+      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+          isCompaction);
+      if (block.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + block.getOffset());
+      }
+      updateCurrBlock(block);
+      return true;
+    }
+
+    @Override
+    public int seekTo(byte[] key) throws IOException {
+      return seekTo(key, 0, key.length);
+    }
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      HFileBlock seekToBlock =
+          ((HFileReaderV2) reader).getDataBlockIndexReader().seekToDataBlock(
+              key, offset, length, block);
+      if (seekToBlock == null) {
+        // This happens if the key e.g. falls before the beginning of the file.
+        return -1;
+      }
+      return loadBlockAndSeekToKey(seekToBlock, true, key, offset, length,
+          false);
+    }
+
+    @Override
+    public int reseekTo(byte[] key) throws IOException {
+      return reseekTo(key, 0, key.length);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length) throws IOException {
+      if (isSeeked()) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+      return seekTo(key, offset, length);
+    }
+
+    private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+        byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException {
+      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+        updateCurrBlock(seekToBlock);
+      } else if (rewind) {
+        blockBuffer.rewind();
+      }
+      return blockSeek(key, offset, length, seekBefore);
+    }
+
+    /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the the first key/value pair.
+     *
+     * @param newBlock the block to make current
+     */
+    private void updateCurrBlock(HFileBlock newBlock) {
+      block = newBlock;
+      blockBuffer = block.getBufferWithoutHeader();
+      readKeyValueLen();
+      blockFetches++;
+    }
+
+    private final void readKeyValueLen() {
+      blockBuffer.mark();
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      blockBuffer.reset();
+
+      if (currKeyLen < 0 || currValueLen < 0
+          || currKeyLen > blockBuffer.limit()
+          || currValueLen > blockBuffer.limit()) {
+        throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
+            + " or currValueLen " + currValueLen + ". Block offset: "
+            + block.getOffset() + ", block length: " + blockBuffer.limit()
+            + ", position: " + blockBuffer.position() + " (without header).");
+      }
+    }
+
+    /**
+     * Within a loaded block, seek looking for the first key that is smaller
+     * than (or equal to?) the key we are interested in.
+     *
+     * A note on the seekBefore: if you have seekBefore = true, AND the first
+     * key in the block = key, then you'll get thrown exceptions. The caller has
+     * to check for that case and load the previous block as appropriate.
+     *
+     * @param key the key to find
+     * @param seekBefore find the key before the given key in case of exact
+     *          match.
+     * @return 0 in case of an exact key match, 1 in case of an inexact match
+     */
+    private int blockSeek(byte[] key, int offset, int length,
+        boolean seekBefore) {
+      int klen, vlen;
+      int lastKeyValueSize = -1;
+      do {
+        blockBuffer.mark();
+        klen = blockBuffer.getInt();
+        vlen = blockBuffer.getInt();
+        blockBuffer.reset();
+
+        int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+            + KEY_VALUE_LEN_SIZE;
+        int comp = reader.getComparator().compare(key, offset, length,
+            blockBuffer.array(), keyOffset, klen);
+
+        if (comp == 0) {
+          if (seekBefore) {
+            if (lastKeyValueSize < 0) {
+              throw new IllegalStateException("blockSeek with seekBefore "
+                  + "at the first key of the block: key="
+                  + Bytes.toStringBinary(key) + ", blockOffset="
+                  + block.getOffset() + ", onDiskSize="
+                  + block.getOnDiskSizeWithHeader());
+            }
+            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+            readKeyValueLen();
+            return 1; // non exact match.
+          }
+          currKeyLen = klen;
+          currValueLen = vlen;
+          return 0; // indicate exact match
+        }
+
+        if (comp < 0) {
+          if (lastKeyValueSize > 0)
+            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+          readKeyValueLen();
+          return 1;
+        }
+
+        // The size of this key/value tuple, including key/value length fields.
+        lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE;
+        blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
+      } while (blockBuffer.remaining() > 0);
+
+      // Seek to the last key we successfully read. This will happen if this is
+      // the last key/value pair in the file, in which case the following call
+      // to next() has to return false.
+      blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+      readKeyValueLen();
+      return 1; // didn't exactly find it.
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key) throws IOException {
+      return seekBefore(key, 0, key.length);
+    }
+
+    private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+      ByteBuffer buffer = curBlock.getBufferWithoutHeader();
+      // It is safe to manipulate this buffer because we own the buffer object.
+      buffer.rewind();
+      int klen = buffer.getInt();
+      buffer.getInt();
+      ByteBuffer keyBuff = buffer.slice();
+      keyBuff.limit(klen);
+      keyBuff.rewind();
+      return keyBuff;
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+        throws IOException {
+      HFileReaderV2 reader2 = (HFileReaderV2) reader;
+      HFileBlock seekToBlock =
+          reader2.getDataBlockIndexReader().seekToDataBlock(
+              key, offset, length, block);
+      if (seekToBlock == null) {
+        return false;
+      }
+      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+      if (reader.getComparator().compare(firstKey.array(),
+          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
+      {
+        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        // The key we are interested in
+        if (previousBlockOffset == -1) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+
+        // It is important that we compute and pass onDiskSize to the block
+        // reader so that it does not have to read the header separately to
+        // figure out the size.
+        seekToBlock = reader2.fsBlockReader.readBlockData(previousBlockOffset,
+            seekToBlock.getOffset() - previousBlockOffset, -1, pread);
+
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+      return true;
+    }
+
+    @Override
+    public String getKeyString() {
+      return Bytes.toStringBinary(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen);
+    }
+
+    @Override
+    public String getValueString() {
+      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+          currValueLen);
+    }
+
+  }
+
+  /**
+   * Returns a buffer with the Bloom filter metadata. The caller takes
+   * ownership of the buffer.
+   */
+  @Override
+  public DataInput getBloomFilterMetadata() throws IOException {
+    for (HFileBlock b : loadOnOpenBlocks)
+      if (b.getBlockType() == BlockType.BLOOM_META)
+        return b.getByteStream();
+    return null;
+  }
+
+  @Override
+  public boolean isFileInfoLoaded() {
+    return true; // We load file info in constructor in version 2.
+  }
+
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1181555&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Tue Oct 11 02:19:30 2011
@@ -0,0 +1,484 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * Writes version 1 HFiles. Mainly used for testing backwards-compatibilty.
+ */
+public class HFileWriterV1 extends AbstractHFileWriter {
+
+  /** Meta data block name for bloom filter parameters. */
+  static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
+
+  /** Meta data block name for bloom filter bits. */
+  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
+
+  private static final Log LOG = LogFactory.getLog(HFileWriterV1.class);
+
+  // A stream made per block written.
+  private DataOutputStream out;
+
+  // Offset where the current block began.
+  private long blockBegin;
+
+  // First keys of every block.
+  private ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
+
+  // Block offset in backing stream.
+  private ArrayList<Long> blockOffsets = new ArrayList<Long>();
+
+  // Raw (decompressed) data size.
+  private ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
+
+  private Compressor compressor;
+
+  // Additional byte array output stream used to fill block cache
+  private ByteArrayOutputStream baos;
+  private DataOutputStream baosDos;
+  private int blockNumber = 0;
+
+  static class WriterFactoryV1 extends HFile.WriterFactory {
+
+    WriterFactoryV1(Configuration conf) { super(conf); }
+
+    @Override
+    public Writer createWriter(FileSystem fs, Path path) throws IOException {
+      return new HFileWriterV1(conf, fs, path);
+    }
+
+    @Override
+    public Writer createWriter(FileSystem fs, Path path, int blockSize,
+        int bytesPerChecksum, Compression.Algorithm compressAlgo,
+        final KeyComparator comparator)
+        throws IOException {
+      return new HFileWriterV1(conf, fs, path, blockSize, bytesPerChecksum,
+          compressAlgo, comparator);
+    }
+
+    @Override
+    public Writer createWriter(FileSystem fs, Path path, int blockSize,
+        int bytesPerChecksum, String compressAlgoName,
+        final KeyComparator comparator) throws IOException {
+      return new HFileWriterV1(conf, fs, path, blockSize, bytesPerChecksum,
+          compressAlgoName, comparator);
+    }
+
+    @Override
+    public Writer createWriter(final FSDataOutputStream ostream,
+        final int blockSize, final String compress,
+        final KeyComparator comparator) throws IOException {
+      return new HFileWriterV1(conf, ostream, blockSize, compress, comparator);
+    }
+
+    @Override
+    public Writer createWriter(final FSDataOutputStream ostream,
+        final int blockSize, final Compression.Algorithm compress,
+        final KeyComparator c) throws IOException {
+      return new HFileWriterV1(conf, ostream, blockSize, compress, c);
+    }
+  }
+
+  /** Constructor that uses all defaults for compression and block size. */
+  public HFileWriterV1(Configuration conf, FileSystem fs, Path path)
+      throws IOException {
+    this(conf, fs, path, HFile.DEFAULT_BLOCKSIZE,
+        HFile.DEFAULT_BYTES_PER_CHECKSUM, HFile.DEFAULT_COMPRESSION_ALGORITHM,
+        null);
+  }
+
+  /**
+   * Constructor that takes a path, creates and closes the output stream. Takes
+   * compression algorithm name as string.
+   */
+  public HFileWriterV1(Configuration conf, FileSystem fs, Path path,
+      int blockSize, int bytesPerChecksum, String compressAlgoName,
+      final KeyComparator comparator) throws IOException {
+    this(conf, fs, path, blockSize, bytesPerChecksum,
+        compressionByName(compressAlgoName), comparator);
+  }
+
+  /** Constructor that takes a path, creates and closes the output stream. */
+  public HFileWriterV1(Configuration conf, FileSystem fs, Path path,
+      int blockSize, int bytesPerChecksum, Compression.Algorithm compress,
+      final KeyComparator comparator) throws IOException {
+    super(conf, createOutputStream(conf, fs, path, bytesPerChecksum), path,
+        blockSize, compress, comparator);
+  }
+
+  /** Constructor that takes a stream. */
+  public HFileWriterV1(Configuration conf,
+      final FSDataOutputStream outputStream, final int blockSize,
+      final String compressAlgoName, final KeyComparator comparator)
+      throws IOException {
+    this(conf, outputStream, blockSize,
+        Compression.getCompressionAlgorithmByName(compressAlgoName),
+        comparator);
+  }
+
+  /** Constructor that takes a stream. */
+  public HFileWriterV1(Configuration conf,
+      final FSDataOutputStream outputStream, final int blockSize,
+      final Compression.Algorithm compress, final KeyComparator comparator)
+      throws IOException {
+    super(conf, outputStream, null, blockSize, compress, comparator);
+  }
+
+  /**
+   * If at block boundary, opens new block.
+   *
+   * @throws IOException
+   */
+  private void checkBlockBoundary() throws IOException {
+    if (this.out != null && this.out.size() < blockSize)
+      return;
+    finishBlock();
+    newBlock();
+  }
+
+  /**
+   * Do the cleanup if a current block.
+   *
+   * @throws IOException
+   */
+  private void finishBlock() throws IOException {
+    if (this.out == null)
+      return;
+    long now = System.currentTimeMillis();
+
+    int size = releaseCompressingStream(this.out);
+    this.out = null;
+    blockKeys.add(firstKeyInBlock);
+    blockOffsets.add(Long.valueOf(blockBegin));
+    blockDataSizes.add(Integer.valueOf(size));
+    this.totalUncompressedBytes += size;
+
+    HFile.writeTime += System.currentTimeMillis() - now;
+    HFile.writeOps++;
+
+    if (cacheDataBlocksOnWrite) {
+      baosDos.flush();
+      byte[] bytes = baos.toByteArray();
+      blockCache.cacheBlock(HFile.getBlockCacheKey(name, blockBegin),
+          new HFileBlock(BlockType.DATA,
+              (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
+              ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin));
+      baosDos.close();
+    }
+    blockNumber++;
+  }
+
+  /**
+   * Ready a new block for writing.
+   *
+   * @throws IOException
+   */
+  private void newBlock() throws IOException {
+    // This is where the next block begins.
+    blockBegin = outputStream.getPos();
+    this.out = getCompressingStream();
+    BlockType.DATA.write(out);
+    firstKeyInBlock = null;
+    if (cacheDataBlocksOnWrite) {
+      this.baos = new ByteArrayOutputStream();
+      this.baosDos = new DataOutputStream(baos);
+      baosDos.write(HFileBlock.DUMMY_HEADER);
+    }
+  }
+
+  /**
+   * Sets up a compressor and creates a compression stream on top of
+   * this.outputStream. Get one per block written.
+   *
+   * @return A compressing stream; if 'none' compression, returned stream does
+   * not compress.
+   *
+   * @throws IOException
+   *
+   * @see {@link #releaseCompressingStream(DataOutputStream)}
+   */
+  private DataOutputStream getCompressingStream() throws IOException {
+    this.compressor = compressAlgo.getCompressor();
+    // Get new DOS compression stream. In tfile, the DOS, is not closed,
+    // just finished, and that seems to be fine over there. TODO: Check
+    // no memory retention of the DOS. Should I disable the 'flush' on the
+    // DOS as the BCFile over in tfile does? It wants to make it so flushes
+    // don't go through to the underlying compressed stream. Flush on the
+    // compressed downstream should be only when done. I was going to but
+    // looks like when we call flush in here, its legitimate flush that
+    // should go through to the compressor.
+    OutputStream os = this.compressAlgo.createCompressionStream(
+        this.outputStream, this.compressor, 0);
+    return new DataOutputStream(os);
+  }
+
+  /**
+   * Let go of block compressor and compressing stream gotten in call {@link
+   * #getCompressingStream}.
+   *
+   * @param dos
+   *
+   * @return How much was written on this stream since it was taken out.
+   *
+   * @see #getCompressingStream()
+   *
+   * @throws IOException
+   */
+  private int releaseCompressingStream(final DataOutputStream dos)
+      throws IOException {
+    dos.flush();
+    this.compressAlgo.returnCompressor(this.compressor);
+    this.compressor = null;
+    return dos.size();
+  }
+
+  /**
+   * Add a meta block to the end of the file. Call before close(). Metadata
+   * blocks are expensive. Fill one with a bunch of serialized data rather than
+   * do a metadata block per metadata instance. If metadata is small, consider
+   * adding to file info using {@link #appendFileInfo(byte[], byte[])}
+   *
+   * @param metaBlockName
+   *          name of the block
+   * @param content
+   *          will call readFields to get data later (DO NOT REUSE)
+   */
+  public void appendMetaBlock(String metaBlockName, Writable content) {
+    byte[] key = Bytes.toBytes(metaBlockName);
+    int i;
+    for (i = 0; i < metaNames.size(); ++i) {
+      // stop when the current key is greater than our own
+      byte[] cur = metaNames.get(i);
+      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
+          key.length) > 0) {
+        break;
+      }
+    }
+    metaNames.add(i, key);
+    metaData.add(i, content);
+  }
+
+  /**
+   * Add key/value to file. Keys must be added in an order that agrees with the
+   * Comparator passed on construction.
+   *
+   * @param kv
+   *          KeyValue to add. Cannot be empty nor null.
+   * @throws IOException
+   */
+  public void append(final KeyValue kv) throws IOException {
+    append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+  }
+
+  /**
+   * Add key/value to file. Keys must be added in an order that agrees with the
+   * Comparator passed on construction.
+   *
+   * @param key
+   *          Key to add. Cannot be empty nor null.
+   * @param value
+   *          Value to add. Cannot be empty nor null.
+   * @throws IOException
+   */
+  public void append(final byte[] key, final byte[] value) throws IOException {
+    append(key, 0, key.length, value, 0, value.length);
+  }
+
+  /**
+   * Add key/value to file. Keys must be added in an order that agrees with the
+   * Comparator passed on construction.
+   *
+   * @param key
+   * @param koffset
+   * @param klength
+   * @param value
+   * @param voffset
+   * @param vlength
+   * @throws IOException
+   */
+  private void append(final byte[] key, final int koffset, final int klength,
+      final byte[] value, final int voffset, final int vlength)
+      throws IOException {
+    boolean dupKey = checkKey(key, koffset, klength);
+    checkValue(value, voffset, vlength);
+    if (!dupKey) {
+      checkBlockBoundary();
+    }
+    // Write length of key and value and then actual key and value bytes.
+    this.out.writeInt(klength);
+    totalKeyLength += klength;
+    this.out.writeInt(vlength);
+    totalValueLength += vlength;
+    this.out.write(key, koffset, klength);
+    this.out.write(value, voffset, vlength);
+    // Are we the first key in this block?
+    if (this.firstKeyInBlock == null) {
+      // Copy the key.
+      this.firstKeyInBlock = new byte[klength];
+      System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength);
+    }
+    this.lastKeyBuffer = key;
+    this.lastKeyOffset = koffset;
+    this.lastKeyLength = klength;
+    this.entryCount++;
+    // If we are pre-caching blocks on write, fill byte array stream
+    if (cacheDataBlocksOnWrite) {
+      this.baosDos.writeInt(klength);
+      this.baosDos.writeInt(vlength);
+      this.baosDos.write(key, koffset, klength);
+      this.baosDos.write(value, voffset, vlength);
+    }
+  }
+
+  public void close() throws IOException {
+    if (this.outputStream == null) {
+      return;
+    }
+    // Write out the end of the data blocks, then write meta data blocks.
+    // followed by fileinfo, data block index and meta block index.
+
+    finishBlock();
+
+    FixedFileTrailer trailer = new FixedFileTrailer(1);
+
+    // Write out the metadata blocks if any.
+    ArrayList<Long> metaOffsets = null;
+    ArrayList<Integer> metaDataSizes = null;
+    if (metaNames.size() > 0) {
+      metaOffsets = new ArrayList<Long>(metaNames.size());
+      metaDataSizes = new ArrayList<Integer>(metaNames.size());
+      for (int i = 0; i < metaNames.size(); ++i) {
+        // store the beginning offset
+        long curPos = outputStream.getPos();
+        metaOffsets.add(curPos);
+        // write the metadata content
+        DataOutputStream dos = getCompressingStream();
+        BlockType.META.write(dos);
+        metaData.get(i).write(dos);
+        int size = releaseCompressingStream(dos);
+        // store the metadata size
+        metaDataSizes.add(size);
+      }
+    }
+
+    writeFileInfo(trailer, outputStream);
+
+    // Write the data block index.
+    trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream,
+        this.blockKeys, this.blockOffsets, this.blockDataSizes));
+    LOG.info("Wrote a version 1 block index with " + this.blockKeys.size()
+        + " keys");
+
+    if (metaNames.size() > 0) {
+      // Write the meta index.
+      writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes);
+    }
+
+    // Now finish off the trailer.
+    trailer.setDataIndexCount(blockKeys.size());
+
+    finishClose(trailer);
+  }
+
+  @Override
+  protected void finishFileInfo() throws IOException {
+    super.finishFileInfo();
+
+    // In version 1, we store comparator name in the file info.
+    fileInfo.append(FileInfo.COMPARATOR,
+        Bytes.toBytes(comparator.getClass().getName()), false);
+  }
+
+  @Override
+  public void addInlineBlockWriter(InlineBlockWriter bloomWriter) {
+    // Inline blocks only exist in HFile format version 2.
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Version 1 Bloom filters are stored in two meta blocks with two different
+   * keys.
+   */
+  @Override
+  public void addBloomFilter(BloomFilterWriter bfw) {
+    appendMetaBlock(BLOOM_FILTER_META_KEY,
+        bfw.getMetaWriter());
+    Writable dataWriter = bfw.getDataWriter();
+    if (dataWriter != null) {
+      appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter);
+    }
+  }
+
+  /**
+   * Write out the index in the version 1 format. This conforms to the legacy
+   * version 1 format, but can still be read by
+   * {@link HFileBlockIndex.BlockIndexReader#readRootIndex(java.io.DataInputStream,
+   * int)}.
+   *
+   * @param out the stream to write to
+   * @param keys
+   * @param offsets
+   * @param uncompressedSizes in contrast with a version 2 root index format,
+   *          the sizes stored in the version 1 are uncompressed sizes
+   * @return
+   * @throws IOException
+   */
+  private static long writeBlockIndex(final FSDataOutputStream out,
+      final List<byte[]> keys, final List<Long> offsets,
+      final List<Integer> uncompressedSizes) throws IOException {
+    long pos = out.getPos();
+    // Don't write an index if nothing in the index.
+    if (keys.size() > 0) {
+      BlockType.INDEX_V1.write(out);
+      // Write the index.
+      for (int i = 0; i < keys.size(); ++i) {
+        out.writeLong(offsets.get(i).longValue());
+        out.writeInt(uncompressedSizes.get(i).intValue());
+        byte[] key = keys.get(i);
+        Bytes.writeByteArray(out, key);
+      }
+    }
+    return pos;
+  }
+
+}
\ No newline at end of file



Mime
View raw message