Subject: svn commit: r1153634 [2/4] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/io/hfile/ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/o...
Date: Wed, 03 Aug 2011 19:59:54 -0000
To: commits@hbase.apache.org
From: tedyu@apache.org
Message-Id: <20110803195955.EB05023889DA@eris.apache.org>

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1153634&r1=1153633&r2=1153634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Aug 3 19:59:48 2011
@@ -19,26 +19,15 @@ */
 package org.apache.hadoop.hbase.io.hfile;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,26 +37,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import 
org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.HbaseMapWritable; -import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; -import org.apache.hadoop.hbase.util.BloomFilter; -import org.apache.hadoop.hbase.util.ByteBloomFilter; -import org.apache.hadoop.hbase.util.ByteBufferOutputStream; +import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; /** * File format for hbase. @@ -135,25 +112,13 @@ import org.apache.hadoop.io.compress.Dec public class HFile { static final Log LOG = LogFactory.getLog(HFile.class); - /* These values are more or less arbitrary, and they are used as a - * form of check to make sure the file isn't completely corrupt. - */ - final static byte [] DATABLOCKMAGIC = - {'D', 'A', 'T', 'A', 'B', 'L', 'K', 42 }; - final static byte [] INDEXBLOCKMAGIC = - { 'I', 'D', 'X', 'B', 'L', 'K', 41, 43 }; - final static byte [] METABLOCKMAGIC = - { 'M', 'E', 'T', 'A', 'B', 'L', 'K', 99 }; - final static byte [] TRAILERBLOCKMAGIC = - { 'T', 'R', 'A', 'B', 'L', 'K', 34, 36 }; - /** * Maximum length of key in HFile. */ public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE; /** - * Default blocksize for hfile. + * Default block size for an HFile. */ public final static int DEFAULT_BLOCKSIZE = 64 * 1024; @@ -162,1653 +127,229 @@ public class HFile { */ public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM = Compression.Algorithm.NONE; + + /** Minimum supported HFile format version */ + public static final int MIN_FORMAT_VERSION = 1; + + /** Maximum supported HFile format version */ + public static final int MAX_FORMAT_VERSION = 2; + /** Default compression name: none. 
*/ public final static String DEFAULT_COMPRESSION = DEFAULT_COMPRESSION_ALGORITHM.getName(); + /** Separator between HFile name and offset in block cache key */ + static final char CACHE_KEY_SEPARATOR = '_'; + // For measuring latency of "typical" reads and writes - private static volatile long readOps; - private static volatile long readTime; - private static volatile long writeOps; - private static volatile long writeTime; + static volatile AtomicLong readOps = new AtomicLong(); + static volatile AtomicLong readTimeNano = new AtomicLong(); + static volatile AtomicLong writeOps = new AtomicLong(); + static volatile AtomicLong writeTimeNano = new AtomicLong(); public static final long getReadOps() { - long ret = readOps; - readOps = 0; - return ret; + return readOps.getAndSet(0); } - public static final long getReadTime() { - long ret = readTime; - readTime = 0; - return ret; + public static final long getReadTimeMs() { + return readTimeNano.getAndSet(0) / 1000000; } public static final long getWriteOps() { - long ret = writeOps; - writeOps = 0; - return ret; + return writeOps.getAndSet(0); } - public static final long getWriteTime() { - long ret = writeTime; - writeTime = 0; - return ret; + public static final long getWriteTimeMs() { + return writeTimeNano.getAndSet(0) / 1000000; } - /** - * HFile Writer. - */ - public static class Writer implements Closeable { - // FileSystem stream to write on. - private FSDataOutputStream outputStream; - // True if we opened the outputStream (and so will close it). - private boolean closeOutputStream; - - // Name for this object used when logging or in toString. Is either - // the result of a toString on stream or else toString of passed file Path. - protected String name; - - // Total uncompressed bytes, maybe calculate a compression ratio later. - private long totalBytes = 0; - - // Total # of key/value entries, ie: how many times add() was called. - private int entryCount = 0; - - // Used calculating average key and value lengths. - private long keylength = 0; - private long valuelength = 0; - - // Used to ensure we write in order. - private final RawComparator comparator; - - // Number of uncompressed bytes per block. Reinitialized when we start - // new block. - private int blocksize; - - // Offset where the current block began. - private long blockBegin; - - // First key in a block (Not first key in file). - private byte [] firstKey = null; - - // Key previously appended. Becomes the last key in the file. - private byte [] lastKeyBuffer = null; - private int lastKeyOffset = -1; - private int lastKeyLength = -1; - - // See {@link BlockIndex}. Below four fields are used to write the block - // index. - ArrayList blockKeys = new ArrayList(); - // Block offset in backing stream. - ArrayList blockOffsets = new ArrayList(); - // Raw (decompressed) data size. - ArrayList blockDataSizes = new ArrayList(); - - // Meta block system. - private ArrayList metaNames = new ArrayList(); - private ArrayList metaData = new ArrayList(); - - // Used compression. Used even if no compression -- 'none'. - private final Compression.Algorithm compressAlgo; - private Compressor compressor; - - // Special datastructure to hold fileinfo. - private FileInfo fileinfo = new FileInfo(); - - // May be null if we were passed a stream. - private Path path = null; - - // Block cache to optionally fill on write - private BlockCache blockCache; - - // Byte buffer output stream made per block written. 
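
// [Illustrative sketch -- not part of this commit.] The AtomicLong counters
// above are bumped by the reader/writer implementations added elsewhere in
// this changeset and drained by metrics code; the call sites shown here are
// assumptions, only the fields and getters come from this file:
//
//   long startNano = System.nanoTime();
//   // ... perform the positional read and decompression ...
//   HFile.readTimeNano.addAndGet(System.nanoTime() - startNano);
//   HFile.readOps.incrementAndGet();
//
//   // A periodic metrics poll then reads and resets the totals;
//   // getReadTimeMs() converts the accumulated nanoseconds to milliseconds.
//   long readsSinceLastPoll = HFile.getReadOps();
//   long readMsSinceLastPoll = HFile.getReadTimeMs();
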
- private ByteBufferOutputStream bbos = null; - private DataOutputStream bbosDos = null; - private int blockNumber = 0; - - /** - * Constructor that uses all defaults for compression and block size. - * @param fs - * @param path - * @throws IOException - */ - public Writer(FileSystem fs, Path path) - throws IOException { - this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null, - null); - } - - /** - * Constructor that takes a Path. - * @param fs - * @param path - * @param blocksize - * @param compress - * @param comparator - * @throws IOException - * @throws IOException - */ - public Writer(FileSystem fs, Path path, int blocksize, - String compress, final KeyComparator comparator) - throws IOException { - this(fs, path, blocksize, - compress == null? DEFAULT_COMPRESSION_ALGORITHM: - Compression.getCompressionAlgorithmByName(compress), - comparator, null); - } - - /** - * Constructor that takes a Path. - * @param fs - * @param path - * @param blocksize - * @param compress - * @param comparator - * @throws IOException - */ - public Writer(FileSystem fs, Path path, int blocksize, - Compression.Algorithm compress, - final KeyComparator comparator, BlockCache blockCache) - throws IOException { - this(fs.create(path), blocksize, compress, comparator); - this.closeOutputStream = true; - this.name = path.toString(); - this.path = path; - this.blockCache = blockCache; - } - - /** - * Constructor that takes a stream. - * @param ostream Stream to use. - * @param blocksize - * @param compress - * @param c RawComparator to use. - * @throws IOException - */ - public Writer(final FSDataOutputStream ostream, final int blocksize, - final String compress, final KeyComparator c) - throws IOException { - this(ostream, blocksize, - Compression.getCompressionAlgorithmByName(compress), c); - } - - /** - * Constructor that takes a stream. - * @param ostream Stream to use. - * @param blocksize - * @param compress - * @param c - * @throws IOException - */ - public Writer(final FSDataOutputStream ostream, final int blocksize, - final Compression.Algorithm compress, final KeyComparator c) - throws IOException { - this.outputStream = ostream; - this.closeOutputStream = false; - this.blocksize = blocksize; - this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c; - this.name = this.outputStream.toString(); - this.compressAlgo = compress == null? - DEFAULT_COMPRESSION_ALGORITHM: compress; - } - - /* - * If at block boundary, opens new block. - * @throws IOException - */ - private void checkBlockBoundary() throws IOException { - if (bbosDos != null && bbosDos.size() < blocksize) return; - finishBlock(); - newBlock(); - } - - /* - * Do the cleanup if a current block. 
- * @throws IOException - */ - private void finishBlock() throws IOException { - if (bbosDos == null) return; - - // Flush Data Output Stream - bbosDos.flush(); - - // Compress Data and write to output stream - DataOutputStream compressStream = getCompressingStream(); - bbos.writeTo(compressStream); - int size = releaseCompressingStream(compressStream); - - long now = System.currentTimeMillis(); - - blockKeys.add(firstKey); - blockOffsets.add(Long.valueOf(blockBegin)); - blockDataSizes.add(Integer.valueOf(size)); - this.totalBytes += size; - - writeTime += System.currentTimeMillis() - now; - writeOps++; - - if (blockCache != null) { - byte[] bytes = bbos.toByteArray(DATABLOCKMAGIC.length, bbos.size() - DATABLOCKMAGIC.length); - ByteBuffer blockToCache = ByteBuffer.wrap(bytes); - String blockName = path.toString() + blockNumber; - blockCache.cacheBlock(blockName, blockToCache); - } - - bbosDos.close(); - bbosDos = null; - bbos = null; - - blockNumber++; - } - - /* - * Ready a new block for writing. - * @throws IOException - */ - private void newBlock() throws IOException { - // This is where the next block begins. - blockBegin = outputStream.getPos(); - - firstKey = null; - - // to avoid too many calls to realloc(), - // pre-allocates the byte stream to the block size + 25% - // only if blocksize is under 1Gb - int bbosBlocksize = Math.max(blocksize, blocksize + (blocksize / 4)); - bbos = new ByteBufferOutputStream(bbosBlocksize); - bbosDos = new DataOutputStream(bbos); - bbosDos.write(DATABLOCKMAGIC); - } - - /* - * Sets up a compressor and creates a compression stream on top of - * this.outputStream. Get one per block written. - * @return A compressing stream; if 'none' compression, returned stream - * does not compress. - * @throws IOException - * @see {@link #releaseCompressingStream(DataOutputStream)} - */ - private DataOutputStream getCompressingStream() throws IOException { - this.compressor = compressAlgo.getCompressor(); - // Get new DOS compression stream. In tfile, the DOS, is not closed, - // just finished, and that seems to be fine over there. TODO: Check - // no memory retention of the DOS. Should I disable the 'flush' on the - // DOS as the BCFile over in tfile does? It wants to make it so flushes - // don't go through to the underlying compressed stream. Flush on the - // compressed downstream should be only when done. I was going to but - // looks like when we call flush in here, its legitimate flush that - // should go through to the compressor. - OutputStream os = - this.compressAlgo.createCompressionStream(this.outputStream, - this.compressor, 0); - return new DataOutputStream(os); - } - - /* - * Let go of block compressor and compressing stream gotten in call - * {@link #getCompressingStream}. - * @param dos - * @return How much was written on this stream since it was taken out. - * @see #getCompressingStream() - * @throws IOException - */ - private int releaseCompressingStream(final DataOutputStream dos) - throws IOException { - dos.flush(); - this.compressAlgo.returnCompressor(this.compressor); - this.compressor = null; - return dos.size(); - } + /** API required to write an {@link HFile} */ + public interface Writer extends Closeable { - /** - * Add a meta block to the end of the file. Call before close(). - * Metadata blocks are expensive. Fill one with a bunch of serialized data - * rather than do a metadata block per metadata instance. 
If metadata is - * small, consider adding to file info using - * {@link #appendFileInfo(byte[], byte[])} - * @param metaBlockName name of the block - * @param content will call readFields to get data later (DO NOT REUSE) - */ - public void appendMetaBlock(String metaBlockName, Writable content) { - byte[] key = Bytes.toBytes(metaBlockName); - int i; - for (i = 0; i < metaNames.size(); ++i) { - // stop when the current key is greater than our own - byte[] cur = metaNames.get(i); - if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, - key, 0, key.length) > 0) { - break; - } - } - metaNames.add(i, key); - metaData.add(i, content); - } + /** Add an element to the file info map. */ + void appendFileInfo(byte[] key, byte[] value) throws IOException; - /** - * Add to the file info. Added key value can be gotten out of the return - * from {@link Reader#loadFileInfo()}. - * @param k Key - * @param v Value - * @throws IOException - */ - public void appendFileInfo(final byte [] k, final byte [] v) - throws IOException { - appendFileInfo(this.fileinfo, k, v, true); - } + void append(KeyValue kv) throws IOException; - static FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v, - final boolean checkPrefix) - throws IOException { - if (k == null || v == null) { - throw new NullPointerException("Key nor value may be null"); - } - if (checkPrefix && - Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) { - throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX + - " are reserved"); - } - fi.put(k, v); - return fi; - } + void append(byte[] key, byte[] value) throws IOException; - /** - * @return Path or null if we were passed a stream rather than a Path. - */ - public Path getPath() { - return this.path; - } + /** @return the path to this {@link HFile} */ + Path getPath(); - @Override - public String toString() { - return "writer=" + this.name + ", compression=" + - this.compressAlgo.getName(); - } + void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter); /** - * Add key/value to file. - * Keys must be added in an order that agrees with the Comparator passed - * on construction. - * @param kv KeyValue to add. Cannot be empty nor null. - * @throws IOException + * Adds an inline block writer such as a multi-level block index writer or + * a compound Bloom filter writer. */ - public void append(final KeyValue kv) - throws IOException { - append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), - kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); - } + void addInlineBlockWriter(InlineBlockWriter bloomWriter); /** - * Add key/value to file. - * Keys must be added in an order that agrees with the Comparator passed - * on construction. - * @param key Key to add. Cannot be empty nor null. - * @param value Value to add. Cannot be empty nor null. - * @throws IOException + * Store Bloom filter in the file. This does not deal with Bloom filter + * internals but is necessary, since Bloom filters are stored differently + * in HFile version 1 and version 2. */ - public void append(final byte [] key, final byte [] value) - throws IOException { - append(key, 0, key.length, value, 0, value.length); - } - - /** - * Add key/value to file. - * Keys must be added in an order that agrees with the Comparator passed - * on construction. 
- * @param key - * @param koffset - * @param klength - * @param value - * @param voffset - * @param vlength - * @throws IOException - */ - private void append(final byte [] key, final int koffset, final int klength, - final byte [] value, final int voffset, final int vlength) - throws IOException { - boolean dupKey = checkKey(key, koffset, klength); - checkValue(value, voffset, vlength); - if (!dupKey) { - checkBlockBoundary(); - } - // Write length of key and value and then actual key and value bytes. - this.bbosDos.writeInt(klength); - this.keylength += klength; - this.bbosDos.writeInt(vlength); - this.valuelength += vlength; - this.bbosDos.write(key, koffset, klength); - this.bbosDos.write(value, voffset, vlength); - // Are we the first key in this block? - if (this.firstKey == null) { - // Copy the key. - this.firstKey = new byte [klength]; - System.arraycopy(key, koffset, this.firstKey, 0, klength); - } - this.lastKeyBuffer = key; - this.lastKeyOffset = koffset; - this.lastKeyLength = klength; - this.entryCount ++; - } - - /* - * @param key Key to check. - * @return the flag of duplicate Key or not - * @throws IOException - */ - private boolean checkKey(final byte [] key, final int offset, final int length) - throws IOException { - boolean dupKey = false; - - if (key == null || length <= 0) { - throw new IOException("Key cannot be null or empty"); - } - if (length > MAXIMUM_KEY_LENGTH) { - throw new IOException("Key length " + length + " > " + - MAXIMUM_KEY_LENGTH); - } - if (this.lastKeyBuffer != null) { - int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset, - this.lastKeyLength, key, offset, length); - if (keyComp > 0) { - throw new IOException("Added a key not lexically larger than" + - " previous key=" + Bytes.toStringBinary(key, offset, length) + - ", lastkey=" + Bytes.toStringBinary(this.lastKeyBuffer, this.lastKeyOffset, - this.lastKeyLength)); - } else if (keyComp == 0) { - dupKey = true; - } - } - return dupKey; - } - - private void checkValue(final byte [] value, final int offset, - final int length) throws IOException { - if (value == null) { - throw new IOException("Value cannot be null"); - } - } - - public long getTotalBytes() { - return this.totalBytes; - } - - public void close() throws IOException { - if (this.outputStream == null) { - return; - } - // Write out the end of the data blocks, then write meta data blocks. - // followed by fileinfo, data block index and meta block index. - - finishBlock(); - - FixedFileTrailer trailer = new FixedFileTrailer(); - - // Write out the metadata blocks if any. - ArrayList metaOffsets = null; - ArrayList metaDataSizes = null; - if (metaNames.size() > 0) { - metaOffsets = new ArrayList(metaNames.size()); - metaDataSizes = new ArrayList(metaNames.size()); - for (int i = 0 ; i < metaNames.size() ; ++ i ) { - // store the beginning offset - long curPos = outputStream.getPos(); - metaOffsets.add(curPos); - // write the metadata content - DataOutputStream dos = getCompressingStream(); - dos.write(METABLOCKMAGIC); - metaData.get(i).write(dos); - int size = releaseCompressingStream(dos); - // store the metadata size - metaDataSizes.add(size); - } - } - - // Write fileinfo. - trailer.fileinfoOffset = writeFileInfo(this.outputStream); - - // Write the data block index. - trailer.dataIndexOffset = BlockIndex.writeIndex(this.outputStream, - this.blockKeys, this.blockOffsets, this.blockDataSizes); - - // Meta block index. 
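
// [Illustrative sketch -- not part of this commit.] Both the concrete Writer
// above and the new Writer interface share the contract enforced by
// checkKey(): cells must be appended in the order defined by the comparator,
// and close() then writes out the file info, the indexes and the trailer.
// A hypothetical caller (cell contents made up; the (row, family, qualifier,
// value) KeyValue constructor is assumed):
//
//   HFile.Writer w = ...; // obtained from a WriterFactory, declared further down
//   w.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
//       Bytes.toBytes("q"), Bytes.toBytes("v1")));
//   w.append(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"),
//       Bytes.toBytes("q"), Bytes.toBytes("v2")));
//   // appending a "row0" cell at this point would fail with
//   // "Added a key not lexically larger than previous key ..."
//   w.close();
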
- if (metaNames.size() > 0) { - trailer.metaIndexOffset = BlockIndex.writeIndex(this.outputStream, - this.metaNames, metaOffsets, metaDataSizes); - } - - // Now finish off the trailer. - trailer.dataIndexCount = blockKeys.size(); - trailer.metaIndexCount = metaNames.size(); - - trailer.totalUncompressedBytes = totalBytes; - trailer.entryCount = entryCount; - - trailer.compressionCodec = this.compressAlgo.ordinal(); - - trailer.serialize(outputStream); - - if (this.closeOutputStream) { - this.outputStream.close(); - this.outputStream = null; - } - } - - /* - * Add last bits of metadata to fileinfo and then write it out. - * Reader will be expecting to find all below. - * @param o Stream to write on. - * @return Position at which we started writing. - * @throws IOException - */ - private long writeFileInfo(FSDataOutputStream o) throws IOException { - if (this.lastKeyBuffer != null) { - // Make a copy. The copy is stuffed into HMapWritable. Needs a clean - // byte buffer. Won't take a tuple. - byte [] b = new byte[this.lastKeyLength]; - System.arraycopy(this.lastKeyBuffer, this.lastKeyOffset, b, 0, - this.lastKeyLength); - appendFileInfo(this.fileinfo, FileInfo.LASTKEY, b, false); - } - int avgKeyLen = this.entryCount == 0? 0: - (int)(this.keylength/this.entryCount); - appendFileInfo(this.fileinfo, FileInfo.AVG_KEY_LEN, - Bytes.toBytes(avgKeyLen), false); - int avgValueLen = this.entryCount == 0? 0: - (int)(this.valuelength/this.entryCount); - appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN, - Bytes.toBytes(avgValueLen), false); - appendFileInfo(this.fileinfo, FileInfo.COMPARATOR, - Bytes.toBytes(this.comparator.getClass().getName()), false); - long pos = o.getPos(); - this.fileinfo.write(o); - return pos; - } + void addBloomFilter(BloomFilterWriter bfw); } /** - * HFile Reader. + * This variety of ways to construct writers is used throughout the code, and + * we want to be able to swap writer implementations. */ - public static class Reader implements Closeable { - // Stream to read from. - private FSDataInputStream istream; - // True if we should close istream when done. We don't close it if we - // didn't open it. - private boolean closeIStream; - - // These are read in when the file info is loaded. - HFile.BlockIndex blockIndex; - private BlockIndex metaIndex; - FixedFileTrailer trailer; - private volatile boolean fileInfoLoaded = false; - - // Filled when we read in the trailer. - private Compression.Algorithm compressAlgo; - - // Last key in the file. Filled in when we read in the file info - private byte [] lastkey = null; - // Stats read in when we load file info. - private int avgKeyLen = -1; - private int avgValueLen = -1; - - // Used to ensure we seek correctly. - RawComparator comparator; - - // Size of this file. - private final long fileSize; - - // Block cache to use. - private final BlockCache cache; - public int cacheHits = 0; - public int blockLoads = 0; - public int metaLoads = 0; - - // Whether file is from in-memory store - private boolean inMemory = false; - - // Whether blocks of file should be evicted on close of file - private final boolean evictOnClose; - - // Name for this object used when logging or in toString. Is either - // the result of a toString on the stream or else is toString of passed - // file Path plus metadata key/value pairs. - protected String name; + public static abstract class WriterFactory { + protected Configuration conf; - /** - * Opens a HFile. You must load the file info before you can - * use it by calling {@link #loadFileInfo()}. 
- * - * @param fs filesystem to load from - * @param path path within said filesystem - * @param cache block cache. Pass null if none. - * @throws IOException - */ - public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory, - boolean evictOnClose) - throws IOException { - this(path, fs.open(path), fs.getFileStatus(path).getLen(), cache, - inMemory, evictOnClose); - this.closeIStream = true; - this.name = path.toString(); - } + WriterFactory(Configuration conf) { this.conf = conf; } - /** - * Opens a HFile. You must load the index before you can - * use it by calling {@link #loadFileInfo()}. - * - * @param fsdis input stream. Caller is responsible for closing the passed - * stream. - * @param size Length of the stream. - * @param cache block cache. Pass null if none. - * @param inMemory whether blocks should be marked as in-memory in cache - * @param evictOnClose whether blocks in cache should be evicted on close - * @throws IOException - */ - public Reader(Path path, final FSDataInputStream fsdis, final long size, - final BlockCache cache, final boolean inMemory, - final boolean evictOnClose) { - this.cache = cache; - this.fileSize = size; - this.istream = fsdis; - this.closeIStream = false; - this.name = path.toString(); - this.inMemory = inMemory; - this.evictOnClose = evictOnClose; - } + public abstract Writer createWriter(FileSystem fs, Path path) + throws IOException; - @Override - public String toString() { - return "reader=" + this.name + - (!isFileInfoLoaded()? "": - ", compression=" + this.compressAlgo.getName() + - ", inMemory=" + this.inMemory + - ", firstKey=" + toStringFirstKey() + - ", lastKey=" + toStringLastKey()) + - ", avgKeyLen=" + this.avgKeyLen + - ", avgValueLen=" + this.avgValueLen + - ", entries=" + this.trailer.entryCount + - ", length=" + this.fileSize; - } + public abstract Writer createWriter(FileSystem fs, Path path, + int blockSize, Compression.Algorithm compress, + final KeyComparator comparator) throws IOException; - protected String toStringFirstKey() { - return KeyValue.keyToString(getFirstKey()); - } - - protected String toStringLastKey() { - return KeyValue.keyToString(getLastKey()); - } - - public long length() { - return this.fileSize; - } - - public long getTotalUncompressedBytes() { - return this.trailer.totalUncompressedBytes; - } - - public boolean inMemory() { - return this.inMemory; - } + public abstract Writer createWriter(FileSystem fs, Path path, + int blockSize, String compress, + final KeyComparator comparator) throws IOException; - private byte[] readAllIndex(final FSDataInputStream in, final long indexOffset, - final int indexSize) throws IOException { - byte[] allIndex = new byte[indexSize]; - in.seek(indexOffset); - IOUtils.readFully(in, allIndex, 0, allIndex.length); - return allIndex; - } + public abstract Writer createWriter(final FSDataOutputStream ostream, + final int blockSize, final String compress, + final KeyComparator comparator) throws IOException; - /** - * Read in the index and file info. - * @return A map of fileinfo data. - * See {@link Writer#appendFileInfo(byte[], byte[])}. - * @throws IOException - */ - public Map loadFileInfo() - throws IOException { - this.trailer = readTrailer(); - - // Read in the fileinfo and get what we need from it. 
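
// [Illustrative sketch -- not part of this commit.] The abstract WriterFactory
// above exists so that callers never name HFileWriterV1 or HFileWriterV2
// directly; the intended entry point is HFile.getWriterFactory(conf), defined
// a little further down. For example (argument values are only an example):
//
//   HFile.WriterFactory factory = HFile.getWriterFactory(conf);
//   HFile.Writer w = factory.createWriter(fs, path, DEFAULT_BLOCKSIZE,
//       Compression.Algorithm.NONE, null);
//
// Passing a null comparator falls back to raw byte ordering, as in the
// pre-interface Writer constructor above; the versioned implementations are
// assumed to do the same.
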
- this.istream.seek(this.trailer.fileinfoOffset); - FileInfo fi = new FileInfo(); - fi.readFields(this.istream); - this.lastkey = fi.get(FileInfo.LASTKEY); - this.avgKeyLen = Bytes.toInt(fi.get(FileInfo.AVG_KEY_LEN)); - this.avgValueLen = Bytes.toInt(fi.get(FileInfo.AVG_VALUE_LEN)); - String clazzName = Bytes.toString(fi.get(FileInfo.COMPARATOR)); - this.comparator = getComparator(clazzName); - - int allIndexSize = (int)(this.fileSize - this.trailer.dataIndexOffset - FixedFileTrailer.trailerSize()); - byte[] dataAndMetaIndex = readAllIndex(this.istream, this.trailer.dataIndexOffset, allIndexSize); - - ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex); - DataInputStream dis = new DataInputStream(bis); - - // Read in the data index. - this.blockIndex = - BlockIndex.readIndex(this.comparator, dis, this.trailer.dataIndexCount); - - // Read in the metadata index. - if (trailer.metaIndexCount > 0) { - this.metaIndex = BlockIndex.readIndex(Bytes.BYTES_RAWCOMPARATOR, dis, - this.trailer.metaIndexCount); - } - this.fileInfoLoaded = true; - - if (null != dis) { - dis.close(); - } + public abstract Writer createWriter(final FSDataOutputStream ostream, + final int blockSize, final Compression.Algorithm compress, + final KeyComparator c) throws IOException; + } - return fi; - } + /** The configuration key for HFile version to use for new files */ + public static final String FORMAT_VERSION_KEY = "hfile.format.version"; - boolean isFileInfoLoaded() { - return this.fileInfoLoaded; - } + public static int getFormatVersion(Configuration conf) { + int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION); + checkFormatVersion(version); + return version; + } - @SuppressWarnings("unchecked") - private RawComparator getComparator(final String clazzName) - throws IOException { - if (clazzName == null || clazzName.length() == 0) { - return null; - } - try { - return (RawComparator)Class.forName(clazzName).newInstance(); - } catch (InstantiationException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { - throw new IOException(e); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } + /** + * Returns the factory to be used to create {@link HFile} writers. Should + * always be {@link HFileWriterV2#WRITER_FACTORY_V2} in production, but + * can also be {@link HFileWriterV1#WRITER_FACTORY_V1} in testing. + */ + public static final WriterFactory getWriterFactory(Configuration conf) { + int version = getFormatVersion(conf); + LOG.debug("Using HFile format version " + version); + switch (version) { + case 1: + return new HFileWriterV1.WriterFactoryV1(conf); + case 2: + return new HFileWriterV2.WriterFactoryV2(conf); + default: + throw new IllegalArgumentException("Cannot create writer for HFile " + + "format version " + version); } + } - /* Read the trailer off the input stream. As side effect, sets the - * compression algorithm. - * @return Populated FixedFileTrailer. - * @throws IOException - */ - private FixedFileTrailer readTrailer() throws IOException { - FixedFileTrailer fft = new FixedFileTrailer(); - long seekPoint = this.fileSize - FixedFileTrailer.trailerSize(); - this.istream.seek(seekPoint); - fft.deserialize(this.istream); - // Set up the codec. - this.compressAlgo = - Compression.Algorithm.values()[fft.compressionCodec]; + /** + * Configuration key to evict all blocks of a given file from the block cache + * when the file is closed. 
+ */ + public static final String EVICT_BLOCKS_ON_CLOSE_KEY = + "hbase.rs.evictblocksonclose"; - CompressionTest.testCompression(this.compressAlgo); + /** + * Configuration key to cache data blocks on write. There are separate + * switches for Bloom blocks and non-root index blocks. + */ + public static final String CACHE_BLOCKS_ON_WRITE_KEY = + "hbase.rs.cacheblocksonwrite"; - return fft; - } + /** An interface used by clients to open and iterate an {@link HFile}. */ + public interface Reader extends Closeable { /** - * Create a Scanner on this file. No seeks or reads are done on creation. - * Call {@link HFileScanner#seekTo(byte[])} to position an start the read. - * There is nothing to clean up in a Scanner. Letting go of your references - * to the scanner is sufficient. - * @param pread Use positional read rather than seek+read if true (pread is - * better for random reads, seek+read is better scanning). - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @return Scanner on this file. + * Returns this reader's "name". Usually the last component of the path. + * Needs to be constant as the file is being moved to support caching on + * write. */ - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { - return new Scanner(this, cacheBlocks, pread); - } + String getName(); - /** - * @param key Key to search. - * @return Block number of the block containing the key or -1 if not in this - * file. - */ - protected int blockContainingKey(final byte [] key, int offset, int length) { - if (blockIndex == null) { - throw new RuntimeException("Block index not loaded"); - } - return blockIndex.blockContainingKey(key, offset, length); - } - /** - * @param metaBlockName - * @param cacheBlock Add block to cache, if found - * @return Block wrapped in a ByteBuffer - * @throws IOException - */ - public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) - throws IOException { - if (trailer.metaIndexCount == 0) { - return null; // there are no meta blocks - } - if (metaIndex == null) { - throw new IOException("Meta index not loaded"); - } + String getColumnFamilyName(); - byte [] mbname = Bytes.toBytes(metaBlockName); - int block = metaIndex.blockContainingKey(mbname, 0, mbname.length); - if (block == -1) - return null; - long blockSize; - if (block == metaIndex.count - 1) { - blockSize = trailer.fileinfoOffset - metaIndex.blockOffsets[block]; - } else { - blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block]; - } + RawComparator getComparator(); - long now = System.currentTimeMillis(); + HFileScanner getScanner(boolean cacheBlocks, + final boolean pread, final boolean isCompaction); - // Per meta key from any given file, synchronize reads for said block - synchronized (metaIndex.blockKeys[block]) { - metaLoads++; - // Check cache for block. If found return. - if (cache != null) { - ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block, - cacheBlock); - if (cachedBuf != null) { - // Return a distinct 'shallow copy' of the block, - // so pos doesnt get messed by the scanner - cacheHits++; - return cachedBuf.duplicate(); - } - // Cache Miss, please load. 
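
// [Illustrative sketch -- not part of this commit.] The new configuration
// keys declared above are ordinary Configuration switches; a region server
// or a test could set them like this (the strings in the trailing comments
// are the literal key values from this file):
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.setInt(HFile.FORMAT_VERSION_KEY, 2);                // "hfile.format.version", defaults to MAX_FORMAT_VERSION
//   conf.setBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY, true);  // "hbase.rs.cacheblocksonwrite"
//   conf.setBoolean(HFile.EVICT_BLOCKS_ON_CLOSE_KEY, true);  // "hbase.rs.evictblocksonclose"
//
// How the reader/writer implementations consume the two boolean switches is
// outside this hunk.
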
- } + ByteBuffer getMetaBlock(String metaBlockName, + boolean cacheBlock) throws IOException; - ByteBuffer buf = decompress(metaIndex.blockOffsets[block], - longToInt(blockSize), metaIndex.blockDataSizes[block], true); - byte [] magic = new byte[METABLOCKMAGIC.length]; - buf.get(magic, 0, magic.length); + HFileBlock readBlock(long offset, int onDiskBlockSize, + boolean cacheBlock, final boolean pread, final boolean isCompaction) + throws IOException; - if (! Arrays.equals(magic, METABLOCKMAGIC)) { - throw new IOException("Meta magic is bad in block " + block); - } + Map loadFileInfo() throws IOException; - // Create a new ByteBuffer 'shallow copy' to hide the magic header - buf = buf.slice(); + byte[] getLastKey(); - readTime += System.currentTimeMillis() - now; - readOps++; + byte[] midkey() throws IOException; - // Cache the block - if(cacheBlock && cache != null) { - cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory); - } + long length(); - return buf; - } - } + long getEntries(); - /** - * Read in a file block. - * @param block Index of block to read. - * @param pread Use positional read instead of seek+read (positional is - * better doing random reads whereas seek+read is better scanning). - * @return Block wrapped in a ByteBuffer. - * @throws IOException - */ - ByteBuffer readBlock(int block, boolean cacheBlock, final boolean pread) - throws IOException { - if (blockIndex == null) { - throw new IOException("Block index not loaded"); - } - if (block < 0 || block >= blockIndex.count) { - throw new IOException("Requested block is out of range: " + block + - ", max: " + blockIndex.count); - } - // For any given block from any given file, synchronize reads for said - // block. - // Without a cache, this synchronizing is needless overhead, but really - // the other choice is to duplicate work (which the cache would prevent you from doing). - synchronized (blockIndex.blockKeys[block]) { - blockLoads++; - // Check cache for block. If found return. - if (cache != null) { - ByteBuffer cachedBuf = cache.getBlock(name + block, cacheBlock); - if (cachedBuf != null) { - // Return a distinct 'shallow copy' of the block, - // so pos doesnt get messed by the scanner - cacheHits++; - return cachedBuf.duplicate(); - } - // Carry on, please load. - } + byte[] getFirstKey(); - // Load block from filesystem. - long now = System.currentTimeMillis(); - long onDiskBlockSize; - if (block == blockIndex.count - 1) { - // last block! The end of data block is first meta block if there is - // one or if there isn't, the fileinfo offset. - long offset = this.metaIndex != null? 
- this.metaIndex.blockOffsets[0]: this.trailer.fileinfoOffset; - onDiskBlockSize = offset - blockIndex.blockOffsets[block]; - } else { - onDiskBlockSize = blockIndex.blockOffsets[block+1] - - blockIndex.blockOffsets[block]; - } - ByteBuffer buf = decompress(blockIndex.blockOffsets[block], - longToInt(onDiskBlockSize), this.blockIndex.blockDataSizes[block], - pread); - - byte [] magic = new byte[DATABLOCKMAGIC.length]; - buf.get(magic, 0, magic.length); - if (!Arrays.equals(magic, DATABLOCKMAGIC)) { - throw new IOException("Data magic is bad in block " + block); - } + long indexSize(); - // 'shallow copy' to hide the header - // NOTE: you WILL GET BIT if you call buf.array() but don't start - // reading at buf.arrayOffset() - buf = buf.slice(); - - readTime += System.currentTimeMillis() - now; - readOps++; - - // Cache the block - if(cacheBlock && cache != null) { - cache.cacheBlock(name + block, buf.duplicate(), inMemory); - } + byte[] getFirstRowKey(); - return buf; - } - } + byte[] getLastRowKey(); - /* - * Decompress compressedSize bytes off the backing - * FSDataInputStream. - * @param offset - * @param compressedSize - * @param decompressedSize - * - * @return - * @throws IOException - */ - private ByteBuffer decompress(final long offset, final int compressedSize, - final int decompressedSize, final boolean pread) - throws IOException { - Decompressor decompressor = null; - ByteBuffer buf = null; - try { - decompressor = this.compressAlgo.getDecompressor(); - // My guess is that the bounded range fis is needed to stop the - // decompressor reading into next block -- IIRC, it just grabs a - // bunch of data w/o regard to whether decompressor is coming to end of a - // decompression. - - // We use a buffer of DEFAULT_BLOCKSIZE size. This might be extreme. - // Could maybe do with less. Study and figure it: TODO - InputStream is = this.compressAlgo.createDecompressionStream( - new BufferedInputStream( - new BoundedRangeFileInputStream(this.istream, offset, compressedSize, - pread), - Math.min(DEFAULT_BLOCKSIZE, compressedSize)), - decompressor, 0); - buf = ByteBuffer.allocate(decompressedSize); - IOUtils.readFully(is, buf.array(), 0, buf.capacity()); - is.close(); - } finally { - if (null != decompressor) { - this.compressAlgo.returnDecompressor(decompressor); - } - } - return buf; - } + FixedFileTrailer getTrailer(); - /** - * @return First key in the file. May be null if file has no entries. - * Note that this is not the first rowkey, but rather the byte form of - * the first KeyValue. - */ - public byte [] getFirstKey() { - if (blockIndex == null) { - throw new RuntimeException("Block index not loaded"); - } - return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0]; - } + HFileBlockIndex.BlockIndexReader getDataBlockIndexReader(); - /** - * @return the first row key, or null if the file is empty. - * TODO move this to StoreFile after Ryan's patch goes in - * to eliminate KeyValue here - */ - public byte[] getFirstRowKey() { - byte[] firstKey = getFirstKey(); - if (firstKey == null) return null; - return KeyValue.createKeyValueFromKey(firstKey).getRow(); - } + HFileScanner getScanner(boolean cacheBlocks, boolean pread); - /** - * @return number of KV entries in this HFile - */ - public int getEntries() { - if (!this.isFileInfoLoaded()) { - throw new RuntimeException("File info not loaded"); - } - return this.trailer.entryCount; - } + Compression.Algorithm getCompressionAlgorithm(); /** - * @return Last key in the file. May be null if file has no entries. 
- * Note that this is not the last rowkey, but rather the byte form of - * the last KeyValue. + * Retrieves Bloom filter metadata as appropriate for each {@link HFile} + * version. Knows nothing about how that metadata is structured. */ - public byte [] getLastKey() { - if (!isFileInfoLoaded()) { - throw new RuntimeException("Load file info first"); - } - return this.blockIndex.isEmpty()? null: this.lastkey; - } - - /** - * @return the last row key, or null if the file is empty. - * TODO move this to StoreFile after Ryan's patch goes in - * to eliminate KeyValue here - */ - public byte[] getLastRowKey() { - byte[] lastKey = getLastKey(); - if (lastKey == null) return null; - return KeyValue.createKeyValueFromKey(lastKey).getRow(); - } - - /** - * @return number of K entries in this HFile's filter. Returns KV count if no filter. - */ - public int getFilterEntries() { - return getEntries(); - } - - /** - * @return Comparator. - */ - public RawComparator getComparator() { - return this.comparator; - } - - public Compression.Algorithm getCompressionAlgorithm() { - return this.compressAlgo; - } - - /** - * @return index size - */ - public long indexSize() { - return (this.blockIndex != null? this.blockIndex.heapSize(): 0) + - ((this.metaIndex != null)? this.metaIndex.heapSize(): 0); - } - - /** - * @return Midkey for this file. We work with block boundaries only so - * returned midkey is an approximation only. - * @throws IOException - */ - public byte [] midkey() throws IOException { - if (!isFileInfoLoaded() || this.blockIndex.isEmpty()) { - return null; - } - return this.blockIndex.midkey(); - } - - public void close() throws IOException { - if (evictOnClose && this.cache != null) { - int numEvicted = 0; - for (int i=0; i= reader.blockIndex.count) { - // damn we are at the end - currBlock = 0; - block = null; - return false; - } - block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread); - currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4); - currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4); - block.position(block.position()+8); - blockFetches++; - return true; - } - // LOG.debug("rem:" + block.remaining() + " p:" + block.position() + - // " kl: " + currKeyLen + " kv: " + currValueLen); - currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4); - currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4); - block.position(block.position()+8); - return true; - } - - public int seekTo(byte [] key) throws IOException { - return seekTo(key, 0, key.length); - } - - public int seekTo(byte[] key, int offset, int length) throws IOException { - int b = reader.blockContainingKey(key, offset, length); - if (b < 0) return -1; // falls before the beginning of the file! :-( - // Avoid re-reading the same block (that'd be dumb). - loadBlock(b, true); - return blockSeek(key, offset, length, false); - } - - public int reseekTo(byte [] key) throws IOException { - return reseekTo(key, 0, key.length); - } - - public int reseekTo(byte[] key, int offset, int length) - throws IOException { - - if (this.block != null && this.currKeyLen != 0) { - ByteBuffer bb = getKey(); - int compared = this.reader.comparator.compare(key, offset, length, - bb.array(), bb.arrayOffset(), bb.limit()); - if (compared < 1) { - //If the required key is less than or equal to current key, then - //don't do anything. 
- return compared; - } - } - - int b = reader.blockContainingKey(key, offset, length); - if (b < 0) { - return -1; - } - loadBlock(b, false); - return blockSeek(key, offset, length, false); - } - - /** - * Within a loaded block, seek looking for the first key - * that is smaller than (or equal to?) the key we are interested in. - * - * A note on the seekBefore - if you have seekBefore = true, AND the - * first key in the block = key, then you'll get thrown exceptions. - * @param key to find - * @param seekBefore find the key before the exact match. - * @return - */ - private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) { - int klen, vlen; - int lastLen = 0; - do { - klen = block.getInt(); - vlen = block.getInt(); - int comp = this.reader.comparator.compare(key, offset, length, - block.array(), block.arrayOffset() + block.position(), klen); - if (comp == 0) { - if (seekBefore) { - block.position(block.position() - lastLen - 16); - currKeyLen = block.getInt(); - currValueLen = block.getInt(); - return 1; // non exact match. - } - currKeyLen = klen; - currValueLen = vlen; - return 0; // indicate exact match - } - if (comp < 0) { - // go back one key: - block.position(block.position() - lastLen - 16); - currKeyLen = block.getInt(); - currValueLen = block.getInt(); - return 1; - } - block.position(block.position() + klen + vlen); - lastLen = klen + vlen ; - } while(block.remaining() > 0); - // ok we are at the end, so go back a littleeeeee.... - // The 8 in the below is intentionally different to the 16s in the above - // Do the math you you'll figure it. - block.position(block.position() - lastLen - 8); - currKeyLen = block.getInt(); - currValueLen = block.getInt(); - return 1; // didn't exactly find it. - } - - public boolean seekBefore(byte [] key) throws IOException { - return seekBefore(key, 0, key.length); - } - - public boolean seekBefore(byte[] key, int offset, int length) - throws IOException { - int b = reader.blockContainingKey(key, offset, length); - if (b < 0) - return false; // key is before the start of the file. - - // Question: does this block begin with 'key'? - if (this.reader.comparator.compare(reader.blockIndex.blockKeys[b], - 0, reader.blockIndex.blockKeys[b].length, - key, offset, length) == 0) { - // Ok the key we're interested in is the first of the block, so go back one. - if (b == 0) { - // we have a 'problem', the key we want is the first of the file. - return false; - } - b--; - // TODO shortcut: seek forward in this block to the last key of the block. 
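
// [Editorial note -- not part of this commit.] The 16-versus-8 arithmetic in
// blockSeek() above follows from the entry layout inside a data block:
// <4-byte key length><4-byte value length><key bytes><value bytes>. Inside
// the loop, both length ints of the current entry have just been read, so the
// position is 8 bytes past the entry start; stepping back to the start of the
// previous entry therefore rewinds lastLen (its key+value bytes) plus its own
// 8-byte length header plus those 8 bytes, i.e. lastLen + 16. When the loop
// falls off the end of the block, the position is at the end of the last
// entry with no length ints freshly consumed, so lastLen + 8 is enough to
// land back on that entry's length header.
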
- } - loadBlock(b, true); - blockSeek(key, offset, length, true); - return true; - } - - public String getKeyString() { - return Bytes.toStringBinary(block.array(), block.arrayOffset() + - block.position(), currKeyLen); - } - - public String getValueString() { - return Bytes.toString(block.array(), block.arrayOffset() + - block.position() + currKeyLen, currValueLen); - } - - public Reader getReader() { - return this.reader; - } - - public boolean isSeeked(){ - return this.block != null; - } - - public boolean seekTo() throws IOException { - if (this.reader.blockIndex.isEmpty()) { - return false; - } - if (block != null && currBlock == 0) { - block.rewind(); - currKeyLen = block.getInt(); - currValueLen = block.getInt(); - return true; - } - currBlock = 0; - block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread); - currKeyLen = block.getInt(); - currValueLen = block.getInt(); - blockFetches++; - return true; - } - - private void loadBlock(int bloc, boolean rewind) throws IOException { - if (block == null) { - block = reader.readBlock(bloc, this.cacheBlocks, this.pread); - currBlock = bloc; - blockFetches++; - } else { - if (bloc != currBlock) { - block = reader.readBlock(bloc, this.cacheBlocks, this.pread); - currBlock = bloc; - blockFetches++; - } else { - // we are already in the same block, just rewind to seek again. - if (rewind) { - block.rewind(); - } - else { - //Go back by (size of rowlength + size of valuelength) = 8 bytes - block.position(block.position()-8); - } - } - } - } - - @Override - public String toString() { - return "HFileScanner for reader " + String.valueOf(reader); - } - } - - public String getTrailerInfo() { - return trailer.toString(); - } + DataInput getBloomFilterMetadata() throws IOException; } - /* - * The RFile has a fixed trailer which contains offsets to other variable - * parts of the file. Also includes basic metadata on this file. - */ - private static class FixedFileTrailer { - // Offset to the fileinfo data, a small block of vitals.. - long fileinfoOffset; - // Offset to the data block index. - long dataIndexOffset; - // How many index counts are there (aka: block count) - int dataIndexCount; - // Offset to the meta block index. - long metaIndexOffset; - // How many meta block index entries (aka: meta block count) - int metaIndexCount; - long totalUncompressedBytes; - int entryCount; - int compressionCodec; - int version = 1; - - FixedFileTrailer() { - super(); - } - - static int trailerSize() { - // Keep this up to date... 
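
// [Editorial note -- not part of this commit.] For the version-1 layout the
// expression below works out to a 60-byte fixed trailer: 5 ints
// (dataIndexCount, metaIndexCount, entryCount, compressionCodec, version)
// = 20 bytes, 4 longs (fileinfoOffset, dataIndexOffset, metaIndexOffset,
// totalUncompressedBytes) = 32 bytes, plus the 8-byte TRAILERBLOCKMAGIC.
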
- return - ( Bytes.SIZEOF_INT * 5 ) + - ( Bytes.SIZEOF_LONG * 4 ) + - TRAILERBLOCKMAGIC.length; - } - - void serialize(DataOutputStream outputStream) throws IOException { - outputStream.write(TRAILERBLOCKMAGIC); - outputStream.writeLong(fileinfoOffset); - outputStream.writeLong(dataIndexOffset); - outputStream.writeInt(dataIndexCount); - outputStream.writeLong(metaIndexOffset); - outputStream.writeInt(metaIndexCount); - outputStream.writeLong(totalUncompressedBytes); - outputStream.writeInt(entryCount); - outputStream.writeInt(compressionCodec); - outputStream.writeInt(version); - } - - void deserialize(DataInputStream inputStream) throws IOException { - byte [] header = new byte[TRAILERBLOCKMAGIC.length]; - inputStream.readFully(header); - if ( !Arrays.equals(header, TRAILERBLOCKMAGIC)) { - throw new IOException("Trailer 'header' is wrong; does the trailer " + - "size match content?"); - } - fileinfoOffset = inputStream.readLong(); - dataIndexOffset = inputStream.readLong(); - dataIndexCount = inputStream.readInt(); - - metaIndexOffset = inputStream.readLong(); - metaIndexCount = inputStream.readInt(); - - totalUncompressedBytes = inputStream.readLong(); - entryCount = inputStream.readInt(); - compressionCodec = inputStream.readInt(); - version = inputStream.readInt(); - - if (version != 1) { - throw new IOException("Wrong version: " + version); - } - } - - @Override - public String toString() { - return "fileinfoOffset=" + fileinfoOffset + - ", dataIndexOffset=" + dataIndexOffset + - ", dataIndexCount=" + dataIndexCount + - ", metaIndexOffset=" + metaIndexOffset + - ", metaIndexCount=" + metaIndexCount + - ", totalBytes=" + totalUncompressedBytes + - ", entryCount=" + entryCount + - ", version=" + version; + private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis, + long size, boolean closeIStream, BlockCache blockCache, + boolean inMemory, boolean evictOnClose) throws IOException { + FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size); + switch (trailer.getVersion()) { + case 1: + return new HFileReaderV1(path, trailer, fsdis, size, closeIStream, + blockCache, inMemory, evictOnClose); + case 2: + return new HFileReaderV2(path, trailer, fsdis, size, closeIStream, + blockCache, inMemory, evictOnClose); + default: + throw new IOException("Cannot instantiate reader for HFile version " + + trailer.getVersion()); } } - /* - * The block index for a RFile. - * Used reading. - */ - static class BlockIndex implements HeapSize { - // How many actual items are there? The next insert location too. - int count = 0; - byte [][] blockKeys; - long [] blockOffsets; - int [] blockDataSizes; - int size = 0; - - /* Needed doing lookup on blocks. - */ - final RawComparator comparator; - - /* - * Shutdown default constructor - */ - @SuppressWarnings("unused") - private BlockIndex() { - this(null); - } - - - /** - * @param c comparator used to compare keys. - */ - BlockIndex(final RawComparatorc) { - this.comparator = c; - // Guess that cost of three arrays + this object is 4 * 8 bytes. - this.size += (4 * 8); - } - - /** - * @return True if block index is empty. - */ - boolean isEmpty() { - return this.blockKeys.length <= 0; - } - - /** - * Adds a new entry in the block index. 
- * - * @param key Last key in the block - * @param offset file offset where the block is stored - * @param dataSize the uncompressed data size - */ - void add(final byte[] key, final long offset, final int dataSize) { - blockOffsets[count] = offset; - blockKeys[count] = key; - blockDataSizes[count] = dataSize; - count++; - this.size += (Bytes.SIZEOF_INT * 2 + key.length); - } - - /** - * @param key Key to find - * @return Offset of block containing key or -1 if this file - * does not contain the request. - */ - int blockContainingKey(final byte[] key, int offset, int length) { - int pos = Bytes.binarySearch(blockKeys, key, offset, length, this.comparator); - if (pos < 0) { - pos ++; - pos *= -1; - if (pos == 0) { - // falls before the beginning of the file. - return -1; - } - // When switched to "first key in block" index, binarySearch now returns - // the block with a firstKey < key. This means the value we want is potentially - // in the next block. - pos --; // in previous block. - - return pos; - } - // wow, a perfect hit, how unlikely? - return pos; - } - - /* - * @return File midkey. Inexact. Operates on block boundaries. Does - * not go into blocks. - */ - byte [] midkey() throws IOException { - int pos = ((this.count - 1)/2); // middle of the index - if (pos < 0) { - throw new IOException("HFile empty"); - } - return this.blockKeys[pos]; - } - - /* - * Write out index. Whatever we write here must jibe with what - * BlockIndex#readIndex is expecting. Make sure the two ends of the - * index serialization match. - * @param o - * @param keys - * @param offsets - * @param sizes - * @param c - * @return Position at which we entered the index. - * @throws IOException - */ - static long writeIndex(final FSDataOutputStream o, - final List keys, final List offsets, - final List sizes) - throws IOException { - long pos = o.getPos(); - // Don't write an index if nothing in the index. - if (keys.size() > 0) { - o.write(INDEXBLOCKMAGIC); - // Write the index. - for (int i = 0; i < keys.size(); ++i) { - o.writeLong(offsets.get(i).longValue()); - o.writeInt(sizes.get(i).intValue()); - byte [] key = keys.get(i); - Bytes.writeByteArray(o, key); - } - } - return pos; - } - - /* - * Read in the index that is at indexOffset - * Must match what was written by writeIndex in the Writer.close. - * @param c Comparator to use. - * @param in - * @param indexSize - * @throws IOException - */ - static BlockIndex readIndex(final RawComparator c, - DataInputStream in, final int indexSize) - throws IOException { - BlockIndex bi = new BlockIndex(c); - bi.blockOffsets = new long[indexSize]; - bi.blockKeys = new byte[indexSize][]; - bi.blockDataSizes = new int[indexSize]; - // If index size is zero, no index was written. - if (indexSize > 0) { - byte [] magic = new byte[INDEXBLOCKMAGIC.length]; - in.readFully(magic); - if (!Arrays.equals(magic, INDEXBLOCKMAGIC)) { - throw new IOException("Index block magic is wrong: " + - Arrays.toString(magic)); - } - for (int i = 0; i < indexSize; ++i ) { - long offset = in.readLong(); - int dataSize = in.readInt(); - byte [] key = Bytes.readByteArray(in); - bi.add(key, offset, dataSize); - } - } - return bi; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("size=" + count).append("\n"); - for (int i = 0; i < count ; i++) { - sb.append("key=").append(KeyValue.keyToString(blockKeys[i])). - append("\n offset=").append(blockOffsets[i]). - append(", dataSize=" + blockDataSizes[i]). 
- append("\n"); - } - return sb.toString(); - } - - public long heapSize() { - long heapsize = ClassSize.align(ClassSize.OBJECT + - 2 * Bytes.SIZEOF_INT + (3 + 1) * ClassSize.REFERENCE); - //Calculating the size of blockKeys - if(blockKeys != null) { - //Adding array + references overhead - heapsize += ClassSize.align(ClassSize.ARRAY + - blockKeys.length * ClassSize.REFERENCE); - //Adding bytes - for(byte [] bs : blockKeys) { - heapsize += ClassSize.align(ClassSize.ARRAY + bs.length); - } - } - if(blockOffsets != null) { - heapsize += ClassSize.align(ClassSize.ARRAY + - blockOffsets.length * Bytes.SIZEOF_LONG); - } - if(blockDataSizes != null) { - heapsize += ClassSize.align(ClassSize.ARRAY + - blockDataSizes.length * Bytes.SIZEOF_INT); - } - - return ClassSize.align(heapsize); - } + public static Reader createReader( + FileSystem fs, Path path, BlockCache blockCache, boolean inMemory, + boolean evictOnClose) throws IOException { + return pickReaderVersion(path, fs.open(path), + fs.getFileStatus(path).getLen(), true, blockCache, inMemory, + evictOnClose); + } + public static Reader createReader(Path path, FSDataInputStream fsdis, + long size, BlockCache blockache, boolean inMemory, boolean evictOnClose) + throws IOException { + return pickReaderVersion(path, fsdis, size, false, blockache, inMemory, + evictOnClose); } /* @@ -1825,23 +366,37 @@ public class HFile { static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR"); - /* - * Constructor. + /** + * Append the given key/value pair to the file info, optionally checking the + * key prefix. + * + * @param k key to add + * @param v value to add + * @param checkPrefix whether to check that the provided key does not start + * with the reserved prefix + * @return this file info object + * @throws IOException if the key or value is invalid */ - FileInfo() { - super(); + public FileInfo append(final byte[] k, final byte[] v, + final boolean checkPrefix) throws IOException { + if (k == null || v == null) { + throw new NullPointerException("Key nor value may be null"); + } + if (checkPrefix && isReservedFileInfoKey(k)) { + throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX + + " are reserved"); + } + put(k, v); + return this; } + } - /** - * Return true if the given file info key is reserved for internal - * use by HFile. - */ + /** Return true if the given file info key is reserved for internal use. */ public static boolean isReservedFileInfoKey(byte[] key) { return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES); } - /** * Get names of supported compression algorithms. The names are acceptable by * HFile.Writer. @@ -1879,7 +434,7 @@ public class HFile { * @throws IOException When scanning the files fails. 
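// [Editor's note: minimal sketch, not part of this patch.] The new
// FileInfo.append() above is the intended path for application-level metadata:
// with checkPrefix == true, any key starting with FileInfo.RESERVED_PREFIX
// ("hfile.") is rejected so internal entries cannot be clobbered. The first key
// below is hypothetical; the calls assume code living in the io.hfile package:

    HFile.FileInfo fileInfo = new HFile.FileInfo();
    fileInfo.append(Bytes.toBytes("myapp.FLUSH_TIME"),
        Bytes.toBytes(System.currentTimeMillis()), true);   // accepted
    fileInfo.append(Bytes.toBytes("hfile.AVG_KEY_LEN"),
        Bytes.toBytes(42), true);                            // throws IOException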
*/ static List getStoreFiles(FileSystem fs, Path regionDir) - throws IOException { + throws IOException { List res = new ArrayList(); PathFilter dirFilter = new FSUtils.DirFilter(fs); FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter); @@ -1894,171 +449,31 @@ public class HFile { return res; } - public static void main(String []args) throws IOException { - try { - // create options - Options options = new Options(); - options.addOption("v", "verbose", false, "Verbose output; emits file and meta data delimiters"); - options.addOption("p", "printkv", false, "Print key/value pairs"); - options.addOption("e", "printkey", false, "Print keys"); - options.addOption("m", "printmeta", false, "Print meta data of file"); - options.addOption("b", "printblocks", false, "Print block index meta data"); - options.addOption("k", "checkrow", false, - "Enable row order check; looks for out-of-order keys"); - options.addOption("a", "checkfamily", false, "Enable family check"); - options.addOption("f", "file", true, - "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34"); - options.addOption("r", "region", true, - "Region to scan. Pass region name; e.g. '.META.,,1'"); - if (args.length == 0) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("HFile ", options, true); - System.exit(-1); - } - CommandLineParser parser = new PosixParser(); - CommandLine cmd = parser.parse(options, args); - boolean verbose = cmd.hasOption("v"); - boolean printValue = cmd.hasOption("p"); - boolean printKey = cmd.hasOption("e") || printValue; - boolean printMeta = cmd.hasOption("m"); - boolean printBlocks = cmd.hasOption("b"); - boolean checkRow = cmd.hasOption("k"); - boolean checkFamily = cmd.hasOption("a"); - // get configuration, file system and get list of files - Configuration conf = HBaseConfiguration.create(); - conf.set("fs.defaultFS", - conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR)); - conf.set("fs.default.name", - conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR)); - ArrayList files = new ArrayList(); - if (cmd.hasOption("f")) { - files.add(new Path(cmd.getOptionValue("f"))); - } - if (cmd.hasOption("r")) { - String regionName = cmd.getOptionValue("r"); - byte[] rn = Bytes.toBytes(regionName); - byte[][] hri = HRegionInfo.parseRegionName(rn); - Path rootDir = FSUtils.getRootDir(conf); - Path tableDir = new Path(rootDir, Bytes.toString(hri[0])); - String enc = HRegionInfo.encodeRegionName(rn); - Path regionDir = new Path(tableDir, enc); - if (verbose) System.out.println("region dir -> " + regionDir); - List regionFiles = - getStoreFiles(FileSystem.get(conf), regionDir); - if (verbose) System.out.println("Number of region files found -> " + - regionFiles.size()); - if (verbose) { - int i = 1; - for (Path p : regionFiles) { - if (verbose) System.out.println("Found file[" + i++ + "] -> " + p); - } - } - files.addAll(regionFiles); - } - // iterate over all files found - for (Path file : files) { - if (verbose) System.out.println("Scanning -> " + file); - FileSystem fs = file.getFileSystem(conf); - if (!fs.exists(file)) { - System.err.println("ERROR, file doesnt exist: " + file); - continue; - } - // create reader and load file info - HFile.Reader reader = new HFile.Reader(fs, file, null, false, false); - Map fileInfo = reader.loadFileInfo(); - int count = 0; - if (verbose || printKey || checkRow || checkFamily) { - // scan over file and read key/value's and check if requested - HFileScanner scanner = reader.getScanner(false, false); - scanner.seekTo(); - KeyValue 
pkv = null; - do { - KeyValue kv = scanner.getKeyValue(); - // dump key value - if (printKey) { - System.out.print("K: " + kv); - if (printValue) { - System.out.print(" V: " + Bytes.toStringBinary(kv.getValue())); - } - System.out.println(); - } - // check if rows are in order - if (checkRow && pkv != null) { - if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) { - System.err.println("WARNING, previous row is greater then" + - " current row\n\tfilename -> " + file + - "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) + - "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey())); - } - } - // check if families are consistent - if (checkFamily) { - String fam = Bytes.toString(kv.getFamily()); - if (!file.toString().contains(fam)) { - System.err.println("WARNING, filename does not match kv family," + - "\n\tfilename -> " + file + - "\n\tkeyvalue -> " + Bytes.toStringBinary(kv.getKey())); - } - if (pkv != null && !Bytes.equals(pkv.getFamily(), kv.getFamily())) { - System.err.println("WARNING, previous kv has different family" + - " compared to current key\n\tfilename -> " + file + - "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) + - "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey())); - } - } - pkv = kv; - count++; - } while (scanner.next()); - } - if (verbose || printKey) { - System.out.println("Scanned kv count -> " + count); - } - // print meta data - if (printMeta) { - System.out.println("Block index size as per heapsize: " + reader.indexSize()); - System.out.println(reader.toString()); - System.out.println(reader.getTrailerInfo()); - System.out.println("Fileinfo:"); - for (Map.Entry e : fileInfo.entrySet()) { - System.out.print(Bytes.toString(e.getKey()) + " = " ); - if (Bytes.equals(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY"))) { - long seqid = Bytes.toLong(e.getValue()); - System.out.println(seqid); - } else if (Bytes.equals(e.getKey(), - Bytes.toBytes("TIMERANGE"))) { - TimeRangeTracker timeRangeTracker = new TimeRangeTracker(); - Writables.copyWritable(e.getValue(), timeRangeTracker); - System.out.println(timeRangeTracker.getMinimumTimestamp() + - "...." + timeRangeTracker.getMaximumTimestamp()); - } else if (Bytes.equals(e.getKey(), FileInfo.AVG_KEY_LEN) || - Bytes.equals(e.getKey(), FileInfo.AVG_VALUE_LEN)) { - System.out.println(Bytes.toInt(e.getValue())); - } else { - System.out.println(Bytes.toStringBinary(e.getValue())); - } - } - - //Printing bloom information - ByteBuffer b = reader.getMetaBlock("BLOOM_FILTER_META", false); - if (b!= null) { - BloomFilter bloomFilter = new ByteBloomFilter(b); - System.out.println("BloomSize: " + bloomFilter.getByteSize()); - System.out.println("No of Keys in bloom: " + - bloomFilter.getKeyCount()); - System.out.println("Max Keys for bloom: " + - bloomFilter.getMaxKeys()); - } else { - System.out.println("Could not get bloom data from meta block"); - } - } - if (printBlocks) { - System.out.println("Block Index:"); - System.out.println(reader.blockIndex); - } - reader.close(); - } - } catch (Exception e) { - e.printStackTrace(); + public static void main(String[] args) throws IOException { + HFilePrettyPrinter prettyPrinter = new HFilePrettyPrinter(); + System.exit(prettyPrinter.run(args)); + } + + public static String getBlockCacheKey(String hfileName, long offset) { + return hfileName + CACHE_KEY_SEPARATOR + offset; + } + + /** + * Checks the given {@link HFile} format version, and throws an exception if + * invalid. 
Note that if the version number comes from an input file and has + * not been verified, the caller needs to re-throw an {@link IOException} to + * indicate that this is not a software error, but corrupted input. + * + * @param version an HFile version + * @throws IllegalArgumentException if the version is invalid + */ + public static void checkFormatVersion(int version) + throws IllegalArgumentException { + if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) { + throw new IllegalArgumentException("Invalid HFile version: " + version + + " (expected to be " + "between " + MIN_FORMAT_VERSION + " and " + + MAX_FORMAT_VERSION + ")"); } } + } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1153634&r1=1153633&r2=1153634&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Aug 3 19:59:48 2011 @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.hfile; import java.lang.ref.WeakReference; -import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.PriorityQueue; import java.util.concurrent.atomic.AtomicLong; @@ -251,7 +250,7 @@ public class LruBlockCache implements Bl * @param buf block buffer * @param inMemory if block is in-memory */ - public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory) { + public void cacheBlock(String blockName, HeapSize buf, boolean inMemory) { CachedBlock cb = map.get(blockName); if(cb != null) { throw new RuntimeException("Cached an already cached block"); @@ -275,7 +274,7 @@ public class LruBlockCache implements Bl * @param blockName block name * @param buf block buffer */ - public void cacheBlock(String blockName, ByteBuffer buf) { + public void cacheBlock(String blockName, HeapSize buf) { cacheBlock(blockName, buf, false); } @@ -284,7 +283,7 @@ public class LruBlockCache implements Bl * @param blockName block name * @return buffer of specified block name, or null if not in cache */ - public ByteBuffer getBlock(String blockName, boolean caching) { + public HeapSize getBlock(String blockName, boolean caching) { CachedBlock cb = map.get(blockName); if(cb == null) { stats.miss(caching); @@ -304,6 +303,31 @@ public class LruBlockCache implements Bl return true; } + /** + * Evicts all blocks whose name starts with the given prefix. This is an + * expensive operation implemented as a linear-time search through all blocks + * in the cache. Ideally this should be a search in a log-access-time map. + * + *
<p>
+ * This is used for evict-on-close to remove all blocks of a specific HFile. + * The prefix would be the HFile/StoreFile name (a UUID) followed by an + * underscore, because HFile v2 block names in cache are of the form + * "<storeFileUUID>_<blockOffset>". + * + * @return the number of blocks evicted + */ + @Override + public int evictBlocksByPrefix(String prefix) { + int numEvicted = 0; + for (String key : map.keySet()) { + if (key.startsWith(prefix)) { + if (evictBlock(key)) + ++numEvicted; + } + } + return numEvicted; + } + protected long evictBlock(CachedBlock block) { map.remove(block.getName()); size.addAndGet(-1 * block.heapSize()); Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java?rev=1153634&r1=1153633&r2=1153634&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java Wed Aug 3 19:59:48 2011 @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; @@ -32,10 +33,10 @@ import org.apache.hadoop.hbase.io.hfile. * Simple one RFile soft reference cache. */ public class SimpleBlockCache implements BlockCache { - private static class Ref extends SoftReference { + private static class Ref extends SoftReference { public String blockId; - public Ref(String blockId, ByteBuffer buf, ReferenceQueue q) { - super(buf, q); + public Ref(String blockId, HeapSize block, ReferenceQueue q) { + super(block, q); this.blockId = blockId; } } @@ -68,7 +69,7 @@ public class SimpleBlockCache implements return cache.size(); } - public synchronized ByteBuffer getBlock(String blockName, boolean caching) { + public synchronized HeapSize getBlock(String blockName, boolean caching) { processQueue(); // clear out some crap. 
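// [Editor's note: illustrative sketch, not part of this patch.] Cache keys and
// evict-on-close tie together as follows: HFile.getBlockCacheKey(name, offset)
// builds the "<storeFileName>_<blockOffset>" names described in the
// evictBlocksByPrefix() javadoc above, so closing a reader that was opened with
// evictOnClose == true can drop every cached block of that file with one prefix
// scan. Sketch, assuming an LruBlockCache instance and some HeapSize payload:

    String hfileName = "0b3c7a...";                   // hypothetical store file name
    String cacheKey = HFile.getBlockCacheKey(hfileName, 65536);
    blockCache.cacheBlock(cacheKey, block);           // block is any HeapSize
    // ... later, on Reader.close() with evict-on-close enabled:
    int evicted = blockCache.evictBlocksByPrefix(hfileName + "_");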
Ref ref = cache.get(blockName); if (ref == null) @@ -76,13 +77,13 @@ public class SimpleBlockCache implements return ref.get(); } - public synchronized void cacheBlock(String blockName, ByteBuffer buf) { - cache.put(blockName, new Ref(blockName, buf, q)); + public synchronized void cacheBlock(String blockName, HeapSize block) { + cache.put(blockName, new Ref(blockName, block, q)); } - public synchronized void cacheBlock(String blockName, ByteBuffer buf, + public synchronized void cacheBlock(String blockName, HeapSize block, boolean inMemory) { - cache.put(blockName, new Ref(blockName, buf, q)); + cache.put(blockName, new Ref(blockName, block, q)); } @Override @@ -117,4 +118,10 @@ public class SimpleBlockCache implements // TODO: implement this if we ever actually use this block cache return 0; } + + @Override + public int evictBlocksByPrefix(String string) { + throw new UnsupportedOperationException(); + } } + Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1153634&r1=1153633&r2=1153634&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Aug 3 19:59:48 2011 @@ -79,7 +79,7 @@ public class HFileOutputFormat extends F // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); - Configuration conf = context.getConfiguration(); + final Configuration conf = context.getConfiguration(); final FileSystem fs = outputdir.getFileSystem(conf); // These configs. are from hbase-*.xml final long maxsize = conf.getLong("hbase.hregion.max.filesize", @@ -132,7 +132,7 @@ public class HFileOutputFormat extends F // create a new HLog writer, if necessary if (wl == null || wl.writer == null) { - wl = getNewWriter(family); + wl = getNewWriter(family, conf); } // we now have the proper HLog writer. full steam ahead @@ -163,12 +163,13 @@ public class HFileOutputFormat extends F * @return A WriterLength, containing a new HFile.Writer. * @throws IOException */ - private WriterLength getNewWriter(byte[] family) throws IOException { + private WriterLength getNewWriter(byte[] family, Configuration conf) + throws IOException { WriterLength wl = new WriterLength(); Path familydir = new Path(outputdir, Bytes.toString(family)); String compression = compressionMap.get(family); compression = compression == null ? 
defaultCompression : compression; - wl.writer = new HFile.Writer(fs, + wl.writer = HFile.getWriterFactory(conf).createWriter(fs, StoreFile.getUniqueFile(fs, familydir), blocksize, compression, KeyValue.KEY_COMPARATOR); this.writers.put(family, wl); Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1153634&r1=1153633&r2=1153634&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Aug 3 19:59:48 2011 @@ -288,7 +288,7 @@ public class LoadIncrementalHFiles exten throws IOException { final Path hfilePath = item.hfilePath; final FileSystem fs = hfilePath.getFileSystem(getConf()); - HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false, false); + HFile.Reader hfr = HFile.createReader(fs, hfilePath, null, false, false); final byte[] first, last; try { hfr.loadFileInfo(); @@ -390,7 +390,7 @@ public class LoadIncrementalHFiles exten halfWriter = new StoreFile.Writer( fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR, - bloomFilterType, 0, false); + bloomFilterType, 0); HFileScanner scanner = halfReader.getScanner(false, false); scanner.seekTo(); do { @@ -490,7 +490,7 @@ public class LoadIncrementalHFiles exten for (Path hfile : hfiles) { if (hfile.getName().startsWith("_")) continue; - HFile.Reader reader = new HFile.Reader(fs, hfile, null, false, false); + HFile.Reader reader = HFile.createReader(fs, hfile, null, false, false); final byte[] first, last; try { reader.loadFileInfo();
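// [Editor's note: minimal sketch, not part of this patch.] On the write side,
// the counterpart of createReader() is the Configuration-driven writer factory
// used in the HFileOutputFormat hunk above: the HFile format version to produce
// comes from conf instead of a constructor choice. With fs, familydir, blocksize
// and compression resolved as in that hunk:

    HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fs,
        StoreFile.getUniqueFile(fs, familydir), blocksize,
        compression, KeyValue.KEY_COMPARATOR);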