From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm
Date: Sat, 03 Jun 2017 14:59:43 -0000
Message-Id: <7aab1475a7054cbfa48eaca3f49e3aa5@git.apache.org>
In-Reply-To: <163afbcdb2f442ceb997e707415f2393@git.apache.org>
References: <163afbcdb2f442ceb997e707415f2393@git.apache.org>
Subject: [15/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.
archived-at: Sat, 03 Jun 2017 14:59:37 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/c9d35424/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.State.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.State.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.State.html index 6414009..2ad3a12 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.State.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.State.html @@ -25,42 +25,42 @@ 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 -020import com.google.common.annotations.VisibleForTesting; -021import com.google.common.base.Preconditions; -022 -023import java.io.DataInputStream; -024import java.io.DataOutput; -025import java.io.DataOutputStream; -026import java.io.IOException; -027import java.io.InputStream; -028import java.nio.ByteBuffer; -029import java.util.concurrent.atomic.AtomicReference; -030 -031import org.apache.commons.logging.Log; -032import org.apache.commons.logging.LogFactory; -033import org.apache.hadoop.fs.FSDataInputStream; -034import org.apache.hadoop.fs.FSDataOutputStream; -035import org.apache.hadoop.fs.Path; -036import org.apache.hadoop.hbase.Cell; -037import org.apache.hadoop.hbase.HConstants; -038import org.apache.hadoop.hbase.classification.InterfaceAudience; -039import org.apache.hadoop.hbase.fs.HFileSystem; -040import org.apache.hadoop.hbase.io.ByteArrayOutputStream; -041import org.apache.hadoop.hbase.io.ByteBuffInputStream; -042import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; -043import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -044import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -045import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; -046import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; -047import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; -048import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -049import org.apache.hadoop.hbase.nio.ByteBuff; -050import org.apache.hadoop.hbase.nio.MultiByteBuff; -051import org.apache.hadoop.hbase.nio.SingleByteBuff; -052import org.apache.hadoop.hbase.util.Bytes; -053import org.apache.hadoop.hbase.util.ChecksumType; -054import org.apache.hadoop.hbase.util.ClassSize; -055import org.apache.hadoop.io.IOUtils; +020import java.io.DataInputStream; +021import java.io.DataOutput; +022import java.io.DataOutputStream; +023import java.io.IOException; +024import java.io.InputStream; +025import java.nio.ByteBuffer; +026import java.util.concurrent.atomic.AtomicReference; +027 +028import org.apache.commons.logging.Log; +029import org.apache.commons.logging.LogFactory; +030import org.apache.hadoop.fs.FSDataInputStream; +031import org.apache.hadoop.fs.FSDataOutputStream; +032import org.apache.hadoop.fs.Path; +033import org.apache.hadoop.hbase.Cell; +034import org.apache.hadoop.hbase.HConstants; +035import org.apache.hadoop.hbase.classification.InterfaceAudience; +036import org.apache.hadoop.hbase.fs.HFileSystem; +037import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +038import org.apache.hadoop.hbase.io.ByteBuffInputStream; +039import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream; +040import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +041import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 
+042import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +043import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +044import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +045import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +046import org.apache.hadoop.hbase.nio.ByteBuff; +047import org.apache.hadoop.hbase.nio.MultiByteBuff; +048import org.apache.hadoop.hbase.nio.SingleByteBuff; +049import org.apache.hadoop.hbase.util.Bytes; +050import org.apache.hadoop.hbase.util.ChecksumType; +051import org.apache.hadoop.hbase.util.ClassSize; +052import org.apache.hadoop.io.IOUtils; +053 +054import com.google.common.annotations.VisibleForTesting; +055import com.google.common.base.Preconditions; 056 057/** 058 * Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches. @@ -443,1645 +443,1656 @@ 435 return nextBlockOnDiskSize; 436 } 437 -438 public BlockType getBlockType() { -439 return blockType; -440 } -441 -442 /** @return get data block encoding id that was used to encode this block */ -443 public short getDataBlockEncodingId() { -444 if (blockType != BlockType.ENCODED_DATA) { -445 throw new IllegalArgumentException("Querying encoder ID of a block " + -446 "of type other than " + BlockType.ENCODED_DATA + ": " + blockType); -447 } -448 return buf.getShort(headerSize()); -449 } -450 -451 /** -452 * @return the on-disk size of header + data part + checksum. -453 */ -454 public int getOnDiskSizeWithHeader() { -455 return onDiskSizeWithoutHeader + headerSize(); -456 } -457 -458 /** -459 * @return the on-disk size of the data part + checksum (header excluded). -460 */ -461 int getOnDiskSizeWithoutHeader() { -462 return onDiskSizeWithoutHeader; -463 } -464 -465 /** -466 * @return the uncompressed size of data part (header and checksum excluded). -467 */ -468 int getUncompressedSizeWithoutHeader() { -469 return uncompressedSizeWithoutHeader; -470 } -471 -472 /** -473 * @return the offset of the previous block of the same type in the file, or -474 * -1 if unknown -475 */ -476 long getPrevBlockOffset() { -477 return prevBlockOffset; -478 } -479 -480 /** -481 * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position -482 * is modified as side-effect. -483 */ -484 private void overwriteHeader() { -485 buf.rewind(); -486 blockType.write(buf); -487 buf.putInt(onDiskSizeWithoutHeader); -488 buf.putInt(uncompressedSizeWithoutHeader); -489 buf.putLong(prevBlockOffset); -490 if (this.fileContext.isUseHBaseChecksum()) { -491 buf.put(fileContext.getChecksumType().getCode()); -492 buf.putInt(fileContext.getBytesPerChecksum()); -493 buf.putInt(onDiskDataSizeWithHeader); -494 } -495 } -496 -497 /** -498 * Returns a buffer that does not include the header or checksum. -499 * -500 * @return the buffer with header skipped and checksum omitted. -501 */ -502 public ByteBuff getBufferWithoutHeader() { -503 ByteBuff dup = getBufferReadOnly(); -504 // Now set it up so Buffer spans content only -- no header or no checksums. -505 return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice(); -506 } -507 -508 /** -509 * Returns a read-only duplicate of the buffer this block stores internally ready to be read. -510 * Clients must not modify the buffer object though they may set position and limit on the -511 * returned buffer since we pass back a duplicate. 
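As a reading aid for overwriteHeader() above (and Writer#putHeader() later in this listing), here is a minimal sketch of the version 2 block header layout those methods serialize when HBase checksums are enabled; the method and parameter names are illustrative only, and the 24/33-byte totals correspond to HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM and HFILEBLOCK_HEADER_SIZE used elsewhere in this file.

// Illustrative sketch, not part of the diff above; field order mirrors overwriteHeader().
static void sketchWriteV2Header(java.nio.ByteBuffer buf, byte[] blockTypeMagic,
    int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, long prevBlockOffset,
    byte checksumTypeCode, int bytesPerChecksum, int onDiskDataSizeWithHeader) {
  buf.put(blockTypeMagic);                   // offset  0: 8-byte block type magic, e.g. "DATABLK*"
  buf.putInt(onDiskSizeWithoutHeader);       // offset  8: data + checksums, header excluded
  buf.putInt(uncompressedSizeWithoutHeader); // offset 12: uncompressed data only, header excluded
  buf.putLong(prevBlockOffset);              // offset 16: previous block of the same type, or -1
  buf.put(checksumTypeCode);                 // offset 24: ChecksumType code
  buf.putInt(bytesPerChecksum);              // offset 25: checksum chunk size
  buf.putInt(onDiskDataSizeWithHeader);      // offset 29: header + data, checksums excluded
}                                            // header ends at offset 33; 24 if the checksum fields are absent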
This method has to be public because it is used -512 * in {@link CompoundBloomFilter} to avoid object creation on every Bloom -513 * filter lookup, but has to be used with caution. Buffer holds header, block content, -514 * and any follow-on checksums if present. -515 * -516 * @return the buffer of this block for read-only operations -517 */ -518 public ByteBuff getBufferReadOnly() { -519 // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. -520 ByteBuff dup = this.buf.duplicate(); -521 assert dup.position() == 0; -522 return dup; -523 } -524 -525 private void sanityCheckAssertion(long valueFromBuf, long valueFromField, -526 String fieldName) throws IOException { -527 if (valueFromBuf != valueFromField) { -528 throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf -529 + ") is different from that in the field (" + valueFromField + ")"); -530 } -531 } -532 -533 private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) -534 throws IOException { -535 if (valueFromBuf != valueFromField) { -536 throw new IOException("Block type stored in the buffer: " + -537 valueFromBuf + ", block type field: " + valueFromField); -538 } -539 } -540 -541 /** -542 * Checks if the block is internally consistent, i.e. the first -543 * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a -544 * valid header consistent with the fields. Assumes a packed block structure. -545 * This function is primary for testing and debugging, and is not -546 * thread-safe, because it alters the internal buffer pointer. -547 * Used by tests only. -548 */ -549 @VisibleForTesting -550 void sanityCheck() throws IOException { -551 // Duplicate so no side-effects -552 ByteBuff dup = this.buf.duplicate().rewind(); -553 sanityCheckAssertion(BlockType.read(dup), blockType); -554 -555 sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); -556 -557 sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, -558 "uncompressedSizeWithoutHeader"); -559 -560 sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); -561 if (this.fileContext.isUseHBaseChecksum()) { -562 sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); -563 sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), -564 "bytesPerChecksum"); -565 sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); -566 } -567 -568 int cksumBytes = totalChecksumBytes(); -569 int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; -570 if (dup.limit() != expectedBufLimit) { -571 throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit()); -572 } -573 -574 // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next -575 // block's header, so there are two sensible values for buffer capacity. 
-576 int hdrSize = headerSize(); -577 if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) { -578 throw new AssertionError("Invalid buffer capacity: " + dup.capacity() + -579 ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); -580 } -581 } -582 -583 @Override -584 public String toString() { -585 StringBuilder sb = new StringBuilder() -586 .append("[") -587 .append("blockType=").append(blockType) -588 .append(", fileOffset=").append(offset) -589 .append(", headerSize=").append(headerSize()) -590 .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) -591 .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) -592 .append(", prevBlockOffset=").append(prevBlockOffset) -593 .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum()); -594 if (fileContext.isUseHBaseChecksum()) { -595 sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) -596 .append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1)) -597 .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); -598 } else { -599 sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) -600 .append("(").append(onDiskSizeWithoutHeader) -601 .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); -602 } -603 String dataBegin = null; -604 if (buf.hasArray()) { -605 dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), -606 Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())); -607 } else { -608 ByteBuff bufWithoutHeader = getBufferWithoutHeader(); -609 byte[] dataBeginBytes = new byte[Math.min(32, -610 bufWithoutHeader.limit() - bufWithoutHeader.position())]; -611 bufWithoutHeader.get(dataBeginBytes); -612 dataBegin = Bytes.toStringBinary(dataBeginBytes); -613 } -614 sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) -615 .append(", totalChecksumBytes=").append(totalChecksumBytes()) -616 .append(", isUnpacked=").append(isUnpacked()) -617 .append(", buf=[").append(buf).append("]") -618 .append(", dataBeginsWith=").append(dataBegin) -619 .append(", fileContext=").append(fileContext) -620 .append("]"); -621 return sb.toString(); -622 } -623 -624 /** -625 * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its -626 * encoded structure. Internal structures are shared between instances where applicable. -627 */ -628 HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { -629 if (!fileContext.isCompressedOrEncrypted()) { -630 // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), -631 // which is used for block serialization to L2 cache, does not preserve encoding and -632 // encryption details. -633 return this; -634 } -635 -636 HFileBlock unpacked = new HFileBlock(this); -637 unpacked.allocateBuffer(); // allocates space for the decompressed block -638 -639 HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? -640 reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); -641 -642 ByteBuff dup = this.buf.duplicate(); -643 dup.position(this.headerSize()); -644 dup = dup.slice(); -645 ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), -646 unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), -647 dup); -648 return unpacked; -649 } -650 -651 /** -652 * Always allocates a new buffer of the correct size. 
Copies header bytes -653 * from the existing buffer. Does not change header fields. -654 * Reserve room to keep checksum bytes too. -655 */ -656 private void allocateBuffer() { -657 int cksumBytes = totalChecksumBytes(); -658 int headerSize = headerSize(); -659 int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; -660 -661 // TODO we need consider allocating offheap here? -662 ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); -663 -664 // Copy header bytes into newBuf. -665 // newBuf is HBB so no issue in calling array() -666 buf.position(0); -667 buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize); -668 -669 buf = new SingleByteBuff(newBuf); -670 // set limit to exclude next block's header -671 buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); -672 } -673 -674 /** -675 * Return true when this block's buffer has been unpacked, false otherwise. Note this is a -676 * calculated heuristic, not tracked attribute of the block. -677 */ -678 public boolean isUnpacked() { -679 final int cksumBytes = totalChecksumBytes(); -680 final int headerSize = headerSize(); -681 final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; -682 final int bufCapacity = buf.capacity(); -683 return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; -684 } -685 -686 /** An additional sanity-check in case no compression or encryption is being used. */ -687 public void sanityCheckUncompressedSize() throws IOException { -688 if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { -689 throw new IOException("Using no compression but " -690 + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " -691 + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader -692 + ", numChecksumbytes=" + totalChecksumBytes()); -693 } -694 } -695 -696 /** -697 * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when -698 * block is returned to the cache. -699 * @return the offset of this block in the file it was read from -700 */ -701 long getOffset() { -702 if (offset < 0) { -703 throw new IllegalStateException("HFile block offset not initialized properly"); -704 } -705 return offset; -706 } -707 -708 /** -709 * @return a byte stream reading the data + checksum of this block -710 */ -711 DataInputStream getByteStream() { -712 ByteBuff dup = this.buf.duplicate(); -713 dup.position(this.headerSize()); -714 return new DataInputStream(new ByteBuffInputStream(dup)); -715 } -716 -717 @Override -718 public long heapSize() { -719 long size = ClassSize.align( -720 ClassSize.OBJECT + -721 // Block type, multi byte buffer, MemoryType and meta references -722 4 * ClassSize.REFERENCE + -723 // On-disk size, uncompressed size, and next block's on-disk size -724 // bytePerChecksum and onDiskDataSize -725 4 * Bytes.SIZEOF_INT + -726 // This and previous block offset -727 2 * Bytes.SIZEOF_LONG + -728 // Heap size of the meta object. meta will be always not null. -729 fileContext.heapSize() -730 ); -731 -732 if (buf != null) { -733 // Deep overhead of the byte buffer. Needs to be aligned separately. -734 size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); -735 } -736 -737 return ClassSize.align(size); -738 } -739 -740 /** -741 * Read from an input stream at least <code>necessaryLen</code> and if possible, -742 * <code>extraLen</code> also if available. 
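To make the size bookkeeping in allocateBuffer(), isUnpacked() and sanityCheckUncompressedSize() above concrete, a small worked example follows; the 64 KB of block data and the 16 KB bytesPerChecksum are assumed defaults, not values taken from this diff.

// Worked example under assumed defaults: one uncompressed DATA block holding 64 KB of data.
int headerSize = 33;                                                      // checksummed v2 header
int uncompressedSizeWithoutHeader = 64 * 1024;                            // 65536
int onDiskDataSizeWithHeader = headerSize + uncompressedSizeWithoutHeader;          // 65569
int bytesPerChecksum = 16 * 1024;
int chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;  // 5
int cksumBytes = chunks * 4;                                              // 20: one 4-byte checksum per chunk
int onDiskSizeWithoutHeader = uncompressedSizeWithoutHeader + cksumBytes;           // 65556 (no compression)
int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;                       // 65589
// isUnpacked() accepts a buffer capacity of 65589, or 65589 + 33 when the next block's
// header was read ahead into the same buffer.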
Analogous to -743 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a -744 * number of "extra" bytes to also optionally read. -745 * -746 * @param in the input stream to read from -747 * @param buf the buffer to read into -748 * @param bufOffset the destination offset in the buffer -749 * @param necessaryLen the number of bytes that are absolutely necessary to read -750 * @param extraLen the number of extra bytes that would be nice to read -751 * @return true if succeeded reading the extra bytes -752 * @throws IOException if failed to read the necessary bytes -753 */ -754 static boolean readWithExtra(InputStream in, byte[] buf, -755 int bufOffset, int necessaryLen, int extraLen) throws IOException { -756 int bytesRemaining = necessaryLen + extraLen; -757 while (bytesRemaining > 0) { -758 int ret = in.read(buf, bufOffset, bytesRemaining); -759 if (ret == -1 && bytesRemaining <= extraLen) { -760 // We could not read the "extra data", but that is OK. -761 break; -762 } -763 if (ret < 0) { -764 throw new IOException("Premature EOF from inputStream (read " -765 + "returned " + ret + ", was trying to read " + necessaryLen -766 + " necessary bytes and " + extraLen + " extra bytes, " -767 + "successfully read " -768 + (necessaryLen + extraLen - bytesRemaining)); -769 } -770 bufOffset += ret; -771 bytesRemaining -= ret; -772 } -773 return bytesRemaining <= 0; -774 } -775 -776 /** -777 * Read from an input stream at least <code>necessaryLen</code> and if possible, -778 * <code>extraLen</code> also if available. Analogous to -779 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses -780 * positional read and specifies a number of "extra" bytes that would be -781 * desirable but not absolutely necessary to read. -782 * -783 * @param in the input stream to read from -784 * @param position the position within the stream from which to start reading -785 * @param buf the buffer to read into -786 * @param bufOffset the destination offset in the buffer -787 * @param necessaryLen the number of bytes that are absolutely necessary to -788 * read -789 * @param extraLen the number of extra bytes that would be nice to read -790 * @return true if and only if extraLen is > 0 and reading those extra bytes -791 * was successful -792 * @throws IOException if failed to read the necessary bytes -793 */ -794 @VisibleForTesting -795 static boolean positionalReadWithExtra(FSDataInputStream in, -796 long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen) -797 throws IOException { -798 int bytesRemaining = necessaryLen + extraLen; -799 int bytesRead = 0; -800 while (bytesRead < necessaryLen) { -801 int ret = in.read(position, buf, bufOffset, bytesRemaining); -802 if (ret < 0) { -803 throw new IOException("Premature EOF from inputStream (positional read " -804 + "returned " + ret + ", was trying to read " + necessaryLen -805 + " necessary bytes and " + extraLen + " extra bytes, " -806 + "successfully read " + bytesRead); -807 } -808 position += ret; -809 bufOffset += ret; -810 bytesRemaining -= ret; -811 bytesRead += ret; -812 } -813 return bytesRead != necessaryLen && bytesRemaining <= 0; -814 } -815 -816 /** -817 * Unified version 2 {@link HFile} block writer. The intended usage pattern -818 * is as follows: -819 * <ol> -820 * <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm. -821 * <li>Call {@link Writer#startWriting} and get a data stream to write to. -822 * <li>Write your data into the stream. 
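A hedged usage sketch for readWithExtra() above; the input stream and the onDiskSizeWithHeader/hdrSize values are assumed to be in scope, and since the method is package-private the caller is assumed to live in this package.

// Illustrative caller: read one block's bytes plus, opportunistically, the next block's header.
byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
boolean nextHeaderAvailable = readWithExtra(in, onDiskBlock, 0, onDiskSizeWithHeader, hdrSize);
// true only if the necessary and the extra bytes were all read; EOF after the necessary
// onDiskSizeWithHeader bytes just yields false, while EOF before them throws IOException.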
-823 * <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to. -824 * store the serialized block into an external stream. -825 * <li>Repeat to write more blocks. -826 * </ol> -827 * <p> -828 */ -829 static class Writer { -830 private enum State { -831 INIT, -832 WRITING, -833 BLOCK_READY -834 }; -835 -836 /** Writer state. Used to ensure the correct usage protocol. */ -837 private State state = State.INIT; -838 -839 /** Data block encoder used for data blocks */ -840 private final HFileDataBlockEncoder dataBlockEncoder; -841 -842 private HFileBlockEncodingContext dataBlockEncodingCtx; -843 -844 /** block encoding context for non-data blocks*/ -845 private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; -846 -847 /** -848 * The stream we use to accumulate data into a block in an uncompressed format. -849 * We reset this stream at the end of each block and reuse it. The -850 * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this -851 * stream. -852 */ -853 private ByteArrayOutputStream baosInMemory; -854 -855 /** -856 * Current block type. Set in {@link #startWriting(BlockType)}. Could be -857 * changed in {@link #finishBlock()} from {@link BlockType#DATA} -858 * to {@link BlockType#ENCODED_DATA}. -859 */ -860 private BlockType blockType; -861 -862 /** -863 * A stream that we write uncompressed bytes to, which compresses them and -864 * writes them to {@link #baosInMemory}. -865 */ -866 private DataOutputStream userDataStream; -867 -868 // Size of actual data being written. Not considering the block encoding/compression. This -869 // includes the header size also. -870 private int unencodedDataSizeWritten; -871 -872 // Size of actual data being written. considering the block encoding. This -873 // includes the header size also. -874 private int encodedDataSizeWritten; -875 -876 /** -877 * Bytes to be written to the file system, including the header. Compressed -878 * if compression is turned on. It also includes the checksum data that -879 * immediately follows the block data. (header + data + checksums) -880 */ -881 private ByteArrayOutputStream onDiskBlockBytesWithHeader; -882 -883 /** -884 * The size of the checksum data on disk. It is used only if data is -885 * not compressed. If data is compressed, then the checksums are already -886 * part of onDiskBytesWithHeader. If data is uncompressed, then this -887 * variable stores the checksum data for this block. -888 */ -889 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; -890 -891 /** -892 * Current block's start offset in the {@link HFile}. Set in -893 * {@link #writeHeaderAndData(FSDataOutputStream)}. -894 */ -895 private long startOffset; -896 -897 /** -898 * Offset of previous block by block type. Updated when the next block is -899 * started. -900 */ -901 private long[] prevOffsetByType; -902 -903 /** The offset of the previous block of the same type */ -904 private long prevOffset; -905 /** Meta data that holds information about the hfileblock**/ -906 private HFileContext fileContext; -907 -908 /** -909 * @param dataBlockEncoder data block encoding algorithm to use -910 */ -911 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { -912 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { -913 throw new RuntimeException("Unsupported value of bytesPerChecksum. 
" + -914 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + -915 fileContext.getBytesPerChecksum()); -916 } -917 this.dataBlockEncoder = dataBlockEncoder != null? -918 dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; -919 this.dataBlockEncodingCtx = this.dataBlockEncoder. -920 newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); -921 // TODO: This should be lazily instantiated since we usually do NOT need this default encoder -922 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, -923 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); -924 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum -925 baosInMemory = new ByteArrayOutputStream(); -926 prevOffsetByType = new long[BlockType.values().length]; -927 for (int i = 0; i < prevOffsetByType.length; ++i) { -928 prevOffsetByType[i] = UNSET; -929 } -930 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or -931 // defaultDataBlockEncoder? -932 this.fileContext = fileContext; -933 } -934 -935 /** -936 * Starts writing into the block. The previous block's data is discarded. -937 * -938 * @return the stream the user can write their data into -939 * @throws IOException -940 */ -941 DataOutputStream startWriting(BlockType newBlockType) -942 throws IOException { -943 if (state == State.BLOCK_READY && startOffset != -1) { -944 // We had a previous block that was written to a stream at a specific -945 // offset. Save that offset as the last offset of a block of that type. -946 prevOffsetByType[blockType.getId()] = startOffset; -947 } -948 -949 startOffset = -1; -950 blockType = newBlockType; -951 -952 baosInMemory.reset(); -953 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); -954 -955 state = State.WRITING; -956 -957 // We will compress it later in finishBlock() -958 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); -959 if (newBlockType == BlockType.DATA) { -960 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); -961 } -962 this.unencodedDataSizeWritten = 0; -963 this.encodedDataSizeWritten = 0; -964 return userDataStream; -965 } -966 -967 /** -968 * Writes the Cell to this block -969 * @param cell -970 * @throws IOException -971 */ -972 void write(Cell cell) throws IOException{ -973 expectState(State.WRITING); -974 int posBeforeEncode = this.userDataStream.size(); -975 this.unencodedDataSizeWritten += -976 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); -977 this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode; -978 } -979 -980 /** -981 * Returns the stream for the user to write to. The block writer takes care -982 * of handling compression and buffering for caching on write. Can only be -983 * called in the "writing" state. -984 * -985 * @return the data output stream for the user to write to -986 */ -987 DataOutputStream getUserDataStream() { -988 expectState(State.WRITING); -989 return userDataStream; -990 } -991 -992 /** -993 * Transitions the block writer from the "writing" state to the "block -994 * ready" state. Does nothing if a block is already finished. -995 */ -996 void ensureBlockReady() throws IOException { -997 Preconditions.checkState(state != State.INIT, -998 "Unexpected state: " + state); -999 -1000 if (state == State.BLOCK_READY) { -1001 return; -1002 } -1003 -1004 // This will set state to BLOCK_READY. 
-1005 finishBlock(); -1006 } -1007 -1008 /** -1009 * Finish up writing of the block. -1010 * Flushes the compressing stream (if using compression), fills out the header, -1011 * does any compression/encryption of bytes to flush out to disk, and manages -1012 * the cache on write content, if applicable. Sets block write state to "block ready". -1013 */ -1014 private void finishBlock() throws IOException { -1015 if (blockType == BlockType.DATA) { -1016 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, -1017 baosInMemory.getBuffer(), blockType); -1018 blockType = dataBlockEncodingCtx.getBlockType(); -1019 } -1020 userDataStream.flush(); -1021 prevOffset = prevOffsetByType[blockType.getId()]; -1022 -1023 // We need to set state before we can package the block up for cache-on-write. In a way, the -1024 // block is ready, but not yet encoded or compressed. -1025 state = State.BLOCK_READY; -1026 Bytes compressAndEncryptDat; -1027 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { -1028 compressAndEncryptDat = dataBlockEncodingCtx. -1029 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); -1030 } else { -1031 compressAndEncryptDat = defaultBlockEncodingCtx. -1032 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); -1033 } -1034 if (compressAndEncryptDat == null) { -1035 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); -1036 } -1037 if (onDiskBlockBytesWithHeader == null) { -1038 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); -1039 } -1040 onDiskBlockBytesWithHeader.reset(); -1041 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), -1042 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); -1043 // Calculate how many bytes we need for checksum on the tail of the block. -1044 int numBytes = (int) ChecksumUtil.numBytes( -1045 onDiskBlockBytesWithHeader.size(), -1046 fileContext.getBytesPerChecksum()); -1047 -1048 // Put the header for the on disk bytes; header currently is unfilled-out -1049 putHeader(onDiskBlockBytesWithHeader, -1050 onDiskBlockBytesWithHeader.size() + numBytes, -1051 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); -1052 if (onDiskChecksum.length != numBytes) { -1053 onDiskChecksum = new byte[numBytes]; -1054 } -1055 ChecksumUtil.generateChecksums( -1056 onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), -1057 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); -1058 } -1059 -1060 /** -1061 * Put the header into the given byte array at the given offset. 
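The three sizes finishBlock() passes to putHeader() are easy to conflate; the comment sketch below restates, under the names used above, what each argument covers.

// putHeader(onDiskBlockBytesWithHeader,
//           onDiskBlockBytesWithHeader.size() + numBytes,  // onDiskSize: header + data + checksums
//           baosInMemory.size(),                           // uncompressedSize: header + raw data
//           onDiskBlockBytesWithHeader.size());            // onDiskDataSize: header + data, no checksums
// putHeader() subtracts HConstants.HFILEBLOCK_HEADER_SIZE from the first two before storing
// them, so those header fields hold "without header" values, while onDiskDataSizeWithHeader
// is stored with the header included.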
-1062 * @param onDiskSize size of the block on disk header + data + checksum -1063 * @param uncompressedSize size of the block after decompression (but -1064 * before optional data block decoding) including header -1065 * @param onDiskDataSize size of the block on disk with header -1066 * and data but not including the checksums -1067 */ -1068 private void putHeader(byte[] dest, int offset, int onDiskSize, -1069 int uncompressedSize, int onDiskDataSize) { -1070 offset = blockType.put(dest, offset); -1071 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); -1072 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); -1073 offset = Bytes.putLong(dest, offset, prevOffset); -1074 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); -1075 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); -1076 Bytes.putInt(dest, offset, onDiskDataSize); -1077 } -1078 -1079 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, -1080 int uncompressedSize, int onDiskDataSize) { -1081 putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); -1082 } -1083 -1084 /** -1085 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records -1086 * the offset of this block so that it can be referenced in the next block -1087 * of the same type. -1088 * -1089 * @param out -1090 * @throws IOException -1091 */ -1092 void writeHeaderAndData(FSDataOutputStream out) throws IOException { -1093 long offset = out.getPos(); -1094 if (startOffset != UNSET && offset != startOffset) { -1095 throw new IOException("A " + blockType + " block written to a " -1096 + "stream twice, first at offset " + startOffset + ", then at " -1097 + offset); -1098 } -1099 startOffset = offset; -1100 -1101 finishBlockAndWriteHeaderAndData((DataOutputStream) out); -1102 } -1103 -1104 /** -1105 * Writes the header and the compressed data of this block (or uncompressed -1106 * data when not using compression) into the given stream. Can be called in -1107 * the "writing" state or in the "block ready" state. If called in the -1108 * "writing" state, transitions the writer to the "block ready" state. -1109 * -1110 * @param out the output stream to write the -1111 * @throws IOException -1112 */ -1113 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) -1114 throws IOException { -1115 ensureBlockReady(); -1116 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); -1117 out.write(onDiskChecksum); -1118 } -1119 -1120 /** -1121 * Returns the header or the compressed data (or uncompressed data when not -1122 * using compression) as a byte array. Can be called in the "writing" state -1123 * or in the "block ready" state. If called in the "writing" state, -1124 * transitions the writer to the "block ready" state. This returns -1125 * the header + data + checksums stored on disk. -1126 * -1127 * @return header and data as they would be stored on disk in a byte array -1128 * @throws IOException -1129 */ -1130 byte[] getHeaderAndDataForTest() throws IOException { -1131 ensureBlockReady(); -1132 // This is not very optimal, because we are doing an extra copy. -1133 // But this method is used only by unit tests. 
-1134 byte[] output = -1135 new byte[onDiskBlockBytesWithHeader.size() -1136 + onDiskChecksum.length]; -1137 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, -1138 onDiskBlockBytesWithHeader.size()); -1139 System.arraycopy(onDiskChecksum, 0, output, -1140 onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); -1141 return output; -1142 } -1143 -1144 /** -1145 * Releases resources used by this writer. -1146 */ -1147 void release() { -1148 if (dataBlockEncodingCtx != null) { -1149 dataBlockEncodingCtx.close(); -1150 dataBlockEncodingCtx = null; -1151 } -1152 if (defaultBlockEncodingCtx != null) { -1153 defaultBlockEncodingCtx.close(); -1154 defaultBlockEncodingCtx = null; -1155 } -1156 } -1157 -1158 /** -1159 * Returns the on-disk size of the data portion of the block. This is the -1160 * compressed size if compression is enabled. Can only be called in the -1161 * "block ready" state. Header is not compressed, and its size is not -1162 * included in the return value. -1163 * -1164 * @return the on-disk size of the block, not including the header. -1165 */ -1166 int getOnDiskSizeWithoutHeader() { -1167 expectState(State.BLOCK_READY); -1168 return onDiskBlockBytesWithHeader.size() + -1169 onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; -1170 } -1171 -1172 /** -1173 * Returns the on-disk size of the block. Can only be called in the -1174 * "block ready" state. -1175 * -1176 * @return the on-disk size of the block ready to be written, including the -1177 * header size, the data and the checksum data. -1178 */ -1179 int getOnDiskSizeWithHeader() { -1180 expectState(State.BLOCK_READY); -1181 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; -1182 } -1183 -1184 /** -1185 * The uncompressed size of the block data. Does not include header size. -1186 */ -1187 int getUncompressedSizeWithoutHeader() { -1188 expectState(State.BLOCK_READY); -1189 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; -1190 } -1191 -1192 /** -1193 * The uncompressed size of the block data, including header size. -1194 */ -1195 int getUncompressedSizeWithHeader() { -1196 expectState(State.BLOCK_READY); -1197 return baosInMemory.size(); -1198 } -1199 -1200 /** @return true if a block is being written */ -1201 boolean isWriting() { -1202 return state == State.WRITING; -1203 } -1204 -1205 /** -1206 * Returns the number of bytes written into the current block so far, or -1207 * zero if not writing the block at the moment. Note that this will return -1208 * zero in the "block ready" state as well. -1209 * -1210 * @return the number of bytes written -1211 */ -1212 public int encodedBlockSizeWritten() { -1213 if (state != State.WRITING) -1214 return 0; -1215 return this.encodedDataSizeWritten; -1216 } -1217 -1218 /** -1219 * Returns the number of bytes written into the current block so far, or -1220 * zero if not writing the block at the moment. Note that this will return -1221 * zero in the "block ready" state as well. -1222 * -1223 * @return the number of bytes written -1224 */ -1225 int blockSizeWritten() { -1226 if (state != State.WRITING) return 0; -1227 return this.unencodedDataSizeWritten; -1228 } -1229 -1230 /** -1231 * Clones the header followed by the uncompress
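Finally, a minimal sketch of the usage protocol the Writer javadoc above lays out; fileContext, cells and out are assumed to exist in the caller and are not taken from this diff.

HFileBlock.Writer writer = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, fileContext);
DataOutputStream dos = writer.startWriting(BlockType.DATA);  // state: INIT -> WRITING
for (Cell cell : cells) {
  writer.write(cell);              // DATA blocks are fed through the data block encoder
}
// Non-DATA blocks would instead write their payload directly to dos.
writer.writeHeaderAndData(out);    // WRITING -> BLOCK_READY; appends header + data + checksums
// Repeat startWriting(...)/write(...)/writeHeaderAndData(out) for each subsequent block.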