From: vines@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Mon, 04 Nov 2013 17:03:15 -0000
Subject: [1/2] ACCUMULO-1679 - removing TFile and Chunk

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 61d00d47d -> ef6ba24c7

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef6ba24c/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFile.java
deleted file mode 100644
index f2cb326..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFile.java
+++ /dev/null
@@ -1,2030 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.accumulo.core.file.rfile.bcfile;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Comparator;
-
-import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
-import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
-import org.apache.accumulo.core.file.rfile.bcfile.Chunk.ChunkDecoder;
-import org.apache.accumulo.core.file.rfile.bcfile.Chunk.ChunkEncoder;
-import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.BytesComparator;
-import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.MemcmpRawComparator;
-import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.serializer.JavaSerializationComparator;
-
-/**
- * A TFile is a container of key-value pairs. Both keys and values are type-less bytes. Keys are restricted to 64KB, value length is not restricted
- * (practically limited to the available disk storage). TFile further provides the following features:
- *
- *   • Block Compression.
- *   • Named meta data blocks.
- *   • Sorted or unsorted keys.
- *   • Seek by key or by file offset.
- *
- * The memory footprint of a TFile includes the following:
- *
- *   • Some constant overhead of reading or writing a compressed block.
- *       • Each compressed block requires one compression/decompression codec for I/O.
- *       • Temporary space to buffer the key.
- *       • Temporary space to buffer the value (for TFile.Writer only). Values are chunk encoded, so that we buffer at most one chunk of user data. By
- *         default, the chunk buffer is 1MB. Reading chunked value does not require additional memory.
- *   • TFile index, which is proportional to the total number of Data Blocks. The total amount of memory needed to hold the index can be estimated as
- *     (56+AvgKeySize)*NumBlocks.
- *   • MetaBlock index, which is proportional to the total number of Meta Blocks. The total amount of memory needed to hold the index for Meta Blocks can be
- *     estimated as (40+AvgMetaBlockName)*NumMetaBlock.
- *
- * The behavior of TFile can be customized by the following variables through Configuration:
- *
- *   • tfile.io.chunk.size: Value chunk size. Integer (in bytes). Defaults to 1MB. Values shorter than the chunk size are guaranteed to have a known
- *     value length at read time (see {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
- *   • tfile.fs.output.buffer.size: Buffer size used for FSDataOutputStream. Integer (in bytes). Defaults to 256KB.
- *   • tfile.fs.input.buffer.size: Buffer size used for FSDataInputStream. Integer (in bytes). Defaults to 256KB.
- *
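(For context, a minimal write-path sketch of the tuning knobs listed above — illustrative only, not part of the removed file. It assumes the imports already present in this file plus org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path, an already-obtained FileSystem "fs", and a hypothetical output path.)

  // Illustrative sketch only: apply the TFile configuration knobs, then open a writer.
  Configuration conf = new Configuration();
  conf.setInt("tfile.io.chunk.size", 1024 * 1024);        // value chunk buffer (default 1MB)
  conf.setInt("tfile.fs.output.buffer.size", 256 * 1024); // FSDataOutputStream buffering
  conf.setInt("tfile.fs.input.buffer.size", 256 * 1024);  // FSDataInputStream buffering

  FSDataOutputStream out = fs.create(new Path("/tmp/example.tfile")); // hypothetical path
  TFile.Writer writer = new TFile.Writer(out, 256 * 1024, TFile.COMPRESSION_GZ, TFile.COMPARATOR_MEMCMP, conf);
  try {
    writer.append("row1".getBytes(), "value1".getBytes()); // keys must be appended in sorted order for a sorted TFile
  } finally {
    writer.close(); // the Writer does not close the underlying FSDataOutputStream
    out.close();
  }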

- * Suggestions on performance optimization:
- *
- *   • Minimum block size. We recommend setting the minimum block size between 256KB and 1MB for general usage. A larger block size is preferred if files are
- *     primarily for sequential access. However, it leads to inefficient random access (because there is more data to decompress). Smaller blocks are good for
- *     random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the
- *     conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in the Compression codec, the smallest possible
- *     block size is around 20KB-30KB.
- *   • The current implementation does not offer true multi-threading for reading. The implementation uses FSDataInputStream seek()+read(), which is shown to
- *     be much faster than the positioned-read call in single-thread mode. However, it also means that if multiple threads attempt to access the same TFile
- *     (using multiple scanners) simultaneously, the actual I/O is carried out sequentially even if they access different DFS blocks.
- *   • Compression codec. Use "none" if the data is not very compressible (by compressible, we mean a compression ratio of at least 2:1). Generally, use "lzo"
- *     as the starting point for experimenting. "gz" offers a slightly better compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to
- *     decompress, compared to "lzo".
- *   • File system buffering. If the underlying FSDataInputStream and FSDataOutputStream are already adequately buffered, or if applications read/write keys
- *     and values in large buffers, we can reduce the sizes of input/output buffering in the TFile layer by setting the configuration parameters
- *     "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
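(Likewise, a minimal read-path sketch using the Reader/Scanner API shown further down in this diff — illustrative only, reusing the hypothetical "fs", "conf", and path from the sketch above.)

  // Illustrative sketch only: scan every key-value pair in a TFile.
  Path path = new Path("/tmp/example.tfile");    // hypothetical path
  long fileLength = fs.getFileStatus(path).getLen();
  FSDataInputStream in = fs.open(path);
  TFile.Reader reader = new TFile.Reader(in, fileLength, conf);
  TFile.Reader.Scanner scanner = reader.createScanner();
  try {
    BytesWritable key = new BytesWritable();
    BytesWritable value = new BytesWritable();
    while (!scanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = scanner.entry();
      entry.get(key, value);                     // copies key and value in one shot
      scanner.advance();
    }
  } finally {
    scanner.close();
    reader.close();
    in.close();                                  // close the input stream separately
  }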
- * - * Some design rationale behind TFile can be found at Hadoop-3315. - */ -public class TFile { - static final Log LOG = LogFactory.getLog(TFile.class); - - private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size"; - private static final String FS_INPUT_BUF_SIZE_ATTR = "tfile.fs.input.buffer.size"; - private static final String FS_OUTPUT_BUF_SIZE_ATTR = "tfile.fs.output.buffer.size"; - - static int getChunkBufferSize(Configuration conf) { - int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024); - return (ret > 0) ? ret : 1024 * 1024; - } - - static int getFSInputBufferSize(Configuration conf) { - return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024); - } - - static int getFSOutputBufferSize(Configuration conf) { - return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024); - } - - private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB - static final Version API_VERSION = new Version((short) 1, (short) 0); - - /** snappy codec **/ - public static final String COMPRESSION_SNAPPY = "snappy"; - - /** compression: gzip */ - public static final String COMPRESSION_GZ = "gz"; - /** compression: lzo */ - public static final String COMPRESSION_LZO = "lzo"; - /** compression: none */ - public static final String COMPRESSION_NONE = "none"; - /** comparator: memcmp */ - public static final String COMPARATOR_MEMCMP = "memcmp"; - /** comparator prefix: java class */ - public static final String COMPARATOR_JCLASS = "jclass:"; - - /** - * Make a raw comparator from a string name. - * - * @param name - * Comparator name - * @return A RawComparable comparator. - */ - static public Comparator makeComparator(String name) { - return TFileMeta.makeComparator(name); - } - - // Prevent the instantiation of TFiles - private TFile() { - // nothing - } - - /** - * Get names of supported compression algorithms. The names are acceptable by TFile.Writer. - * - * @return Array of strings, each represents a supported compression algorithm. Currently, the following compression algorithms are supported. - *
- *
- *   • "none" - No compression.
- *   • "lzo" - LZO compression.
- *   • "gz" - GZIP compression.
- *   • "snappy" - Snappy compression.
- *
- */ - public static String[] getSupportedCompressionAlgorithms() { - return Compression.getSupportedAlgorithms(); - } - - /** - * TFile Writer. - */ - public static class Writer implements Closeable { - // minimum compressed size for a block. - private final int sizeMinBlock; - - // Meta blocks. - final TFileIndex tfileIndex; - final TFileMeta tfileMeta; - - // reference to the underlying BCFile. - private BCFile.Writer writerBCF; - - // current data block appender. - BlockAppender blkAppender; - long blkRecordCount; - - // buffers for caching the key. - BoundedByteArrayOutputStream currentKeyBufferOS; - BoundedByteArrayOutputStream lastKeyBufferOS; - - // buffer used by chunk codec - private byte[] valueBuffer; - - /** - * Writer states. The state always transits in circles: READY -> IN_KEY -> END_KEY -> IN_VALUE -> READY. - */ - private enum State { - READY, // Ready to start a new key-value pair insertion. - IN_KEY, // In the middle of key insertion. - END_KEY, // Key insertion complete, ready to insert value. - IN_VALUE, // In value insertion. - // ERROR, // Error encountered, cannot continue. - CLOSED, // TFile already closed. - } - - // current state of Writer. - State state = State.READY; - Configuration conf; - long errorCount = 0; - - /** - * Constructor - * - * @param fsdos - * output stream for writing. Must be at position 0. - * @param minBlockSize - * Minimum compressed block size in bytes. A compression block will not be closed until it reaches this size except for the last block. - * @param compressName - * Name of the compression algorithm. Must be one of the strings returned by {@link TFile#getSupportedCompressionAlgorithms()}. - * @param comparator - * Leave comparator as null or empty string if TFile is not sorted. Otherwise, provide the string name for the comparison algorithm for keys. Two - * kinds of comparators are supported. - *
- *
- *   • Algorithmic comparator: binary comparators that are language independent. Currently, only "memcmp" is supported.
- *   • Language-specific comparator: binary comparators that can only be constructed in a specific language. For Java, the syntax is "jclass:",
- *     followed by the class name of the RawComparator. Currently, we only support RawComparators that can be constructed through the default
- *     constructor (with no parameters). Parameterized RawComparators such as {@link WritableComparator} or {@link JavaSerializationComparator} may not
- *     be directly used. One should write a wrapper class that inherits from such classes and use its default constructor to perform proper
- *     initialization.
- *
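(A short illustration of the wrapper approach described above — the class name is hypothetical and this is not part of the removed file.)

  // Illustrative sketch only: a "jclass:" comparator must expose a no-argument constructor,
  // so a parameterized comparator such as WritableComparator is wrapped first.
  public static class TextKeyComparator extends WritableComparator {
    public TextKeyComparator() {
      super(org.apache.hadoop.io.Text.class); // the default constructor performs the required initialization
    }
  }
  // ... then pass "jclass:" + TextKeyComparator.class.getName() as the comparator string to the Writer.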
- * @param conf - * The configuration object. - * @throws IOException - */ - public Writer(FSDataOutputStream fsdos, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException { - sizeMinBlock = minBlockSize; - tfileMeta = new TFileMeta(comparator); - tfileIndex = new TFileIndex(tfileMeta.getComparator()); - - writerBCF = new BCFile.Writer(fsdos, compressName, conf, true); - currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); - lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); - this.conf = conf; - } - - /** - * Close the Writer. Resources will be released regardless of the exceptions being thrown. Future close calls will have no effect. - * - * The underlying FSDataOutputStream is not closed. - */ - public void close() throws IOException { - if ((state == State.CLOSED)) { - return; - } - try { - // First try the normal finish. - // Terminate upon the first Exception. - if (errorCount == 0) { - if (state != State.READY) { - throw new IllegalStateException("Cannot close TFile in the middle of key-value insertion."); - } - - finishDataBlock(true); - - // first, write out data:TFile.meta - BlockAppender outMeta = writerBCF.prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE); - try { - tfileMeta.write(outMeta); - } finally { - outMeta.close(); - } - - // second, write out data:TFile.index - BlockAppender outIndex = writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME); - try { - tfileIndex.write(outIndex); - } finally { - outIndex.close(); - } - - writerBCF.close(); - } - } finally { - IOUtils.cleanup(LOG, blkAppender, writerBCF); - blkAppender = null; - writerBCF = null; - state = State.CLOSED; - } - } - - /** - * Adding a new key-value pair to the TFile. This is synonymous to append(key, 0, key.length, value, 0, value.length) - * - * @param key - * Buffer for key. - * @param value - * Buffer for value. - * @throws IOException - */ - public void append(byte[] key, byte[] value) throws IOException { - append(key, 0, key.length, value, 0, value.length); - } - - /** - * Adding a new key-value pair to TFile. - * - * @param key - * buffer for key. - * @param koff - * offset in key buffer. - * @param klen - * length of key. - * @param value - * buffer for value. - * @param voff - * offset in value buffer. - * @param vlen - * length of value. - * @throws IOException - * Upon IO errors. - *

- * If an exception is thrown, the TFile will be in an inconsistent state. The only legitimate call after that would be close - */ - public void append(byte[] key, int koff, int klen, byte[] value, int voff, int vlen) throws IOException { - if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) { - throw new IndexOutOfBoundsException("Bad key buffer offset-length combination."); - } - - if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) { - throw new IndexOutOfBoundsException("Bad value buffer offset-length combination."); - } - - try { - DataOutputStream dosKey = prepareAppendKey(klen); - try { - ++errorCount; - dosKey.write(key, koff, klen); - --errorCount; - } finally { - dosKey.close(); - } - - DataOutputStream dosValue = prepareAppendValue(vlen); - try { - ++errorCount; - dosValue.write(value, voff, vlen); - --errorCount; - } finally { - dosValue.close(); - } - } finally { - state = State.READY; - } - } - - /** - * Helper class to register key after close call on key append stream. - */ - private class KeyRegister extends DataOutputStream { - private final int expectedLength; - private boolean closed = false; - - public KeyRegister(int len) { - super(currentKeyBufferOS); - if (len >= 0) { - currentKeyBufferOS.reset(len); - } else { - currentKeyBufferOS.reset(); - } - expectedLength = len; - } - - @Override - public void close() throws IOException { - if (closed == true) { - return; - } - - try { - ++errorCount; - byte[] key = currentKeyBufferOS.getBuffer(); - int len = currentKeyBufferOS.size(); - /** - * verify length. - */ - if (expectedLength >= 0 && expectedLength != len) { - throw new IOException("Incorrect key length: expected=" + expectedLength + " actual=" + len); - } - - Utils.writeVInt(blkAppender, len); - blkAppender.write(key, 0, len); - if (tfileIndex.getFirstKey() == null) { - tfileIndex.setFirstKey(key, 0, len); - } - - if (tfileMeta.isSorted()) { - byte[] lastKey = lastKeyBufferOS.getBuffer(); - int lastLen = lastKeyBufferOS.size(); - if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0, lastLen) < 0) { - throw new IOException("Keys are not added in sorted order"); - } - } - - BoundedByteArrayOutputStream tmp = currentKeyBufferOS; - currentKeyBufferOS = lastKeyBufferOS; - lastKeyBufferOS = tmp; - --errorCount; - } finally { - closed = true; - state = State.END_KEY; - } - } - } - - /** - * Helper class to register value after close call on value append stream. - */ - private class ValueRegister extends DataOutputStream { - private boolean closed = false; - - public ValueRegister(OutputStream os) { - super(os); - } - - // Avoiding flushing call to down stream. - @Override - public void flush() { - // do nothing - } - - @Override - public void close() throws IOException { - if (closed == true) { - return; - } - - try { - ++errorCount; - super.close(); - blkRecordCount++; - // bump up the total record count in the whole file - tfileMeta.incRecordCount(); - finishDataBlock(false); - --errorCount; - } finally { - closed = true; - state = State.READY; - } - } - } - - /** - * Obtain an output stream for writing a key into TFile. This may only be called when there is no active Key appending stream or value appending stream. - * - * @param length - * The expected length of the key. If length of the key is not known, set length = -1. Otherwise, the application must write exactly as many bytes - * as specified here before calling close on the returned output stream. - * @return The key appending output stream. 
- * @throws IOException - * - */ - public DataOutputStream prepareAppendKey(int length) throws IOException { - if (state != State.READY) { - throw new IllegalStateException("Incorrect state to start a new key: " + state.name()); - } - - initDataBlock(); - DataOutputStream ret = new KeyRegister(length); - state = State.IN_KEY; - return ret; - } - - /** - * Obtain an output stream for writing a value into TFile. This may only be called right after a key appending operation (the key append stream must be - * closed). - * - * @param length - * The expected length of the value. If length of the value is not known, set length = -1. Otherwise, the application must write exactly as many - * bytes as specified here before calling close on the returned output stream. Advertising the value size up-front guarantees that the value is - * encoded in one chunk, and avoids intermediate chunk buffering. - * @throws IOException - * - */ - public DataOutputStream prepareAppendValue(int length) throws IOException { - if (state != State.END_KEY) { - throw new IllegalStateException("Incorrect state to start a new value: " + state.name()); - } - - DataOutputStream ret; - - // unknown length - if (length < 0) { - if (valueBuffer == null) { - valueBuffer = new byte[getChunkBufferSize(conf)]; - } - ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer)); - } else { - ret = new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length)); - } - - state = State.IN_VALUE; - return ret; - } - - /** - * Obtain an output stream for creating a meta block. This function may not be called when there is a key append stream or value append stream active. No - * more key-value insertion is allowed after a meta data block has been added to TFile. - * - * @param name - * Name of the meta block. - * @param compressName - * Name of the compression algorithm to be used. Must be one of the strings returned by {@link TFile#getSupportedCompressionAlgorithms()}. - * @return A DataOutputStream that can be used to write Meta Block data. Closing the stream would signal the ending of the block. - * @throws IOException - * @throws MetaBlockAlreadyExists - * the Meta Block with the same name already exists. - */ - public DataOutputStream prepareMetaBlock(String name, String compressName) throws IOException, MetaBlockAlreadyExists { - if (state != State.READY) { - throw new IllegalStateException("Incorrect state to start a Meta Block: " + state.name()); - } - - finishDataBlock(true); - DataOutputStream outputStream = writerBCF.prepareMetaBlock(name, compressName); - return outputStream; - } - - /** - * Obtain an output stream for creating a meta block. This function may not be called when there is a key append stream or value append stream active. No - * more key-value insertion is allowed after a meta data block has been added to TFile. Data will be compressed using the default compressor as defined in - * Writer's constructor. - * - * @param name - * Name of the meta block. - * @return A DataOutputStream that can be used to write Meta Block data. Closing the stream would signal the ending of the block. - * @throws IOException - * @throws MetaBlockAlreadyExists - * the Meta Block with the same name already exists. 
- */ - public DataOutputStream prepareMetaBlock(String name) throws IOException, MetaBlockAlreadyExists { - if (state != State.READY) { - throw new IllegalStateException("Incorrect state to start a Meta Block: " + state.name()); - } - - finishDataBlock(true); - return writerBCF.prepareMetaBlock(name); - } - - /** - * Check if we need to start a new data block. - * - * @throws IOException - */ - private void initDataBlock() throws IOException { - // for each new block, get a new appender - if (blkAppender == null) { - blkAppender = writerBCF.prepareDataBlock(); - } - } - - /** - * Close the current data block if necessary. - * - * @param bForceFinish - * Force the closure regardless of the block size. - * @throws IOException - */ - void finishDataBlock(boolean bForceFinish) throws IOException { - if (blkAppender == null) { - return; - } - - // exceeded the size limit, do the compression and finish the block - if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) { - // keep tracks of the last key of each data block, no padding - // for now - TFileIndexEntry keyLast = new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS.size(), blkRecordCount); - tfileIndex.addEntry(keyLast); - // close the appender - blkAppender.close(); - blkAppender = null; - blkRecordCount = 0; - } - } - } - - /** - * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner. objects. A scanner may scan the whole TFile ({@link Reader#createScanner()} ) , - * a portion of TFile based on byte offsets ( {@link Reader#createScanner(long, long)}), or a portion of TFile with keys fall in a certain key range (for - * sorted TFile only, {@link Reader#createScanner(byte[], byte[])} or {@link Reader#createScanner(RawComparable, RawComparable)}). - */ - public static class Reader implements Closeable { - // The underlying BCFile reader. - final BCFile.Reader readerBCF; - - // TFile index, it is loaded lazily. - TFileIndex tfileIndex = null; - final TFileMeta tfileMeta; - final BytesComparator comparator; - - // global begin and end locations. - private final Location begin; - private final Location end; - - /** - * Location representing a virtual position in the TFile. 
- */ - static final class Location implements Comparable, Cloneable { - private int blockIndex; - // distance/offset from the beginning of the block - private long recordIndex; - - Location(int blockIndex, long recordIndex) { - set(blockIndex, recordIndex); - } - - void incRecordIndex() { - ++recordIndex; - } - - Location(Location other) { - set(other); - } - - int getBlockIndex() { - return blockIndex; - } - - long getRecordIndex() { - return recordIndex; - } - - void set(int blockIndex, long recordIndex) { - if ((blockIndex | recordIndex) < 0) { - throw new IllegalArgumentException("Illegal parameter for BlockLocation."); - } - this.blockIndex = blockIndex; - this.recordIndex = recordIndex; - } - - void set(Location other) { - set(other.blockIndex, other.recordIndex); - } - - /** - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - @Override - public int compareTo(Location other) { - return compareTo(other.blockIndex, other.recordIndex); - } - - int compareTo(int bid, long rid) { - if (this.blockIndex == bid) { - long ret = this.recordIndex - rid; - if (ret > 0) - return 1; - if (ret < 0) - return -1; - return 0; - } - return this.blockIndex - bid; - } - - /** - * @see java.lang.Object#clone() - */ - @Override - protected Location clone() { - return new Location(blockIndex, recordIndex); - } - - /** - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - final int prime = 31; - int result = prime + blockIndex; - result = (int) (prime * result + recordIndex); - return result; - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Location other = (Location) obj; - if (blockIndex != other.blockIndex) - return false; - if (recordIndex != other.recordIndex) - return false; - return true; - } - } - - /** - * Constructor - * - * @param fsdis - * FS input stream of the TFile. - * @param fileLength - * The length of TFile. This is required because we have no easy way of knowing the actual size of the input file through the File input stream. - * @param conf - * @throws IOException - */ - public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException { - readerBCF = new BCFile.Reader(fsdis, fileLength, conf); - - // first, read TFile meta - BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME); - try { - tfileMeta = new TFileMeta(brMeta); - } finally { - brMeta.close(); - } - - comparator = tfileMeta.getComparator(); - // Set begin and end locations. - begin = new Location(0, 0); - end = new Location(readerBCF.getBlockCount(), 0); - } - - /** - * Close the reader. The state of the Reader object is undefined after close. Calling close() for multiple times has no effect. - */ - public void close() throws IOException { - readerBCF.close(); - } - - /** - * Get the begin location of the TFile. - * - * @return If TFile is not empty, the location of the first key-value pair. Otherwise, it returns end(). - */ - Location begin() { - return begin; - } - - /** - * Get the end location of the TFile. - * - * @return The location right after the last key-value pair in TFile. - */ - Location end() { - return end; - } - - /** - * Get the string representation of the comparator. - * - * @return If the TFile is not sorted by keys, an empty string will be returned. 
Otherwise, the actual comparator string that is provided during the TFile - * creation time will be returned. - */ - public String getComparatorName() { - return tfileMeta.getComparatorString(); - } - - /** - * Is the TFile sorted? - * - * @return true if TFile is sorted. - */ - public boolean isSorted() { - return tfileMeta.isSorted(); - } - - /** - * Get the number of key-value pair entries in TFile. - * - * @return the number of key-value pairs in TFile - */ - public long getEntryCount() { - return tfileMeta.getRecordCount(); - } - - /** - * Lazily loading the TFile index. - * - * @throws IOException - */ - synchronized void checkTFileDataIndex() throws IOException { - if (tfileIndex == null) { - BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME); - try { - tfileIndex = new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta.getComparator()); - } finally { - brIndex.close(); - } - } - } - - /** - * Get the first key in the TFile. - * - * @return The first key in the TFile. - * @throws IOException - */ - public RawComparable getFirstKey() throws IOException { - checkTFileDataIndex(); - return tfileIndex.getFirstKey(); - } - - /** - * Get the last key in the TFile. - * - * @return The last key in the TFile. - * @throws IOException - */ - public RawComparable getLastKey() throws IOException { - checkTFileDataIndex(); - return tfileIndex.getLastKey(); - } - - /** - * Get a Comparator object to compare Entries. It is useful when you want stores the entries in a collection (such as PriorityQueue) and perform sorting or - * comparison among entries based on the keys without copying out the key. - * - * @return An Entry Comparator.. - */ - public Comparator getEntryComparator() { - if (!isSorted()) { - throw new RuntimeException("Entries are not comparable for unsorted TFiles"); - } - - return new Comparator() { - /** - * Provide a customized comparator for Entries. This is useful if we have a collection of Entry objects. However, if the Entry objects come from - * different TFiles, users must ensure that those TFiles share the same RawComparator. - */ - @Override - public int compare(Scanner.Entry o1, Scanner.Entry o2) { - return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2.getKeyBuffer(), 0, o2.getKeyLength()); - } - }; - } - - /** - * Get an instance of the RawComparator that is constructed based on the string comparator representation. - * - * @return a Comparator that can compare RawComparable's. - */ - public Comparator getComparator() { - return comparator; - } - - /** - * Stream access to a meta block.`` - * - * @param name - * The name of the meta block. - * @return The input stream. - * @throws IOException - * on I/O error. - * @throws MetaBlockDoesNotExist - * If the meta block with the name does not exist. - */ - public DataInputStream getMetaBlock(String name) throws IOException, MetaBlockDoesNotExist { - return readerBCF.getMetaBlock(name); - } - - /** - * if greater is true then returns the beginning location of the block containing the key strictly greater than input key. if greater is false then returns - * the beginning location of the block greater than equal to the input key - * - * @param key - * the input key - * @param greater - * boolean flag - * @throws IOException - */ - Location getBlockContainsKey(RawComparable key, boolean greater) throws IOException { - if (!isSorted()) { - throw new RuntimeException("Seeking in unsorted TFile"); - } - checkTFileDataIndex(); - int blkIndex = (greater) ? 
tfileIndex.upperBound(key) : tfileIndex.lowerBound(key); - if (blkIndex < 0) - return end; - return new Location(blkIndex, 0); - } - - int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) { - if (!isSorted()) { - throw new RuntimeException("Cannot compare keys for unsorted TFiles."); - } - return comparator.compare(a, o1, l1, b, o2, l2); - } - - int compareKeys(RawComparable a, RawComparable b) { - if (!isSorted()) { - throw new RuntimeException("Cannot compare keys for unsorted TFiles."); - } - return comparator.compare(a, b); - } - - /** - * Get the location pointing to the beginning of the first key-value pair in a compressed block whose byte offset in the TFile is greater than or equal to - * the specified offset. - * - * @param offset - * the user supplied offset. - * @return the location to the corresponding entry; or end() if no such entry exists. - */ - Location getLocationNear(long offset) { - int blockIndex = readerBCF.getBlockIndexNear(offset); - if (blockIndex == -1) - return end; - return new Location(blockIndex, 0); - } - - /** - * Get a sample key that is within a block whose starting offset is greater than or equal to the specified offset. - * - * @param offset - * The file offset. - * @return the key that fits the requirement; or null if no such key exists (which could happen if the offset is close to the end of the TFile). - * @throws IOException - */ - public RawComparable getKeyNear(long offset) throws IOException { - int blockIndex = readerBCF.getBlockIndexNear(offset); - if (blockIndex == -1) - return null; - checkTFileDataIndex(); - return new ByteArray(tfileIndex.getEntry(blockIndex).key); - } - - /** - * Get a scanner than can scan the whole TFile. - * - * @return The scanner object. A valid Scanner is always returned even if the TFile is empty. - * @throws IOException - */ - public Scanner createScanner() throws IOException { - return new Scanner(this, begin, end); - } - - /** - * Get a scanner that covers a portion of TFile based on byte offsets. - * - * @param offset - * The beginning byte offset in the TFile. - * @param length - * The length of the region. - * @return The actual coverage of the returned scanner tries to match the specified byte-region but always round up to the compression block boundaries. It - * is possible that the returned scanner contains zero key-value pairs even if length is positive. - * @throws IOException - */ - public Scanner createScanner(long offset, long length) throws IOException { - return new Scanner(this, offset, offset + length); - } - - /** - * Get a scanner that covers a portion of TFile based on keys. - * - * @param beginKey - * Begin key of the scan (inclusive). If null, scan from the first key-value entry of the TFile. - * @param endKey - * End key of the scan (exclusive). If null, scan up to the last key-value entry of the TFile. - * @return The actual coverage of the returned scanner will cover all keys greater than or equal to the beginKey and less than the endKey. - * @throws IOException - */ - public Scanner createScanner(byte[] beginKey, byte[] endKey) throws IOException { - return createScanner((beginKey == null) ? null : new ByteArray(beginKey, 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey, 0, - endKey.length)); - } - - /** - * Get a scanner that covers a specific key range. - * - * @param beginKey - * Begin key of the scan (inclusive). If null, scan from the first key-value entry of the TFile. - * @param endKey - * End key of the scan (exclusive). 
If null, scan up to the last key-value entry of the TFile. - * @return The actual coverage of the returned scanner will cover all keys greater than or equal to the beginKey and less than the endKey. - * @throws IOException - */ - public Scanner createScanner(RawComparable beginKey, RawComparable endKey) throws IOException { - if ((beginKey != null) && (endKey != null) && (compareKeys(beginKey, endKey) >= 0)) { - return new Scanner(this, beginKey, beginKey); - } - return new Scanner(this, beginKey, endKey); - } - - /** - * The TFile Scanner. The Scanner has an implicit cursor, which, upon creation, points to the first key-value pair in the scan range. If the scan range is - * empty, the cursor will point to the end of the scan range. - *

- * Use {@link Scanner#atEnd()} to test whether the cursor is at the end location of the scanner. - *

- * Use {@link Scanner#advance()} to move the cursor to the next key-value pair (or end if none exists). Use seekTo methods ( {@link Scanner#seekTo(byte[])} - * or {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary location in the covered range (including backward seeking). Use - * {@link Scanner#rewind()} to seek back to the beginning of the scanner. Use {@link Scanner#seekToEnd()} to seek to the end of the scanner. - *

- * Actual keys and values may be obtained through {@link Scanner.Entry} object, which is obtained through {@link Scanner#entry()}. - */ - public static class Scanner implements Closeable { - // The underlying TFile reader. - final Reader reader; - // current block (null if reaching end) - private BlockReader blkReader; - - Location beginLocation; - Location endLocation; - Location currentLocation; - - // flag to ensure value is only examined once. - boolean valueChecked = false; - // reusable buffer for keys. - final byte[] keyBuffer; - // length of key, -1 means key is invalid. - int klen = -1; - - static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024; - BytesWritable valTransferBuffer; - - DataInputBuffer keyDataInputStream; - ChunkDecoder valueBufferInputStream; - DataInputStream valueDataInputStream; - // vlen == -1 if unknown. - int vlen; - - /** - * Constructor - * - * @param reader - * The TFile reader object. - * @param offBegin - * Begin byte-offset of the scan. - * @param offEnd - * End byte-offset of the scan. - * @throws IOException - * - * The offsets will be rounded to the beginning of a compressed block whose offset is greater than or equal to the specified offset. - */ - protected Scanner(Reader reader, long offBegin, long offEnd) throws IOException { - this(reader, reader.getLocationNear(offBegin), reader.getLocationNear(offEnd)); - } - - /** - * Constructor - * - * @param reader - * The TFile reader object. - * @param begin - * Begin location of the scan. - * @param end - * End location of the scan. - * @throws IOException - */ - Scanner(Reader reader, Location begin, Location end) throws IOException { - this.reader = reader; - // ensure the TFile index is loaded throughout the life of scanner. - reader.checkTFileDataIndex(); - beginLocation = begin; - endLocation = end; - - valTransferBuffer = new BytesWritable(); - keyBuffer = new byte[MAX_KEY_SIZE]; - keyDataInputStream = new DataInputBuffer(); - valueBufferInputStream = new ChunkDecoder(); - valueDataInputStream = new DataInputStream(valueBufferInputStream); - - if (beginLocation.compareTo(endLocation) >= 0) { - currentLocation = new Location(endLocation); - } else { - currentLocation = new Location(0, 0); - initBlock(beginLocation.getBlockIndex()); - inBlockAdvance(beginLocation.getRecordIndex()); - } - } - - /** - * Constructor - * - * @param reader - * The TFile reader object. - * @param beginKey - * Begin key of the scan. If null, scan from the first entry of the TFile. - * @param endKey - * End key of the scan. If null, scan up to the last entry of the TFile. - * @throws IOException - */ - protected Scanner(Reader reader, RawComparable beginKey, RawComparable endKey) throws IOException { - this(reader, (beginKey == null) ? reader.begin() : reader.getBlockContainsKey(beginKey, false), reader.end()); - if (beginKey != null) { - inBlockAdvance(beginKey, false); - beginLocation.set(currentLocation); - } - if (endKey != null) { - seekTo(endKey, false); - endLocation.set(currentLocation); - seekTo(beginLocation); - } - } - - /** - * Move the cursor to the first entry whose key is greater than or equal to the input key. Synonymous to seekTo(key, 0, key.length). The entry returned by - * the previous entry() call will be invalid. - * - * @param key - * The input key - * @return true if we find an equal key. 
- * @throws IOException - */ - public boolean seekTo(byte[] key) throws IOException { - return seekTo(key, 0, key.length); - } - - /** - * Move the cursor to the first entry whose key is greater than or equal to the input key. The entry returned by the previous entry() call will be - * invalid. - * - * @param key - * The input key - * @param keyOffset - * offset in the key buffer. - * @param keyLen - * key buffer length. - * @return true if we find an equal key; false otherwise. - * @throws IOException - */ - public boolean seekTo(byte[] key, int keyOffset, int keyLen) throws IOException { - return seekTo(new ByteArray(key, keyOffset, keyLen), false); - } - - private boolean seekTo(RawComparable key, boolean beyond) throws IOException { - Location l = reader.getBlockContainsKey(key, beyond); - if (l.compareTo(beginLocation) < 0) { - l = beginLocation; - } else if (l.compareTo(endLocation) >= 0) { - seekTo(endLocation); - return false; - } - - // check if what we are seeking is in the later part of the current - // block. - if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex()) || (compareCursorKeyTo(key) >= 0)) { - // sorry, we must seek to a different location first. - seekTo(l); - } - - return inBlockAdvance(key, beyond); - } - - /** - * Move the cursor to the new location. The entry returned by the previous entry() call will be invalid. - * - * @param l - * new cursor location. It must fall between the begin and end location of the scanner. - * @throws IOException - */ - private void seekTo(Location l) throws IOException { - if (l.compareTo(beginLocation) < 0) { - throw new IllegalArgumentException("Attempt to seek before the begin location."); - } - - if (l.compareTo(endLocation) > 0) { - throw new IllegalArgumentException("Attempt to seek after the end location."); - } - - if (l.compareTo(endLocation) == 0) { - parkCursorAtEnd(); - return; - } - - if (l.getBlockIndex() != currentLocation.getBlockIndex()) { - // going to a totally different block - initBlock(l.getBlockIndex()); - } else { - if (valueChecked) { - // may temporarily go beyond the last record in the block (in which - // case the next if loop will always be true). - inBlockAdvance(1); - } - if (l.getRecordIndex() < currentLocation.getRecordIndex()) { - initBlock(l.getBlockIndex()); - } - } - - inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex()); - - return; - } - - /** - * Rewind to the first entry in the scanner. The entry returned by the previous entry() call will be invalid. - * - * @throws IOException - */ - public void rewind() throws IOException { - seekTo(beginLocation); - } - - /** - * Seek to the end of the scanner. The entry returned by the previous entry() call will be invalid. - * - * @throws IOException - */ - public void seekToEnd() throws IOException { - parkCursorAtEnd(); - } - - /** - * Move the cursor to the first entry whose key is greater than or equal to the input key. Synonymous to lowerBound(key, 0, key.length). The entry - * returned by the previous entry() call will be invalid. - * - * @param key - * The input key - * @throws IOException - */ - public void lowerBound(byte[] key) throws IOException { - lowerBound(key, 0, key.length); - } - - /** - * Move the cursor to the first entry whose key is greater than or equal to the input key. The entry returned by the previous entry() call will be - * invalid. - * - * @param key - * The input key - * @param keyOffset - * offset in the key buffer. - * @param keyLen - * key buffer length. 
- * @throws IOException - */ - public void lowerBound(byte[] key, int keyOffset, int keyLen) throws IOException { - seekTo(new ByteArray(key, keyOffset, keyLen), false); - } - - /** - * Move the cursor to the first entry whose key is strictly greater than the input key. Synonymous to upperBound(key, 0, key.length). The entry returned - * by the previous entry() call will be invalid. - * - * @param key - * The input key - * @throws IOException - */ - public void upperBound(byte[] key) throws IOException { - upperBound(key, 0, key.length); - } - - /** - * Move the cursor to the first entry whose key is strictly greater than the input key. The entry returned by the previous entry() call will be invalid. - * - * @param key - * The input key - * @param keyOffset - * offset in the key buffer. - * @param keyLen - * key buffer length. - * @throws IOException - */ - public void upperBound(byte[] key, int keyOffset, int keyLen) throws IOException { - seekTo(new ByteArray(key, keyOffset, keyLen), true); - } - - /** - * Move the cursor to the next key-value pair. The entry returned by the previous entry() call will be invalid. - * - * @return true if the cursor successfully moves. False when cursor is already at the end location and cannot be advanced. - * @throws IOException - */ - public boolean advance() throws IOException { - if (atEnd()) { - return false; - } - - int curBid = currentLocation.getBlockIndex(); - long curRid = currentLocation.getRecordIndex(); - long entriesInBlock = reader.getBlockEntryCount(curBid); - if (curRid + 1 >= entriesInBlock) { - if (endLocation.compareTo(curBid + 1, 0) <= 0) { - // last entry in TFile. - parkCursorAtEnd(); - } else { - // last entry in Block. - initBlock(curBid + 1); - } - } else { - inBlockAdvance(1); - } - return true; - } - - /** - * Load a compressed block for reading. Expecting blockIndex is valid. - * - * @throws IOException - */ - private void initBlock(int blockIndex) throws IOException { - klen = -1; - if (blkReader != null) { - try { - blkReader.close(); - } finally { - blkReader = null; - } - } - blkReader = reader.getBlockReader(blockIndex); - currentLocation.set(blockIndex, 0); - } - - private void parkCursorAtEnd() throws IOException { - klen = -1; - currentLocation.set(endLocation); - if (blkReader != null) { - try { - blkReader.close(); - } finally { - blkReader = null; - } - } - } - - /** - * Close the scanner. Release all resources. The behavior of using the scanner after calling close is not defined. The entry returned by the previous - * entry() call will be invalid. - */ - public void close() throws IOException { - parkCursorAtEnd(); - } - - /** - * Is cursor at the end location? - * - * @return true if the cursor is at the end location. - */ - public boolean atEnd() { - return (currentLocation.compareTo(endLocation) >= 0); - } - - /** - * check whether we have already successfully obtained the key. It also initializes the valueInputStream. - */ - void checkKey() throws IOException { - if (klen >= 0) - return; - if (atEnd()) { - throw new EOFException("No key-value to read"); - } - klen = -1; - vlen = -1; - valueChecked = false; - - klen = Utils.readVInt(blkReader); - blkReader.readFully(keyBuffer, 0, klen); - valueBufferInputStream.reset(blkReader); - if (valueBufferInputStream.isLastChunk()) { - vlen = valueBufferInputStream.getRemain(); - } - } - - /** - * Get an entry to access the key and value. - * - * @return The Entry object to access the key and value. 
- * @throws IOException - */ - public Entry entry() throws IOException { - checkKey(); - return new Entry(); - } - - /** - * Internal API. Comparing the key at cursor to user-specified key. - * - * @param other - * user-specified key. - * @return negative if key at cursor is smaller than user key; 0 if equal; and positive if key at cursor greater than user key. - * @throws IOException - */ - int compareCursorKeyTo(RawComparable other) throws IOException { - checkKey(); - return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other.offset(), other.size()); - } - - /** - * Entry to a <Key, Value> pair. - */ - public class Entry implements Comparable { - /** - * Get the length of the key. - * - * @return the length of the key. - */ - public int getKeyLength() { - return klen; - } - - byte[] getKeyBuffer() { - return keyBuffer; - } - - /** - * Copy the key and value in one shot into BytesWritables. This is equivalent to getKey(key); getValue(value); - * - * @param key - * BytesWritable to hold key. - * @param value - * BytesWritable to hold value - * @throws IOException - */ - public void get(BytesWritable key, BytesWritable value) throws IOException { - getKey(key); - getValue(value); - } - - /** - * Copy the key into BytesWritable. The input BytesWritable will be automatically resized to the actual key size. - * - * @param key - * BytesWritable to hold the key. - * @throws IOException - */ - public int getKey(BytesWritable key) throws IOException { - key.setSize(getKeyLength()); - getKey(key.getBytes()); - return key.getLength(); - } - - /** - * Copy the value into BytesWritable. The input BytesWritable will be automatically resized to the actual value size. The implementation directly uses - * the buffer inside BytesWritable for storing the value. The call does not require the value length to be known. - * - * @param value - * @throws IOException - */ - public long getValue(BytesWritable value) throws IOException { - DataInputStream dis = getValueStream(); - int size = 0; - try { - int remain; - while ((remain = valueBufferInputStream.getRemain()) > 0) { - value.setSize(size + remain); - dis.readFully(value.getBytes(), size, remain); - size += remain; - } - return value.getLength(); - } finally { - dis.close(); - } - } - - /** - * Writing the key to the output stream. This method avoids copying key buffer from Scanner into user buffer, then writing to the output stream. - * - * @param out - * The output stream - * @return the length of the key. - * @throws IOException - */ - public int writeKey(OutputStream out) throws IOException { - out.write(keyBuffer, 0, klen); - return klen; - } - - /** - * Writing the value to the output stream. This method avoids copying value data from Scanner into user buffer, then writing to the output stream. It - * does not require the value length to be known. - * - * @param out - * The output stream - * @return the length of the value - * @throws IOException - */ - public long writeValue(OutputStream out) throws IOException { - DataInputStream dis = getValueStream(); - long size = 0; - try { - int chunkSize; - while ((chunkSize = valueBufferInputStream.getRemain()) > 0) { - chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE); - valTransferBuffer.setSize(chunkSize); - dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize); - out.write(valTransferBuffer.getBytes(), 0, chunkSize); - size += chunkSize; - } - return size; - } finally { - dis.close(); - } - } - - /** - * Copy the key into user supplied buffer. 
- * - * @param buf - * The buffer supplied by user. The length of the buffer must not be shorter than the key length. - * @return The length of the key. - * - * @throws IOException - */ - public int getKey(byte[] buf) throws IOException { - return getKey(buf, 0); - } - - /** - * Copy the key into user supplied buffer. - * - * @param buf - * The buffer supplied by user. - * @param offset - * The starting offset of the user buffer where we should copy the key into. Requiring the key-length + offset no greater than the buffer - * length. - * @return The length of the key. - * @throws IOException - */ - public int getKey(byte[] buf, int offset) throws IOException { - if ((offset | (buf.length - offset - klen)) < 0) { - throw new IndexOutOfBoundsException("Bufer not enough to store the key"); - } - System.arraycopy(keyBuffer, 0, buf, offset, klen); - return klen; - } - - /** - * Streaming access to the key. Useful for desrializing the key into user objects. - * - * @return The input stream. - */ - public DataInputStream getKeyStream() { - keyDataInputStream.reset(keyBuffer, klen); - return keyDataInputStream; - } - - /** - * Get the length of the value. isValueLengthKnown() must be tested true. - * - * @return the length of the value. - */ - public int getValueLength() { - if (vlen >= 0) { - return vlen; - } - - throw new RuntimeException("Value length unknown."); - } - - /** - * Copy value into user-supplied buffer. User supplied buffer must be large enough to hold the whole value. The value part of the key-value pair pointed - * by the current cursor is not cached and can only be examined once. Calling any of the following functions more than once without moving the cursor - * will result in exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, {@link #getValueStream}. - * - * @return the length of the value. Does not require isValueLengthKnown() to be true. - * @throws IOException - * - */ - public int getValue(byte[] buf) throws IOException { - return getValue(buf, 0); - } - - /** - * Copy value into user-supplied buffer. User supplied buffer must be large enough to hold the whole value (starting from the offset). The value part of - * the key-value pair pointed by the current cursor is not cached and can only be examined once. Calling any of the following functions more than once - * without moving the cursor will result in exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, {@link #getValueStream}. - * - * @return the length of the value. Does not require isValueLengthKnown() to be true. - * @throws IOException - */ - public int getValue(byte[] buf, int offset) throws IOException { - DataInputStream dis = getValueStream(); - try { - if (isValueLengthKnown()) { - if ((offset | (buf.length - offset - vlen)) < 0) { - throw new IndexOutOfBoundsException("Buffer too small to hold value"); - } - dis.readFully(buf, offset, vlen); - return vlen; - } - - int nextOffset = offset; - while (nextOffset < buf.length) { - int n = dis.read(buf, nextOffset, buf.length - nextOffset); - if (n < 0) { - break; - } - nextOffset += n; - } - if (dis.read() >= 0) { - // attempt to read one more byte to determine whether we reached - // the - // end or not. - throw new IndexOutOfBoundsException("Buffer too small to hold value"); - } - return nextOffset - offset; - } finally { - dis.close(); - } - } - - /** - * Stream access to value. The value part of the key-value pair pointed by the current cursor is not cached and can only be examined once. 
Calling any - * of the following functions more than once without moving the cursor will result in exception: {@link #getValue(byte[])}, - * {@link #getValue(byte[], int)}, {@link #getValueStream}. - * - * @return The input stream for reading the value. - * @throws IOException - */ - public DataInputStream getValueStream() throws IOException { - if (valueChecked == true) { - throw new IllegalStateException("Attempt to examine value multiple times."); - } - valueChecked = true; - return valueDataInputStream; - } - - /** - * Check whether it is safe to call getValueLength(). - * - * @return true if value length is known before hand. Values less than the chunk size will always have their lengths known before hand. Values that are - * written out as a whole (with advertised length up-front) will always have their lengths known in read. - */ - public boolean isValueLengthKnown() { - return (vlen >= 0); - } - - /** - * Compare the entry key to another key. Synonymous to compareTo(key, 0, key.length). - * - * @param buf - * The key buffer. - * @return comparison result between the entry key with the input key. - */ - public int compareTo(byte[] buf) { - return compareTo(buf, 0, buf.length); - } - - /** - * Compare the entry key to another key. Synonymous to compareTo(new ByteArray(buf, offset, length) - * - * @param buf - * The key buffer - * @param offset - * offset into the key buffer. - * @param length - * the length of the key. - * @return comparison result between the entry key with the input key. - */ - public int compareTo(byte[] buf, int offset, int length) { - return compareTo(new ByteArray(buf, offset, length)); - } - - /** - * Compare an entry with a RawComparable object. This is useful when Entries are stored in a collection, and we want to compare a user supplied key. - */ - @Override - public int compareTo(RawComparable key) { - return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(), key.offset(), key.size()); - } - - /** - * Compare whether this and other points to the same key value. - */ - @Override - public boolean equals(Object other) { - if (this == other) - return true; - if (!(other instanceof Entry)) - return false; - return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0; - } - - @Override - public int hashCode() { - return WritableComparator.hashBytes(keyBuffer, getKeyLength()); - } - } - - /** - * Advance cursor by n positions within the block. - * - * @param n - * Number of key-value pairs to skip in block. - * @throws IOException - */ - private void inBlockAdvance(long n) throws IOException { - for (long i = 0; i < n; ++i) { - checkKey(); - if (!valueBufferInputStream.isClosed()) { - valueBufferInputStream.close(); - } - klen = -1; - currentLocation.incRecordIndex(); - } - } - - /** - * Advance cursor in block until we find a key that is greater than or equal to the input key. - * - * @param key - * Key to compare. - * @param greater - * advance until we find a key greater than the input key. - * @return true if we find a equal key. 
- * @throws IOException - */ - private boolean inBlockAdvance(RawComparable key, boolean greater) throws IOException { - int curBid = currentLocation.getBlockIndex(); - long entryInBlock = reader.getBlockEntryCount(curBid); - if (curBid == endLocation.getBlockIndex()) { - entryInBlock = endLocation.getRecordIndex(); - } - - while (currentLocation.getRecordIndex() < entryInBlock) { - int cmp = compareCursorKeyTo(key); - if (cmp > 0) - return false; - if (cmp == 0 && !greater) - return true; - if (!valueBufferInputStream.isClosed()) { - valueBufferInputStream.close(); - } - klen = -1; - currentLocation.incRecordIndex(); - } - - throw new RuntimeException("Cannot find matching key in block."); - } - } - - long getBlockEntryCount(int curBid) { - return tfileIndex.getEntry(curBid).entries(); - } - - BlockReader getBlockReader(int blockIndex) throws IOException { - return readerBCF.getDataBlock(blockIndex); - } - } - - /** - * Data structure representing "TFile.meta" meta block. - */ - static final class TFileMeta { - final static String BLOCK_NAME = "TFile.meta"; - final Version version; - private long recordCount; - private final String strComparator; - private final BytesComparator comparator; - - // ctor for writes - public TFileMeta(String comparator) { - // set fileVersion to API version when we create it. - version = TFile.API_VERSION; - recordCount = 0; - strComparator = (comparator == null) ? "" : comparator; - this.comparator = makeComparator(strComparator); - } - - // ctor for reads - public TFileMeta(DataInput in) throws IOException { - version = new Version(in); - if (!version.compatibleWith(TFile.API_VERSION)) { - throw new RuntimeException("Incompatible TFile fileVersion."); - } - recordCount = Utils.readVLong(in); - strComparator = Utils.readString(in); - comparator = makeComparator(strComparator); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - static BytesComparator makeComparator(String comparator) { - if (comparator.length() == 0) { - // unsorted keys - return null; - } - if (comparator.equals(COMPARATOR_MEMCMP)) { - // default comparator - return new BytesComparator(new MemcmpRawComparator()); - } else if (comparator.startsWith(COMPARATOR_JCLASS)) { - String compClassName = comparator.substring(COMPARATOR_JCLASS.length()).trim(); - try { - Class compClass = Class.forName(compClassName); - // use its default ctor to create an instance - return new BytesComparator((RawComparator) compClass.newInstance()); - } catch (Exception e) { - throw new IllegalArgumentException("Failed to instantiate comparator: " + comparator + "(" + e.toString() + ")"); - } - } else { - throw new IllegalArgumentException("Unsupported comparator: " + comparator); - } - } - - public void write(DataOutput out) throws IOException { - TFile.API_VERSION.write(out); - Utils.writeVLong(out, recordCount); - Utils.writeString(out, strComparator); - } - - public long getRecordCount() { - return recordCount; - } - - public void incRecordCount() { - ++recordCount; - } - - public boolean isSorted() { - return !strComparator.equals(""); - } - - public String getComparatorString() { - return strComparator; - } - - public BytesComparator getComparator() { - return comparator; - } - - public Version getVersion() { - return version; - } - } // END: class MetaTFileMeta - - /** - * Data structure representing "TFile.index" meta block. 
- */ - static class TFileIndex { - final static String BLOCK_NAME = "TFile.index"; - private ByteArray firstKey; - private final ArrayList index; - private final BytesComparator comparator; - - /** - * For reading from file. - * - * @throws IOException - */ - public TFileIndex(int entryCount, DataInput in, BytesComparator comparator) throws IOException { - index = new ArrayList(entryCount); - int size = Utils.readVInt(in); // size for the first key entry. - if (size > 0) { - byte[] buffer = new byte[size]; - in.readFully(buffer); - DataInputStream firstKeyInputStream = new DataInputStream(new ByteArrayInputStream(buffer, 0, size)); - - int firstKeyLength = Utils.readVInt(firstKeyInputStream); - firstKey = new ByteArray(new byte[firstKeyLength]); - firstKeyInputStream.readFully(firstKey.buffer()); - - for (int i = 0; i < entryCount; i++) { - size = Utils.readVInt(in); - if (buffer.length < size) { - buffer = new byte[size]; - } - in.readFully(buffer, 0, size); - TFileIndexEntry idx = new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(buffer, 0, size))); - index.add(idx); - } - } else { - if (entryCount != 0) { - throw new RuntimeException("Internal error"); - } - } - this.comparator = comparator; - } - - /** - * @param key - * input key. - * @return the ID of the first block that contains key >= input key. Or -1 if no such block exists. - */ - public int lowerBound(RawComparable key) { - if (comparator == null) { - throw new RuntimeException("Cannot search in unsorted TFile"); - } - - if (firstKey == null) { - return -1; // not found - } - - int ret = Utils.lowerBound(index, key, comparator); - if (ret == index.size()) { - return -1; - } - return ret; - } - - public int upperBound(RawComparable key) { - if (comparator == null) { - throw new RuntimeException("Cannot search in unsorted TFile"); - } - - if (firstKey == null) { - return -1; // not found - } - - int ret = Utils.upperBound(index, key, comparator); - if (ret == index.size()) { - return -1; - } - return ret; - } - - /** - * For writing to file. - */ - public TFileIndex(BytesComparator comparator) { - index = new ArrayList(); - this.comparator = comparator; - } - - public RawComparable getFirstKey() { - return firstKey; - } - - public void setFirstKey(byte[] key, int offset, int length) { - firstKey = new ByteArray(new byte[length]); - System.arraycopy(key, offset, firstKey.buffer(), 0, length); - } - - public RawComparable getLastKey() { - if (index.size() == 0) { - return null; - } - return new ByteArray(index.get(index.size() - 1).buffer()); - } - - public void addEntry(TFileIndexEntry keyEntry) { - index.add(keyEntry); - } - - public TFileIndexEntry getEntry(int bid) { - return index.get(bid); - } - - public void write(DataOutput out) throws IOException { - if (firstKey == null) { - Utils.writeVInt(out, 0); - return; - } - - DataOutputBuffer dob = new DataOutputBuffer(); - Utils.writeVInt(dob, firstKey.size()); - dob.write(firstKey.buffer()); - Utils.writeVInt(out, dob.size()); - out.write(dob.getData(), 0, dob.getLength()); - - for (TFileIndexEntry entry : index) { - dob.reset(); - entry.write(dob); - Utils.writeVInt(out, dob.getLength()); - out.write(dob.getData(), 0, dob.getLength()); - } - } - } - - /** - * TFile Data Index entry. We should try to make the memory footprint of each index entry as small as possible. - */ - static final class TFileIndexEntry implements RawComparable { - final byte[] key; - // count of entries in the block. 
- final long kvEntries; - - public TFileIndexEntry(DataInput in) throws IOException { - int len = Utils.readVInt(in); - key = new byte[len]; - in.readFully(key, 0, len); - kvEntries = Utils.readVLong(in); - } - - // default entry, without any padding - public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) { - key = new byte[len]; - System.arraycopy(newkey, offset, key, 0, len); - this.kvEntries = entries; - } - - @Override - public byte[] buffer() { - return key; - } - - @Override - public int offset() { - return 0; - } - - @Override - public int size() { - return key.length; - } - - long entries() { - return kvEntries; - } - - public void write(DataOutput out) throws IOException { - Utils.writeVInt(out, key.length); - out.write(key, 0, key.length); - Utils.writeVLong(out, kvEntries); - } - } - - /** - * Dumping the TFile information. - * - * @param args - * A list of TFile paths. - */ - public static void main(String[] args) { - System.out.printf("TFile Dumper (TFile %s, BCFile %s)%n", TFile.API_VERSION.toString(), BCFile.API_VERSION.toString()); - if (args.length == 0) { - System.out.println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]"); - System.exit(0); - } - Configuration conf = new Configuration(); - - for (String file : args) { - System.out.println("===" + file + "==="); - try { - TFileDumper.dumpInfo(file, System.out, conf); - } catch (IOException e) { - e.printStackTrace(System.err); - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef6ba24c/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java deleted file mode 100644 index d5b0a1b..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.accumulo.core.file.rfile.bcfile; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.accumulo.core.file.rfile.bcfile.BCFile.BlockRegion; -import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry; -import org.apache.accumulo.core.file.rfile.bcfile.TFile.TFileIndexEntry; -import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; - -/** - * Dumping the information of a TFile. - */ -class TFileDumper { - static final Log LOG = LogFactory.getLog(TFileDumper.class); - - private TFileDumper() { - // namespace object not constructable. - } - - private enum Align { - LEFT, CENTER, RIGHT, ZERO_PADDED; - static String format(String s, int width, Align align) { - if (s.length() >= width) - return s; - int room = width - s.length(); - Align alignAdjusted = align; - if (room == 1) { - alignAdjusted = LEFT; - } - if (alignAdjusted == LEFT) { - return s + String.format("%" + room + "s", ""); - } - if (alignAdjusted == RIGHT) { - return String.format("%" + room + "s", "") + s; - } - if (alignAdjusted == CENTER) { - int half = room / 2; - return String.format("%" + half + "s", "") + s + String.format("%" + (room - half) + "s", ""); - } - throw new IllegalArgumentException("Unsupported alignment"); - } - - static String format(long l, int width, Align align) { - if (align == ZERO_PADDED) { - return String.format("%0" + width + "d", l); - } - return format(Long.toString(l), width, align); - } - - static int calculateWidth(String caption, long max) { - return Math.max(caption.length(), Long.toString(max).length()); - } - } - - /** - * Dump information about TFile. - * - * @param file - * Path string of the TFile - * @param out - * PrintStream to output the information. - * @param conf - * The configuration object. 
- * @throws IOException
- */
- static public void dumpInfo(String file, PrintStream out, Configuration conf) throws IOException {
- final int maxKeySampleLen = 16;
- Path path = new Path(file);
- FileSystem fs = path.getFileSystem(conf);
- long length = fs.getFileStatus(path).getLen();
- FSDataInputStream fsdis = fs.open(path);
- TFile.Reader reader = new TFile.Reader(fsdis, length, conf);
- try {
- LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
- int blockCnt = reader.readerBCF.getBlockCount();
- int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
- properties.put("BCFile Version", reader.readerBCF.version.toString());
- properties.put("TFile Version", reader.tfileMeta.version.toString());
- properties.put("File Length", Long.toString(length));
- properties.put("Data Compression", reader.readerBCF.getDefaultCompressionName());
- properties.put("Record Count", Long.toString(reader.getEntryCount()));
- properties.put("Sorted", Boolean.toString(reader.isSorted()));
- if (reader.isSorted()) {
- properties.put("Comparator", reader.getComparatorName());
- }
- properties.put("Data Block Count", Integer.toString(blockCnt));
- long dataSize = 0, dataSizeUncompressed = 0;
- if (blockCnt > 0) {
- for (int i = 0; i < blockCnt; ++i) {
- BlockRegion region = reader.readerBCF.dataIndex.getBlockRegionList().get(i);
- dataSize += region.getCompressedSize();
- dataSizeUncompressed += region.getRawSize();
- }
- properties.put("Data Block Bytes", Long.toString(dataSize));
- if (!reader.readerBCF.getDefaultCompressionName().equals("none")) {
- properties.put("Data Block Uncompressed Bytes", Long.toString(dataSizeUncompressed));
- properties.put("Data Block Compression Ratio", String.format("1:%.1f", (double) dataSizeUncompressed / dataSize));
- }
- }
-
- properties.put("Meta Block Count", Integer.toString(metaBlkCnt));
- long metaSize = 0, metaSizeUncompressed = 0;
- if (metaBlkCnt > 0) {
- Collection<MetaIndexEntry> metaBlks = reader.readerBCF.metaIndex.index.values();
- boolean calculateCompression = false;
- for (Iterator<MetaIndexEntry> it = metaBlks.iterator(); it.hasNext();) {
- MetaIndexEntry e = it.next();
- metaSize += e.getRegion().getCompressedSize();
- metaSizeUncompressed += e.getRegion().getRawSize();
- if (e.getCompressionAlgorithm() != Compression.Algorithm.NONE) {
- calculateCompression = true;
- }
- }
- properties.put("Meta Block Bytes", Long.toString(metaSize));
- if (calculateCompression) {
- properties.put("Meta Block Uncompressed Bytes", Long.toString(metaSizeUncompressed));
- properties.put("Meta Block Compression Ratio", String.format("1:%.1f", (double) metaSizeUncompressed / metaSize));
- }
- }
- properties.put("Meta-Data Size Ratio", String.format("1:%.1f", (double) dataSize / metaSize));
- long leftOverBytes = length - dataSize - metaSize;
- long miscSize = BCFile.Magic.size() * 2 + Long.SIZE / Byte.SIZE + Version.size();
- long metaIndexSize = leftOverBytes - miscSize;
- properties.put("Meta Block Index Bytes", Long.toString(metaIndexSize));
- properties.put("Headers Etc Bytes", Long.toString(miscSize));
- // Now output the properties table.
- int maxKeyLength = 0;
- Set<Map.Entry<String, String>> entrySet = properties.entrySet();
- for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it.hasNext();) {
- Map.Entry<String, String> e = it.next();
- if (e.getKey().length() > maxKeyLength) {
- maxKeyLength = e.getKey().length();
- }
- }
- for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it.hasNext();) {
- Map.Entry<String, String> e = it.next();
- out.printf("%s : %s%n", Align.format(e.getKey(), maxKeyLength, Align.LEFT), e.getValue());
- }
- out.println();
- reader.checkTFileDataIndex();
- if (blockCnt > 0) {
- String blkID = "Data-Block";
- int blkIDWidth = Align.calculateWidth(blkID, blockCnt);
- int blkIDWidth2 = Align.calculateWidth("", blockCnt);
- String offset = "Offset";
- int offsetWidth = Align.calculateWidth(offset, length);
- String blkLen = "Length";
- int blkLenWidth = Align.calculateWidth(blkLen, dataSize / blockCnt * 10);
- String rawSize = "Raw-Size";
- int rawSizeWidth = Align.calculateWidth(rawSize, dataSizeUncompressed / blockCnt * 10);
- String records = "Records";
- int recordsWidth = Align.calculateWidth(records, reader.getEntryCount() / blockCnt * 10);
- String endKey = "End-Key";
- int endKeyWidth = Math.max(endKey.length(), maxKeySampleLen * 2 + 5);
-
- out.printf("%s %s %s %s %s %s%n", Align.format(blkID, blkIDWidth, Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
- Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(rawSize, rawSizeWidth, Align.CENTER),
- Align.format(records, recordsWidth, Align.CENTER), Align.format(endKey, endKeyWidth, Align.LEFT));
-
- for (int i = 0; i < blockCnt; ++i) {
- BlockRegion region = reader.readerBCF.dataIndex.getBlockRegionList().get(i);
- TFileIndexEntry indexEntry = reader.tfileIndex.getEntry(i);
- out.printf("%s %s %s %s %s ", Align.format(Align.format(i, blkIDWidth2, Align.ZERO_PADDED), blkIDWidth, Align.LEFT),
- Align.format(region.getOffset(), offsetWidth, Align.LEFT), Align.format(region.getCompressedSize(), blkLenWidth, Align.LEFT),
- Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT), Align.format(indexEntry.kvEntries, recordsWidth, Align.LEFT));
- byte[] key = indexEntry.key;
- boolean asAscii = true;
- int sampleLen = Math.min(maxKeySampleLen, key.length);
- for (int j = 0; j < sampleLen; ++j) {
- byte b = key[j];
- if ((b < 32 && b != 9) || (b == 127)) {
- asAscii = false;
- }
- }
- if (!asAscii) {
- out.print("0X");
- for (int j = 0; j < sampleLen; ++j) {
- byte b = key[j];
- out.printf("%X", b);
- }
- } else {
- out.print(new String(key, 0, sampleLen));
- }
- if (sampleLen < key.length) {
- out.print("...");
- }
- out.println();
- }
- }
-
- out.println();
- if (metaBlkCnt > 0) {
- String name = "Meta-Block";
- int maxNameLen = 0;
- Set<Map.Entry<String, MetaIndexEntry>> metaBlkEntrySet = reader.readerBCF.metaIndex.index.entrySet();
- for (Iterator<Map.Entry<String, MetaIndexEntry>> it = metaBlkEntrySet.iterator(); it.hasNext();) {
- Map.Entry<String, MetaIndexEntry> e = it.next();
- if (e.getKey().length() > maxNameLen) {
- maxNameLen = e.getKey().length();
- }
- }
- int nameWidth = Math.max(name.length(), maxNameLen);
- String offset = "Offset";
- int offsetWidth = Align.calculateWidth(offset, length);
- String blkLen = "Length";
- int blkLenWidth = Align.calculateWidth(blkLen, metaSize / metaBlkCnt * 10);
- String rawSize = "Raw-Size";
- int rawSizeWidth = Align.calculateWidth(rawSize, metaSizeUncompressed / metaBlkCnt * 10);
- String compression = "Compression";
- int compressionWidth = compression.length();
- out.printf("%s %s %s %s %s%n", Align.format(name, nameWidth, Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
- Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(rawSize, rawSizeWidth, Align.CENTER),
- Align.format(compression, compressionWidth, Align.LEFT));
-
- for (Iterator<Map.Entry<String, MetaIndexEntry>> it = metaBlkEntrySet.iterator(); it.hasNext();) {
- Map.Entry<String, MetaIndexEntry> e = it.next();
- String blkName = e.getValue().getMetaName();
- BlockRegion region = e.getValue().getRegion();
- String blkCompression = e.getValue().getCompressionAlgorithm().getName();
- out.printf("%s %s %s %s %s%n", Align.format(blkName, nameWidth, Align.LEFT), Align.format(region.getOffset(), offsetWidth, Align.LEFT),
- Align.format(region.getCompressedSize(), blkLenWidth, Align.LEFT), Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT),
- Align.format(blkCompression, compressionWidth, Align.LEFT));
- }
- }
- } finally {
- IOUtils.cleanup(LOG, reader, fsdis);
- }
- }
-}
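
The removed TFileIndex.lowerBound() locates a key by binary-searching the per-block index entries with Utils.lowerBound and a BytesComparator. The stand-alone sketch below shows only that block-lookup idea, assuming each data block is summarized by its last key and compared memcmp-style; the class name, helper names, and sample keys are invented for the example and are not part of the removed API.

import java.util.Arrays;
import java.util.List;

public class IndexLowerBoundSketch {

  // memcmp-style comparison of two byte arrays, as the default comparator does
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0)
        return d;
    }
    return a.length - b.length;
  }

  // index of the first block whose last key is >= key, or -1 if no block qualifies
  static int lowerBound(List<byte[]> blockLastKeys, byte[] key) {
    int lo = 0, hi = blockLastKeys.size();
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (compareBytes(blockLastKeys.get(mid), key) < 0)
        lo = mid + 1;
      else
        hi = mid;
    }
    return lo == blockLastKeys.size() ? -1 : lo;
  }

  public static void main(String[] args) {
    List<byte[]> lastKeys = Arrays.asList("apple".getBytes(), "mango".getBytes(), "zebra".getBytes());
    // "banana" sorts after block 0's last key, so only block 1 can contain it
    System.out.println(lowerBound(lastKeys, "banana".getBytes())); // prints 1
  }
}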
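TFileMeta.makeComparator() resolves a comparator from a string: an empty string means unsorted data, the built-in memcmp comparator is named by COMPARATOR_MEMCMP, and any other comparator is loaded reflectively from a class name carrying the COMPARATOR_JCLASS prefix. The sketch below illustrates that naming convention; the literal strings "memcmp" and "jclass:" and the Comparator<byte[]> shape are assumptions made to keep the example self-contained, not the removed code's exact types.

import java.util.Comparator;

public class ComparatorFactorySketch {

  @SuppressWarnings("unchecked")
  static Comparator<byte[]> makeComparator(String name) {
    if (name.isEmpty()) {
      return null; // unsorted data: no comparator at all
    }
    if (name.equals("memcmp")) {
      // built-in byte-wise comparator
      return new Comparator<byte[]>() {
        @Override
        public int compare(byte[] a, byte[] b) {
          int n = Math.min(a.length, b.length);
          for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0)
              return d;
          }
          return a.length - b.length;
        }
      };
    }
    if (name.startsWith("jclass:")) {
      // user-supplied comparator, instantiated through its no-arg constructor
      String className = name.substring("jclass:".length()).trim();
      try {
        return (Comparator<byte[]>) Class.forName(className).newInstance();
      } catch (Exception e) {
        throw new IllegalArgumentException("Failed to instantiate comparator: " + name, e);
      }
    }
    throw new IllegalArgumentException("Unsupported comparator: " + name);
  }

  public static void main(String[] args) {
    Comparator<byte[]> cmp = makeComparator("memcmp");
    System.out.println(cmp.compare("a".getBytes(), "b".getBytes()) < 0); // true
  }
}

Storing the comparator name in the file's meta block is what lets a reader reconstruct the same ordering later without any out-of-band configuration.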
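TFileIndexEntry.write() persists each index entry as a length-prefixed key followed by the block's record count. The round trip below mirrors that shape only; it substitutes fixed-width writeInt/writeLong for the removed Utils.writeVInt/writeVLong variable-length codecs purely to stay self-contained, and the class name and sample key are invented.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexEntrySketch {

  static byte[] writeEntry(byte[] key, long entries) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(key.length);   // the removed code used Utils.writeVInt here
    out.write(key, 0, key.length);
    out.writeLong(entries);     // the removed code used Utils.writeVLong here
    out.flush();
    return bos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] blob = writeEntry("row_0042".getBytes(), 1000L);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob));
    byte[] key = new byte[in.readInt()];
    in.readFully(key);
    long entries = in.readLong();
    System.out.println(new String(key) + " -> " + entries + " records");
  }
}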
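TFileDumper builds its block and meta-block tables with Align.format(), padding each cell to a fixed column width with left, right, or centered alignment. A minimal stand-alone version of that padding follows; the pad() and spaces() helpers and the sample widths are made up for illustration.

import java.util.Arrays;

public class ColumnFormatSketch {

  enum Align { LEFT, CENTER, RIGHT }

  static String spaces(int n) {
    char[] c = new char[n];
    Arrays.fill(c, ' ');
    return new String(c);
  }

  static String pad(String s, int width, Align align) {
    if (s.length() >= width)
      return s; // value already fills the column
    int room = width - s.length();
    if (align == Align.LEFT)
      return s + spaces(room);
    if (align == Align.RIGHT)
      return spaces(room) + s;
    int half = room / 2; // CENTER: split the padding around the value
    return spaces(half) + s + spaces(room - half);
  }

  public static void main(String[] args) {
    System.out.println(pad("Offset", 12, Align.CENTER) + " " + pad("Length", 10, Align.CENTER));
    System.out.println(pad("0", 12, Align.LEFT) + " " + pad("524288", 10, Align.LEFT));
  }
}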