From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Fri, 28 Apr 2017 14:59:01 -0000
Subject: [02/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6f2e75f2/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.html
index a665139..3fedd0b 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/hfile/HFileBlock.Writer.html
@@ -879,1201 +879,1221 @@
871 // includes the header size also. 872 private int unencodedDataSizeWritten; 873 -874 /** -875 * Bytes to be written to the file system, including the header. Compressed -876 * if compression is turned on. It also includes the checksum data that -877 * immediately follows the block data. (header + data + checksums) -878 */ -879 private ByteArrayOutputStream onDiskBlockBytesWithHeader; -880 -881 /** -882 * The size of the checksum data on disk. It is used only if data is -883 * not compressed. If data is compressed, then the checksums are already -884 * part of onDiskBytesWithHeader. If data is uncompressed, then this -885 * variable stores the checksum data for this block. -886 */ -887 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; -888 -889 /** -890 * Current block's start offset in the {@link HFile}. Set in -891 * {@link #writeHeaderAndData(FSDataOutputStream)}. -892 */ -893 private long startOffset; -894 -895 /** -896 * Offset of previous block by block type. Updated when the next block is -897 * started.
-898 */ -899 private long[] prevOffsetByType; -900 -901 /** The offset of the previous block of the same type */ -902 private long prevOffset; -903 /** Meta data that holds information about the hfileblock**/ -904 private HFileContext fileContext; -905 -906 /** -907 * @param dataBlockEncoder data block encoding algorithm to use -908 */ -909 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { -910 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { -911 throw new RuntimeException("Unsupported value of bytesPerChecksum. " + -912 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + -913 fileContext.getBytesPerChecksum()); -914 } -915 this.dataBlockEncoder = dataBlockEncoder != null? -916 dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; -917 this.dataBlockEncodingCtx = this.dataBlockEncoder. -918 newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); -919 // TODO: This should be lazily instantiated since we usually do NOT need this default encoder -920 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, -921 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); -922 // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum -923 baosInMemory = new ByteArrayOutputStream(); -924 prevOffsetByType = new long[BlockType.values().length]; -925 for (int i = 0; i < prevOffsetByType.length; ++i) { -926 prevOffsetByType[i] = UNSET; -927 } -928 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or -929 // defaultDataBlockEncoder? -930 this.fileContext = fileContext; -931 } -932 -933 /** -934 * Starts writing into the block. The previous block's data is discarded. -935 * -936 * @return the stream the user can write their data into -937 * @throws IOException -938 */ -939 DataOutputStream startWriting(BlockType newBlockType) -940 throws IOException { -941 if (state == State.BLOCK_READY && startOffset != -1) { -942 // We had a previous block that was written to a stream at a specific -943 // offset. Save that offset as the last offset of a block of that type. -944 prevOffsetByType[blockType.getId()] = startOffset; -945 } -946 -947 startOffset = -1; -948 blockType = newBlockType; -949 -950 baosInMemory.reset(); -951 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); -952 -953 state = State.WRITING; -954 -955 // We will compress it later in finishBlock() -956 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); -957 if (newBlockType == BlockType.DATA) { -958 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); -959 } -960 this.unencodedDataSizeWritten = 0; -961 return userDataStream; -962 } -963 -964 /** -965 * Writes the Cell to this block -966 * @param cell -967 * @throws IOException -968 */ -969 void write(Cell cell) throws IOException{ -970 expectState(State.WRITING); -971 this.unencodedDataSizeWritten += -972 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); -973 } -974 -975 /** -976 * Returns the stream for the user to write to. The block writer takes care -977 * of handling compression and buffering for caching on write. Can only be -978 * called in the "writing" state. 
-979 * -980 * @return the data output stream for the user to write to -981 */ -982 DataOutputStream getUserDataStream() { -983 expectState(State.WRITING); -984 return userDataStream; -985 } -986 -987 /** -988 * Transitions the block writer from the "writing" state to the "block -989 * ready" state. Does nothing if a block is already finished. -990 */ -991 void ensureBlockReady() throws IOException { -992 Preconditions.checkState(state != State.INIT, -993 "Unexpected state: " + state); -994 -995 if (state == State.BLOCK_READY) { -996 return; -997 } -998 -999 // This will set state to BLOCK_READY. -1000 finishBlock(); -1001 } -1002 -1003 /** -1004 * Finish up writing of the block. -1005 * Flushes the compressing stream (if using compression), fills out the header, -1006 * does any compression/encryption of bytes to flush out to disk, and manages -1007 * the cache on write content, if applicable. Sets block write state to "block ready". -1008 */ -1009 private void finishBlock() throws IOException { -1010 if (blockType == BlockType.DATA) { -1011 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, -1012 baosInMemory.getBuffer(), blockType); -1013 blockType = dataBlockEncodingCtx.getBlockType(); -1014 } -1015 userDataStream.flush(); -1016 prevOffset = prevOffsetByType[blockType.getId()]; -1017 -1018 // We need to set state before we can package the block up for cache-on-write. In a way, the -1019 // block is ready, but not yet encoded or compressed. -1020 state = State.BLOCK_READY; -1021 Bytes compressAndEncryptDat; -1022 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { -1023 compressAndEncryptDat = dataBlockEncodingCtx. -1024 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); -1025 } else { -1026 compressAndEncryptDat = defaultBlockEncodingCtx. -1027 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); -1028 } -1029 if (compressAndEncryptDat == null) { -1030 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); -1031 } -1032 if (onDiskBlockBytesWithHeader == null) { -1033 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); -1034 } -1035 onDiskBlockBytesWithHeader.reset(); -1036 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), -1037 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); -1038 // Calculate how many bytes we need for checksum on the tail of the block. -1039 int numBytes = (int) ChecksumUtil.numBytes( -1040 onDiskBlockBytesWithHeader.size(), -1041 fileContext.getBytesPerChecksum()); -1042 -1043 // Put the header for the on disk bytes; header currently is unfilled-out -1044 putHeader(onDiskBlockBytesWithHeader, -1045 onDiskBlockBytesWithHeader.size() + numBytes, -1046 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); -1047 if (onDiskChecksum.length != numBytes) { -1048 onDiskChecksum = new byte[numBytes]; -1049 } -1050 ChecksumUtil.generateChecksums( -1051 onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), -1052 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); -1053 } -1054 -1055 /** -1056 * Put the header into the given byte array at the given offset. 
-1057 * @param onDiskSize size of the block on disk header + data + checksum -1058 * @param uncompressedSize size of the block after decompression (but -1059 * before optional data block decoding) including header -1060 * @param onDiskDataSize size of the block on disk with header -1061 * and data but not including the checksums -1062 */ -1063 private void putHeader(byte[] dest, int offset, int onDiskSize, -1064 int uncompressedSize, int onDiskDataSize) { -1065 offset = blockType.put(dest, offset); -1066 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); -1067 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); -1068 offset = Bytes.putLong(dest, offset, prevOffset); -1069 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); -1070 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); -1071 Bytes.putInt(dest, offset, onDiskDataSize); -1072 } -1073 -1074 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, -1075 int uncompressedSize, int onDiskDataSize) { -1076 putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); -1077 } -1078 -1079 /** -1080 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records -1081 * the offset of this block so that it can be referenced in the next block -1082 * of the same type. -1083 * -1084 * @param out -1085 * @throws IOException -1086 */ -1087 void writeHeaderAndData(FSDataOutputStream out) throws IOException { -1088 long offset = out.getPos(); -1089 if (startOffset != UNSET && offset != startOffset) { -1090 throw new IOException("A " + blockType + " block written to a " -1091 + "stream twice, first at offset " + startOffset + ", then at " -1092 + offset); -1093 } -1094 startOffset = offset; -1095 -1096 finishBlockAndWriteHeaderAndData((DataOutputStream) out); -1097 } -1098 -1099 /** -1100 * Writes the header and the compressed data of this block (or uncompressed -1101 * data when not using compression) into the given stream. Can be called in -1102 * the "writing" state or in the "block ready" state. If called in the -1103 * "writing" state, transitions the writer to the "block ready" state. -1104 * -1105 * @param out the output stream to write the -1106 * @throws IOException -1107 */ -1108 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) -1109 throws IOException { -1110 ensureBlockReady(); -1111 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); -1112 out.write(onDiskChecksum); -1113 } -1114 -1115 /** -1116 * Returns the header or the compressed data (or uncompressed data when not -1117 * using compression) as a byte array. Can be called in the "writing" state -1118 * or in the "block ready" state. If called in the "writing" state, -1119 * transitions the writer to the "block ready" state. This returns -1120 * the header + data + checksums stored on disk. -1121 * -1122 * @return header and data as they would be stored on disk in a byte array -1123 * @throws IOException -1124 */ -1125 byte[] getHeaderAndDataForTest() throws IOException { -1126 ensureBlockReady(); -1127 // This is not very optimal, because we are doing an extra copy. -1128 // But this method is used only by unit tests. 
-1129 byte[] output = -1130 new byte[onDiskBlockBytesWithHeader.size() -1131 + onDiskChecksum.length]; -1132 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, -1133 onDiskBlockBytesWithHeader.size()); -1134 System.arraycopy(onDiskChecksum, 0, output, -1135 onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); -1136 return output; -1137 } -1138 -1139 /** -1140 * Releases resources used by this writer. -1141 */ -1142 void release() { -1143 if (dataBlockEncodingCtx != null) { -1144 dataBlockEncodingCtx.close(); -1145 dataBlockEncodingCtx = null; -1146 } -1147 if (defaultBlockEncodingCtx != null) { -1148 defaultBlockEncodingCtx.close(); -1149 defaultBlockEncodingCtx = null; -1150 } -1151 } -1152 -1153 /** -1154 * Returns the on-disk size of the data portion of the block. This is the -1155 * compressed size if compression is enabled. Can only be called in the -1156 * "block ready" state. Header is not compressed, and its size is not -1157 * included in the return value. -1158 * -1159 * @return the on-disk size of the block, not including the header. -1160 */ -1161 int getOnDiskSizeWithoutHeader() { -1162 expectState(State.BLOCK_READY); -1163 return onDiskBlockBytesWithHeader.size() + -1164 onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; -1165 } -1166 -1167 /** -1168 * Returns the on-disk size of the block. Can only be called in the -1169 * "block ready" state. -1170 * -1171 * @return the on-disk size of the block ready to be written, including the -1172 * header size, the data and the checksum data. -1173 */ -1174 int getOnDiskSizeWithHeader() { -1175 expectState(State.BLOCK_READY); -1176 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; -1177 } -1178 -1179 /** -1180 * The uncompressed size of the block data. Does not include header size. -1181 */ -1182 int getUncompressedSizeWithoutHeader() { -1183 expectState(State.BLOCK_READY); -1184 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; -1185 } -1186 -1187 /** -1188 * The uncompressed size of the block data, including header size. -1189 */ -1190 int getUncompressedSizeWithHeader() { -1191 expectState(State.BLOCK_READY); -1192 return baosInMemory.size(); -1193 } -1194 -1195 /** @return true if a block is being written */ -1196 boolean isWriting() { -1197 return state == State.WRITING; -1198 } -1199 -1200 /** -1201 * Returns the number of bytes written into the current block so far, or -1202 * zero if not writing the block at the moment. Note that this will return -1203 * zero in the "block ready" state as well. -1204 * -1205 * @return the number of bytes written -1206 */ -1207 int blockSizeWritten() { -1208 if (state != State.WRITING) return 0; -1209 return this.unencodedDataSizeWritten; -1210 } -1211 -1212 /** -1213 * Clones the header followed by the uncompressed data, even if using -1214 * compression. This is needed for storing uncompressed blocks in the block -1215 * cache. Can be called in the "writing" state or the "block ready" state. -1216 * Returns only the header and data, does not include checksum data. 
-1217 * -1218 * @return Returns a copy of uncompressed block bytes for caching on write -1219 */ -1220 @VisibleForTesting -1221 ByteBuffer cloneUncompressedBufferWithHeader() { -1222 expectState(State.BLOCK_READY); -1223 byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); -1224 int numBytes = (int) ChecksumUtil.numBytes( -1225 onDiskBlockBytesWithHeader.size(), -1226 fileContext.getBytesPerChecksum()); -1227 putHeader(uncompressedBlockBytesWithHeader, 0, -1228 onDiskBlockBytesWithHeader.size() + numBytes, -1229 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); -1230 return ByteBuffer.wrap(uncompressedBlockBytesWithHeader); -1231 } -1232 -1233 /** -1234 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is -1235 * needed for storing packed blocks in the block cache. Expects calling semantics identical to -1236 * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, -1237 * Does not include checksum data. -1238 * -1239 * @return Returns a copy of block bytes for caching on write -1240 */ -1241 private ByteBuffer cloneOnDiskBufferWithHeader() { +874 // Size of actual data being written. considering the block encoding. This +875 // includes the header size also. +876 private int encodedDataSizeWritten; +877 +878 /** +879 * Bytes to be written to the file system, including the header. Compressed +880 * if compression is turned on. It also includes the checksum data that +881 * immediately follows the block data. (header + data + checksums) +882 */ +883 private ByteArrayOutputStream onDiskBlockBytesWithHeader; +884 +885 /** +886 * The size of the checksum data on disk. It is used only if data is +887 * not compressed. If data is compressed, then the checksums are already +888 * part of onDiskBytesWithHeader. If data is uncompressed, then this +889 * variable stores the checksum data for this block. +890 */ +891 private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; +892 +893 /** +894 * Current block's start offset in the {@link HFile}. Set in +895 * {@link #writeHeaderAndData(FSDataOutputStream)}. +896 */ +897 private long startOffset; +898 +899 /** +900 * Offset of previous block by block type. Updated when the next block is +901 * started. +902 */ +903 private long[] prevOffsetByType; +904 +905 /** The offset of the previous block of the same type */ +906 private long prevOffset; +907 /** Meta data that holds information about the hfileblock**/ +908 private HFileContext fileContext; +909 +910 /** +911 * @param dataBlockEncoder data block encoding algorithm to use +912 */ +913 public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { +914 if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { +915 throw new RuntimeException("Unsupported value of bytesPerChecksum. " + +916 " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + +917 fileContext.getBytesPerChecksum()); +918 } +919 this.dataBlockEncoder = dataBlockEncoder != null? +920 dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; +921 this.dataBlockEncodingCtx = this.dataBlockEncoder. +922 newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); +923 // TODO: This should be lazily instantiated since we usually do NOT need this default encoder +924 this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, +925 HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); +926 // TODO: Set BAOS initial size. 
Use fileContext.getBlocksize() and add for header/checksum +927 baosInMemory = new ByteArrayOutputStream(); +928 prevOffsetByType = new long[BlockType.values().length]; +929 for (int i = 0; i < prevOffsetByType.length; ++i) { +930 prevOffsetByType[i] = UNSET; +931 } +932 // TODO: Why fileContext saved away when we have dataBlockEncoder and/or +933 // defaultDataBlockEncoder? +934 this.fileContext = fileContext; +935 } +936 +937 /** +938 * Starts writing into the block. The previous block's data is discarded. +939 * +940 * @return the stream the user can write their data into +941 * @throws IOException +942 */ +943 DataOutputStream startWriting(BlockType newBlockType) +944 throws IOException { +945 if (state == State.BLOCK_READY && startOffset != -1) { +946 // We had a previous block that was written to a stream at a specific +947 // offset. Save that offset as the last offset of a block of that type. +948 prevOffsetByType[blockType.getId()] = startOffset; +949 } +950 +951 startOffset = -1; +952 blockType = newBlockType; +953 +954 baosInMemory.reset(); +955 baosInMemory.write(HConstants.HFILEBLOCK_DUMMY_HEADER); +956 +957 state = State.WRITING; +958 +959 // We will compress it later in finishBlock() +960 userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory); +961 if (newBlockType == BlockType.DATA) { +962 this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); +963 } +964 this.unencodedDataSizeWritten = 0; +965 this.encodedDataSizeWritten = 0; +966 return userDataStream; +967 } +968 +969 /** +970 * Writes the Cell to this block +971 * @param cell +972 * @throws IOException +973 */ +974 void write(Cell cell) throws IOException{ +975 expectState(State.WRITING); +976 int posBeforeEncode = this.userDataStream.size(); +977 this.unencodedDataSizeWritten += +978 this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); +979 this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode; +980 } +981 +982 /** +983 * Returns the stream for the user to write to. The block writer takes care +984 * of handling compression and buffering for caching on write. Can only be +985 * called in the "writing" state. +986 * +987 * @return the data output stream for the user to write to +988 */ +989 DataOutputStream getUserDataStream() { +990 expectState(State.WRITING); +991 return userDataStream; +992 } +993 +994 /** +995 * Transitions the block writer from the "writing" state to the "block +996 * ready" state. Does nothing if a block is already finished. +997 */ +998 void ensureBlockReady() throws IOException { +999 Preconditions.checkState(state != State.INIT, +1000 "Unexpected state: " + state); +1001 +1002 if (state == State.BLOCK_READY) { +1003 return; +1004 } +1005 +1006 // This will set state to BLOCK_READY. +1007 finishBlock(); +1008 } +1009 +1010 /** +1011 * Finish up writing of the block. +1012 * Flushes the compressing stream (if using compression), fills out the header, +1013 * does any compression/encryption of bytes to flush out to disk, and manages +1014 * the cache on write content, if applicable. Sets block write state to "block ready". 
+1015 */ +1016 private void finishBlock() throws IOException { +1017 if (blockType == BlockType.DATA) { +1018 this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, +1019 baosInMemory.getBuffer(), blockType); +1020 blockType = dataBlockEncodingCtx.getBlockType(); +1021 } +1022 userDataStream.flush(); +1023 prevOffset = prevOffsetByType[blockType.getId()]; +1024 +1025 // We need to set state before we can package the block up for cache-on-write. In a way, the +1026 // block is ready, but not yet encoded or compressed. +1027 state = State.BLOCK_READY; +1028 Bytes compressAndEncryptDat; +1029 if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { +1030 compressAndEncryptDat = dataBlockEncodingCtx. +1031 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); +1032 } else { +1033 compressAndEncryptDat = defaultBlockEncodingCtx. +1034 compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size()); +1035 } +1036 if (compressAndEncryptDat == null) { +1037 compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size()); +1038 } +1039 if (onDiskBlockBytesWithHeader == null) { +1040 onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength()); +1041 } +1042 onDiskBlockBytesWithHeader.reset(); +1043 onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(), +1044 compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength()); +1045 // Calculate how many bytes we need for checksum on the tail of the block. +1046 int numBytes = (int) ChecksumUtil.numBytes( +1047 onDiskBlockBytesWithHeader.size(), +1048 fileContext.getBytesPerChecksum()); +1049 +1050 // Put the header for the on disk bytes; header currently is unfilled-out +1051 putHeader(onDiskBlockBytesWithHeader, +1052 onDiskBlockBytesWithHeader.size() + numBytes, +1053 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); +1054 if (onDiskChecksum.length != numBytes) { +1055 onDiskChecksum = new byte[numBytes]; +1056 } +1057 ChecksumUtil.generateChecksums( +1058 onDiskBlockBytesWithHeader.getBuffer(), 0,onDiskBlockBytesWithHeader.size(), +1059 onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); +1060 } +1061 +1062 /** +1063 * Put the header into the given byte array at the given offset. 
+1064 * @param onDiskSize size of the block on disk header + data + checksum +1065 * @param uncompressedSize size of the block after decompression (but +1066 * before optional data block decoding) including header +1067 * @param onDiskDataSize size of the block on disk with header +1068 * and data but not including the checksums +1069 */ +1070 private void putHeader(byte[] dest, int offset, int onDiskSize, +1071 int uncompressedSize, int onDiskDataSize) { +1072 offset = blockType.put(dest, offset); +1073 offset = Bytes.putInt(dest, offset, onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE); +1074 offset = Bytes.putInt(dest, offset, uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE); +1075 offset = Bytes.putLong(dest, offset, prevOffset); +1076 offset = Bytes.putByte(dest, offset, fileContext.getChecksumType().getCode()); +1077 offset = Bytes.putInt(dest, offset, fileContext.getBytesPerChecksum()); +1078 Bytes.putInt(dest, offset, onDiskDataSize); +1079 } +1080 +1081 private void putHeader(ByteArrayOutputStream dest, int onDiskSize, +1082 int uncompressedSize, int onDiskDataSize) { +1083 putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize); +1084 } +1085 +1086 /** +1087 * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records +1088 * the offset of this block so that it can be referenced in the next block +1089 * of the same type. +1090 * +1091 * @param out +1092 * @throws IOException +1093 */ +1094 void writeHeaderAndData(FSDataOutputStream out) throws IOException { +1095 long offset = out.getPos(); +1096 if (startOffset != UNSET && offset != startOffset) { +1097 throw new IOException("A " + blockType + " block written to a " +1098 + "stream twice, first at offset " + startOffset + ", then at " +1099 + offset); +1100 } +1101 startOffset = offset; +1102 +1103 finishBlockAndWriteHeaderAndData((DataOutputStream) out); +1104 } +1105 +1106 /** +1107 * Writes the header and the compressed data of this block (or uncompressed +1108 * data when not using compression) into the given stream. Can be called in +1109 * the "writing" state or in the "block ready" state. If called in the +1110 * "writing" state, transitions the writer to the "block ready" state. +1111 * +1112 * @param out the output stream to write the +1113 * @throws IOException +1114 */ +1115 protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) +1116 throws IOException { +1117 ensureBlockReady(); +1118 out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size()); +1119 out.write(onDiskChecksum); +1120 } +1121 +1122 /** +1123 * Returns the header or the compressed data (or uncompressed data when not +1124 * using compression) as a byte array. Can be called in the "writing" state +1125 * or in the "block ready" state. If called in the "writing" state, +1126 * transitions the writer to the "block ready" state. This returns +1127 * the header + data + checksums stored on disk. +1128 * +1129 * @return header and data as they would be stored on disk in a byte array +1130 * @throws IOException +1131 */ +1132 byte[] getHeaderAndDataForTest() throws IOException { +1133 ensureBlockReady(); +1134 // This is not very optimal, because we are doing an extra copy. +1135 // But this method is used only by unit tests. 
+1136 byte[] output = +1137 new byte[onDiskBlockBytesWithHeader.size() +1138 + onDiskChecksum.length]; +1139 System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0, +1140 onDiskBlockBytesWithHeader.size()); +1141 System.arraycopy(onDiskChecksum, 0, output, +1142 onDiskBlockBytesWithHeader.size(), onDiskChecksum.length); +1143 return output; +1144 } +1145 +1146 /** +1147 * Releases resources used by this writer. +1148 */ +1149 void release() { +1150 if (dataBlockEncodingCtx != null) { +1151 dataBlockEncodingCtx.close(); +1152 dataBlockEncodingCtx = null; +1153 } +1154 if (defaultBlockEncodingCtx != null) { +1155 defaultBlockEncodingCtx.close(); +1156 defaultBlockEncodingCtx = null; +1157 } +1158 } +1159 +1160 /** +1161 * Returns the on-disk size of the data portion of the block. This is the +1162 * compressed size if compression is enabled. Can only be called in the +1163 * "block ready" state. Header is not compressed, and its size is not +1164 * included in the return value. +1165 * +1166 * @return the on-disk size of the block, not including the header. +1167 */ +1168 int getOnDiskSizeWithoutHeader() { +1169 expectState(State.BLOCK_READY); +1170 return onDiskBlockBytesWithHeader.size() + +1171 onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; +1172 } +1173 +1174 /** +1175 * Returns the on-disk size of the block. Can only be called in the +1176 * "block ready" state. +1177 * +1178 * @return the on-disk size of the block ready to be written, including the +1179 * header size, the data and the checksum data. +1180 */ +1181 int getOnDiskSizeWithHeader() { +1182 expectState(State.BLOCK_READY); +1183 return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length; +1184 } +1185 +1186 /** +1187 * The uncompressed size of the block data. Does not include header size. +1188 */ +1189 int getUncompressedSizeWithoutHeader() { +1190 expectState(State.BLOCK_READY); +1191 return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE; +1192 } +1193 +1194 /** +1195 * The uncompressed size of the block data, including header size. +1196 */ +1197 int getUncompressedSizeWithHeader() { +1198 expectState(State.BLOCK_READY); +1199 return baosInMemory.size(); +1200 } +1201 +1202 /** @return true if a block is being written */ +1203 boolean isWriting() { +1204 return state == State.WRITING; +1205 } +1206 +1207 /** +1208 * Returns the number of bytes written into the current block so far, or +1209 * zero if not writing the block at the moment. Note that this will return +1210 * zero in the "block ready" state as well. +1211 * +1212 * @return the number of bytes written +1213 */ +1214 public int encodedBlockSizeWritten() { +1215 if (state != State.WRITING) +1216 return 0; +1217 return this.encodedDataSizeWritten; +1218 } +1219 +1220 /** +1221 * Returns the number of bytes written into the current block so far, or +1222 * zero if not writing the block at the moment. Note that this will return +1223 * zero in the "block ready" state as well. +1224 * +1225 * @return the number of bytes written +1226 */ +1227 int blockSizeWritten() { +1228 if (state != State.WRITING) return 0; +1229 return this.unencodedDataSizeWritten; +1230 } +1231 +1232 /** +1233 * Clones the header followed by the uncompressed data, even if using +1234 * compression. This is needed for storing uncompressed blocks in the block +1235 * cache. Can be called in the "writing" state or the "block ready" state. +1236 * Returns only the header and data, does not include checksum data. 
+1237 * +1238 * @return Returns a copy of uncompressed block bytes for caching on write +1239 */ +1240 @VisibleForTesting +1241 ByteBuffer cloneUncompressedBufferWithHeader() { 1242 expectState(State.BLOCK_READY); -1243 return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray()); -1244 } -1245 -1246 private void expectState(State expectedState) { -1247 if (state != expectedState) { -1248 throw new IllegalStateException("Expected state: " + expectedState + -1249 ", actual state: " + state); -1250 } +1243 byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); +1244 int numBytes = (int) ChecksumUtil.numBytes( +1245 onDiskBlockBytesWithHeader.size(), +1246 fileContext.getBytesPerChecksum()); +1247 putHeader(uncompressedBlockBytesWithHeader, 0, +1248 onDiskBlockBytesWithHeader.size() + numBytes, +1249 baosInMemory.size(), onDiskBlockBytesWithHeader.size()); +1250 return ByteBuffer.wrap(uncompressedBlockBytesWithHeader); 1251 } 1252 1253 /** -1254 * Takes the given {@link BlockWritable} instance, creates a new block of -1255 * its appropriate type, writes the writable into this block, and flushes -1256 * the block into the output stream. The writer is instructed not to buffer -1257 * uncompressed bytes for cache-on-write. +1254 * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is +1255 * needed for storing packed blocks in the block cache. Expects calling semantics identical to +1256 * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, +1257 * Does not include checksum data. 1258 * -1259 * @param bw the block-writable object to write as a block -1260 * @param out the file system output stream -1261 * @throws IOException -1262 */ -1263 void writeBlock(BlockWritable bw, FSDataOutputStream out) -1264 throws IOException { -1265 bw.writeToBlock(startWriting(bw.getBlockType())); -1266 writeHeaderAndData(out); -1267 } -1268 -1269 /** -1270 * Creates a new HFileBlock. Checksums have already been validated, so -1271 * the byte buffer passed into the constructor of this newly created -1272 * block does not have checksum data even though the header minor -1273 * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a -1274 * 0 value in bytesPerChecksum. This method copies the on-disk or -1275 * uncompressed data to build the HFileBlock which is used only -1276 * while writing blocks and caching. -1277 * -1278 * <p>TODO: Should there be an option where a cache can ask that hbase preserve block -1279 * checksums for checking after a block comes out of the cache? Otehrwise, cache is responsible -1280 * for blocks being wholesome (ECC memory or if file-backed, it does checksumming). -1281 */ -1282 HFileBlock getBlockForCaching(CacheConfig cacheConf) { -1283 HFileContext newContext = new HFileContextBuilder() -1284 .withBlockSize(fileContext.getBlocksize()) -1285 .withBytesPerCheckSum(0) -1286 .withChecksumType(ChecksumType.NULL) // no checksums in cached data -1287 .withCompression(fileContext.getCompression()) -1288 .withDataBlockEncoding(fileContext.getDataBlockEncoding()) -1289 .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) -1290 .withCompressTags(fileContext.isCompressTags()) -1291 .withIncludesMvcc(fileContext.isIncludesMvcc()) -1292 .withIncludesTags(fileContext.isIncludesTags()) -1293 .build(); -1294 return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), -1295 getUncompressedSizeWithoutHeader(), prevOffset, -1296 cacheConf.shouldCacheCompressed(blockType.getCategory())? 
-1297 cloneOnDiskBufferWithHeader() : -1298 cloneUncompressedBufferWithHeader(), -1299 FILL_HEADER, startOffset, UNSET, -1300 onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext); -1301 } -1302 } -1303 -1304 /** Something that can be written into a block. */ -1305 interface BlockWritable { -1306 -1307 /** The type of block this data should use. */ -1308 BlockType getBlockType(); -1309 -1310 /** -1311 * Writes the block to the provided stream. Must not write any magic -1312 * records. -1313 * -1314 * @param out a stream to write uncompressed data into -1315 */ -1316 void writeToBlock(DataOutput out) throws IOException; -1317 } -1318 -1319 // Block readers and writers -1320 -1321 /** An interface allowing to iterate {@link HFileBlock}s. */ -1322 interface BlockIterator { +1259 * @return Returns a copy of block bytes for caching on write +1260 */ +1261 private ByteBuffer cloneOnDiskBufferWithHeader() { +1262 expectState(State.BLOCK_READY); +1263 return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray()); +1264 } +1265 +1266 private void expectState(State expectedState) { +1267 if (state != expectedState) { +1268 throw new IllegalStateException("Expected state: " + expectedState + +1269 ", actual state: " + state); +1270 } +1271 } +1272 +1273 /** +1274 * Takes the given {@link BlockWritable} instance, creates a new block of +1275 * its appropriate type, writes the writable into this block, and flushes +1276 * the block into the output stream. The writer is instructed not to buffer +1277 * uncompressed bytes for cache-on-write. +1278 * +1279 * @param bw the block-writable object to write as a block +1280 * @param out the file system output stream +1281 * @throws IOException +1282 */ +1283 void writeBlock(BlockWritable bw, FSDataOutputStream out) +1284 throws IOException { +1285 bw.writeToBlock(startWriting(bw.getBlockType())); +1286 writeHeaderAndData(out); +
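----------------------------------------------------------------------

The substantive change in this hunk is the new encodedDataSizeWritten counter: write(Cell) now samples userDataStream.size() before and after the encode call, so the writer can report post-encoding bytes (the new public encodedBlockSizeWritten()) alongside the pre-encoding count (blockSizeWritten()) it already kept. A minimal standalone sketch of that size-delta pattern using only JDK streams, with writeUTF standing in for dataBlockEncoder.encode(...):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SizeDeltaSketch {
      public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        int encodedSizeWritten = 0;

        for (String cell : new String[] { "row1/cf:q/v1", "row2/cf:q/v2" }) {
          int posBeforeEncode = out.size(); // DataOutputStream counts bytes written
          out.writeUTF(cell);               // stand-in for dataBlockEncoder.encode(...)
          encodedSizeWritten += out.size() - posBeforeEncode;
        }
        System.out.println("encoded bytes written: " + encodedSizeWritten);
      }
    }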
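For orientation, the putHeader() methods in the diff serialize the fixed-size block header field by field. The sketch below mirrors that field order; the 33-byte total (the apparent value of HConstants.HFILEBLOCK_HEADER_SIZE for checksummed blocks, consistent with the subtraction arithmetic above), the 8-byte block-type magic, and the example checksum code are assumptions, not a copy of the HBase implementation:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class HFileBlockHeaderSketch {
      static final int HEADER_SIZE = 33; // assumed HConstants.HFILEBLOCK_HEADER_SIZE

      // Field order mirrors putHeader(); sizes sum to 8 + 4 + 4 + 8 + 1 + 4 + 4 = 33.
      static byte[] buildHeader(byte[] blockTypeMagic, int onDiskSizeWithHeader,
          int uncompressedSizeWithHeader, long prevOffset, byte checksumTypeCode,
          int bytesPerChecksum, int onDiskDataSizeWithHeader) {
        ByteBuffer b = ByteBuffer.allocate(HEADER_SIZE);
        b.put(blockTypeMagic);                              // 8-byte block type magic
        b.putInt(onDiskSizeWithHeader - HEADER_SIZE);       // on-disk size, header excluded
        b.putInt(uncompressedSizeWithHeader - HEADER_SIZE); // uncompressed size, header excluded
        b.putLong(prevOffset);                              // previous block of the same type
        b.put(checksumTypeCode);                            // checksum algorithm code
        b.putInt(bytesPerChecksum);                         // bytes covered per checksum value
        b.putInt(onDiskDataSizeWithHeader);                 // header + data, checksums excluded
        return b.array();
      }

      public static void main(String[] args) {
        byte[] magic = "DATABLK*".getBytes(StandardCharsets.US_ASCII);
        // Checksum code 1 is a hypothetical CRC32 code for illustration.
        byte[] header = buildHeader(magic, 65_000, 65_569, -1L, (byte) 1, 16 * 1024, 64_900);
        System.out.println("header length = " + header.length); // 33
      }
    }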
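finishBlock() asks ChecksumUtil.numBytes() how much room the trailing checksums need, passing the header-plus-data size and bytesPerChecksum. A plausible model of that arithmetic, assuming one 4-byte checksum value per bytesPerChecksum-sized chunk with the last partial chunk counted (the exact rounding inside ChecksumUtil is not shown in this diff):

    public class ChecksumSizeSketch {
      // Assumed model of ChecksumUtil.numBytes(): one 4-byte checksum per
      // bytesPerChecksum-sized chunk of (header + data); partial chunks count.
      static long checksumBytes(long onDiskDataSizeWithHeader, int bytesPerChecksum) {
        long chunks = (onDiskDataSizeWithHeader + bytesPerChecksum - 1) / bytesPerChecksum;
        return chunks * 4; // 4 bytes per stored checksum value (assumption)
      }

      public static void main(String[] args) {
        // 65,569 bytes of header+data at 16 KiB per chunk -> 5 chunks -> 20 bytes.
        System.out.println(checksumBytes(65_569, 16 * 1024));
      }
    }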
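The four size getters in the hunk are all derived from two buffers, onDiskBlockBytesWithHeader (plus onDiskChecksum) and baosInMemory. A small worked check of those relationships, with hypothetical byte counts and the same assumed 33-byte header:

    public class BlockSizeInvariants {
      static final int HEADER_SIZE = 33; // assumed HConstants.HFILEBLOCK_HEADER_SIZE

      public static void main(String[] args) {
        // Hypothetical finished block: 64,900 bytes of header+data on disk,
        // 20 checksum bytes, 65,569 bytes of uncompressed header+data in memory.
        int onDiskBlockBytesWithHeader = 64_900;
        int onDiskChecksumLength = 20;
        int baosInMemorySize = 65_569;

        int onDiskSizeWithHeader = onDiskBlockBytesWithHeader + onDiskChecksumLength;
        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - HEADER_SIZE;
        int uncompressedSizeWithoutHeader = baosInMemorySize - HEADER_SIZE;

        System.out.printf("on disk: %d with header / %d without; uncompressed without header: %d%n",
            onDiskSizeWithHeader, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader);
      }
    }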
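Finally, writeBlock(BlockWritable, FSDataOutputStream) at the end of the hunk turns any BlockWritable into a block: startWriting(bw.getBlockType()), let the writable fill the stream, then writeHeaderAndData(out). A toy implementation of the interface as declared in the diff; the enum stand-in for BlockType and the META payload are hypothetical:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Interface as shown in the diff, with BlockType reduced to an enum stand-in.
    interface BlockWritable {
      BlockTypeKind getBlockType();
      void writeToBlock(DataOutput out) throws IOException; // must not write magic records
    }

    enum BlockTypeKind { DATA, META }

    // Hypothetical writable that emits one key/value pair into a META block.
    class MetaEntryWritable implements BlockWritable {
      @Override public BlockTypeKind getBlockType() { return BlockTypeKind.META; }

      @Override public void writeToBlock(DataOutput out) throws IOException {
        out.writeUTF("hfile.LASTKEY"); // illustrative payload only
        out.writeUTF("row-999");
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        new MetaEntryWritable().writeToBlock(new DataOutputStream(baos));
        System.out.println("meta payload bytes: " + baos.size());
        // Against the writer in this diff, usage would then be:
        //   writer.writeBlock(new MetaEntryWritable(), fsOut);
      }
    }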