Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 827C210515 for ; Fri, 11 Mar 2016 16:59:39 +0000 (UTC) Received: (qmail 76290 invoked by uid 500); 11 Mar 2016 16:59:36 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 76146 invoked by uid 500); 11 Mar 2016 16:59:36 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 75010 invoked by uid 99); 11 Mar 2016 16:59:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Mar 2016 16:59:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E93FDE054A; Fri, 11 Mar 2016 16:59:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Fri, 11 Mar 2016 16:59:49 -0000 Message-Id: <6fb156e1a1854744a855f329c7b1b1a2@git.apache.org> In-Reply-To: <732a255596bb4989af8ad4b197228002@git.apache.org> References: <732a255596bb4989af8ad4b197228002@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/51] [partial] hbase-site git commit: Published site at eea8b38dfa0180d3e6f93d3e8055d5d4fbf673c3. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/0b785cb2/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.html index 2a37f48..2c2052b 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.html @@ -29,348 +29,355 @@ 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; -024 -025import org.apache.hadoop.conf.Configuration; -026import org.apache.hadoop.hbase.Cell; -027import org.apache.hadoop.hbase.HBaseInterfaceAudience; -028import org.apache.hadoop.hbase.KeyValue; -029import org.apache.hadoop.hbase.KeyValueUtil; -030import org.apache.hadoop.hbase.classification.InterfaceAudience; -031import org.apache.hadoop.hbase.codec.BaseDecoder; -032import org.apache.hadoop.hbase.codec.BaseEncoder; -033import org.apache.hadoop.hbase.codec.Codec; -034import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; -035import org.apache.hadoop.hbase.io.util.Dictionary; -036import org.apache.hadoop.hbase.io.util.StreamUtils; -037import org.apache.hadoop.hbase.util.Bytes; -038import org.apache.hadoop.hbase.util.ReflectionUtils; -039import org.apache.hadoop.io.IOUtils; -040 -041import com.google.protobuf.ByteString; +024import java.nio.ByteBuffer; +025 +026import org.apache.hadoop.conf.Configuration; +027import org.apache.hadoop.hbase.Cell; +028import org.apache.hadoop.hbase.HBaseInterfaceAudience; +029import org.apache.hadoop.hbase.KeyValue; +030import org.apache.hadoop.hbase.KeyValueUtil; +031import org.apache.hadoop.hbase.classification.InterfaceAudience; +032import org.apache.hadoop.hbase.codec.BaseDecoder; +033import org.apache.hadoop.hbase.codec.BaseEncoder; +034import org.apache.hadoop.hbase.codec.Codec; +035import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +036import org.apache.hadoop.hbase.io.ByteBufferInputStream; +037import org.apache.hadoop.hbase.io.util.Dictionary; +038import org.apache.hadoop.hbase.io.util.StreamUtils; +039import org.apache.hadoop.hbase.util.Bytes; +040import org.apache.hadoop.hbase.util.ReflectionUtils; +041import org.apache.hadoop.io.IOUtils; 042 -043 -044/** -045 * Compression in this class is lifted off Compressor/KeyValueCompression. -046 * This is a pure coincidence... they are independent and don't have to be compatible. -047 * -048 * This codec is used at server side for writing cells to WAL as well as for sending edits -049 * as part of the distributed splitting process. -050 */ -051@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, -052 HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG}) -053public class WALCellCodec implements Codec { -054 /** Configuration key for the class to use when encoding cells in the WAL */ -055 public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec"; -056 -057 protected final CompressionContext compression; -058 protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() { -059 @Override -060 public byte[] uncompress(ByteString data, Dictionary dict) throws IOException { -061 return WALCellCodec.uncompressByteString(data, dict); -062 } -063 }; -064 -065 /** -066 * <b>All subclasses must implement a no argument constructor</b> -067 */ -068 public WALCellCodec() { -069 this.compression = null; -070 } -071 -072 /** -073 * Default constructor - <b>all subclasses must implement a constructor with this signature </b> -074 * if they are to be dynamically loaded from the {@link Configuration}. -075 * @param conf configuration to configure <tt>this</tt> -076 * @param compression compression the codec should support, can be <tt>null</tt> to indicate no -077 * compression -078 */ -079 public WALCellCodec(Configuration conf, CompressionContext compression) { -080 this.compression = compression; -081 } -082 -083 static String getWALCellCodecClass(Configuration conf) { -084 return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); -085 } -086 -087 /** -088 * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and -089 * CompressionContext, if {@code cellCodecClsName} is specified. -090 * Otherwise Cell Codec classname is read from {@link Configuration}. -091 * Fully prepares the codec for use. -092 * @param conf {@link Configuration} to read for the user-specified codec. If none is specified, -093 * uses a {@link WALCellCodec}. -094 * @param cellCodecClsName name of codec -095 * @param compression compression the codec should use -096 * @return a {@link WALCellCodec} ready for use. -097 * @throws UnsupportedOperationException if the codec cannot be instantiated -098 */ -099 -100 public static WALCellCodec create(Configuration conf, String cellCodecClsName, -101 CompressionContext compression) throws UnsupportedOperationException { -102 if (cellCodecClsName == null) { -103 cellCodecClsName = getWALCellCodecClass(conf); -104 } -105 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[] -106 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression }); -107 } -108 -109 /** -110 * Create and setup a {@link WALCellCodec} from the -111 * CompressionContext. -112 * Cell Codec classname is read from {@link Configuration}. -113 * Fully prepares the codec for use. -114 * @param conf {@link Configuration} to read for the user-specified codec. If none is specified, -115 * uses a {@link WALCellCodec}. -116 * @param compression compression the codec should use -117 * @return a {@link WALCellCodec} ready for use. -118 * @throws UnsupportedOperationException if the codec cannot be instantiated -119 */ -120 public static WALCellCodec create(Configuration conf, -121 CompressionContext compression) throws UnsupportedOperationException { -122 String cellCodecClsName = getWALCellCodecClass(conf); -123 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[] -124 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression }); -125 } -126 -127 public interface ByteStringCompressor { -128 ByteString compress(byte[] data, Dictionary dict) throws IOException; -129 } -130 -131 public interface ByteStringUncompressor { -132 byte[] uncompress(ByteString data, Dictionary dict) throws IOException; -133 } -134 -135 // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here. -136 // Dictionary could be gotten by enum; initially, based on enum, context would create -137 // an array of dictionaries. -138 static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor { -139 public ByteString toByteString() { -140 return ByteString.copyFrom(this.buf, 0, this.count); -141 } -142 -143 @Override -144 public ByteString compress(byte[] data, Dictionary dict) throws IOException { -145 writeCompressed(data, dict); -146 ByteString result = ByteString.copyFrom(this.buf, 0, this.count); -147 reset(); // Only resets the count - we reuse the byte array. -148 return result; -149 } -150 -151 private void writeCompressed(byte[] data, Dictionary dict) throws IOException { -152 assert dict != null; -153 short dictIdx = dict.findEntry(data, 0, data.length); -154 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { -155 write(Dictionary.NOT_IN_DICTIONARY); -156 StreamUtils.writeRawVInt32(this, data.length); -157 write(data, 0, data.length); -158 } else { -159 StreamUtils.writeShort(this, dictIdx); -160 } -161 } -162 } -163 -164 private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException { -165 InputStream in = bs.newInput(); -166 byte status = (byte)in.read(); -167 if (status == Dictionary.NOT_IN_DICTIONARY) { -168 byte[] arr = new byte[StreamUtils.readRawVarint32(in)]; -169 int bytesRead = in.read(arr); -170 if (bytesRead != arr.length) { -171 throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead); -172 } -173 if (dict != null) dict.addEntry(arr, 0, arr.length); -174 return arr; -175 } else { -176 // Status here is the higher-order byte of index of the dictionary entry. -177 short dictIdx = StreamUtils.toShort(status, (byte)in.read()); -178 byte[] entry = dict.getEntry(dictIdx); -179 if (entry == null) { -180 throw new IOException("Missing dictionary entry for index " + dictIdx); -181 } -182 return entry; -183 } -184 } -185 -186 static class CompressedKvEncoder extends BaseEncoder { -187 private final CompressionContext compression; -188 public CompressedKvEncoder(OutputStream out, CompressionContext compression) { -189 super(out); -190 this.compression = compression; -191 } -192 -193 @Override -194 public void write(Cell cell) throws IOException { -195 // We first write the KeyValue infrastructure as VInts. -196 StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell)); -197 StreamUtils.writeRawVInt32(out, cell.getValueLength()); -198 // To support tags -199 int tagsLength = cell.getTagsLength(); -200 StreamUtils.writeRawVInt32(out, tagsLength); -201 -202 // Write row, qualifier, and family; use dictionary -203 // compression as they're likely to have duplicates. -204 write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict); -205 write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), -206 compression.familyDict); -207 write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), -208 compression.qualifierDict); -209 -210 // Write timestamp, type and value as uncompressed. -211 StreamUtils.writeLong(out, cell.getTimestamp()); -212 out.write(cell.getTypeByte()); -213 out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); -214 if (tagsLength > 0) { -215 if (compression.tagCompressionContext != null) { -216 // Write tags using Dictionary compression -217 compression.tagCompressionContext.compressTags(out, cell.getTagsArray(), -218 cell.getTagsOffset(), tagsLength); -219 } else { -220 // Tag compression is disabled within the WAL compression. Just write the tags bytes as -221 // it is. -222 out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); -223 } -224 } -225 } -226 -227 private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException { -228 short dictIdx = Dictionary.NOT_IN_DICTIONARY; -229 if (dict != null) { -230 dictIdx = dict.findEntry(data, offset, length); -231 } -232 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { -233 out.write(Dictionary.NOT_IN_DICTIONARY); -234 StreamUtils.writeRawVInt32(out, length); -235 out.write(data, offset, length); -236 } else { -237 StreamUtils.writeShort(out, dictIdx); -238 } -239 } -240 } -241 -242 static class CompressedKvDecoder extends BaseDecoder { -243 private final CompressionContext compression; -244 public CompressedKvDecoder(InputStream in, CompressionContext compression) { -245 super(in); -246 this.compression = compression; -247 } -248 -249 @Override -250 protected Cell parseCell() throws IOException { -251 int keylength = StreamUtils.readRawVarint32(in); -252 int vlength = StreamUtils.readRawVarint32(in); -253 -254 int tagsLength = StreamUtils.readRawVarint32(in); -255 int length = 0; -256 if(tagsLength == 0) { -257 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; -258 } else { -259 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength; -260 } -261 -262 byte[] backingArray = new byte[length]; -263 int pos = 0; -264 pos = Bytes.putInt(backingArray, pos, keylength); -265 pos = Bytes.putInt(backingArray, pos, vlength); -266 -267 // the row -268 int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict); -269 checkLength(elemLen, Short.MAX_VALUE); -270 pos = Bytes.putShort(backingArray, pos, (short)elemLen); -271 pos += elemLen; -272 -273 // family -274 elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict); -275 checkLength(elemLen, Byte.MAX_VALUE); -276 pos = Bytes.putByte(backingArray, pos, (byte)elemLen); -277 pos += elemLen; -278 -279 // qualifier -280 elemLen = readIntoArray(backingArray, pos, compression.qualifierDict); -281 pos += elemLen; -282 -283 // timestamp, type and value -284 int tsTypeValLen = length - pos; -285 if (tagsLength > 0) { -286 tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; -287 } -288 IOUtils.readFully(in, backingArray, pos, tsTypeValLen); -289 pos += tsTypeValLen; -290 -291 // tags -292 if (tagsLength > 0) { -293 pos = Bytes.putAsShort(backingArray, pos, tagsLength); -294 if (compression.tagCompressionContext != null) { -295 compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength); -296 } else { -297 IOUtils.readFully(in, backingArray, pos, tagsLength); -298 } -299 } -300 return new KeyValue(backingArray, 0, length); -301 } -302 -303 private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException { -304 byte status = (byte)in.read(); -305 if (status == Dictionary.NOT_IN_DICTIONARY) { -306 // status byte indicating that data to be read is not in dictionary. -307 // if this isn't in the dictionary, we need to add to the dictionary. -308 int length = StreamUtils.readRawVarint32(in); -309 IOUtils.readFully(in, to, offset, length); -310 dict.addEntry(to, offset, length); -311 return length; -312 } else { -313 // the status byte also acts as the higher order byte of the dictionary entry. -314 short dictIdx = StreamUtils.toShort(status, (byte)in.read()); -315 byte[] entry = dict.getEntry(dictIdx); -316 if (entry == null) { -317 throw new IOException("Missing dictionary entry for index " + dictIdx); -318 } -319 // now we write the uncompressed value. -320 Bytes.putBytes(to, offset, entry, 0, entry.length); -321 return entry.length; -322 } -323 } -324 -325 private static void checkLength(int len, int max) throws IOException { -326 if (len < 0 || len > max) { -327 throw new IOException("Invalid length for compresesed portion of keyvalue: " + len); -328 } -329 } -330 } -331 -332 public static class EnsureKvEncoder extends BaseEncoder { -333 public EnsureKvEncoder(OutputStream out) { -334 super(out); -335 } -336 @Override -337 public void write(Cell cell) throws IOException { -338 checkFlushed(); -339 // Make sure to write tags into WAL -340 KeyValueUtil.oswrite(cell, this.out, true); -341 } -342 } -343 -344 @Override -345 public Decoder getDecoder(InputStream is) { -346 return (compression == null) -347 ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression); -348 } -349 -350 @Override -351 public Encoder getEncoder(OutputStream os) { -352 return (compression == null) -353 ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); -354 } -355 -356 public ByteStringCompressor getByteStringCompressor() { -357 // TODO: ideally this should also encapsulate compressionContext -358 return new BaosAndCompressor(); -359 } -360 -361 public ByteStringUncompressor getByteStringUncompressor() { -362 // TODO: ideally this should also encapsulate compressionContext -363 return this.statelessUncompressor; -364 } -365} +043import com.google.protobuf.ByteString; +044 +045 +046/** +047 * Compression in this class is lifted off Compressor/KeyValueCompression. +048 * This is a pure coincidence... they are independent and don't have to be compatible. +049 * +050 * This codec is used at server side for writing cells to WAL as well as for sending edits +051 * as part of the distributed splitting process. +052 */ +053@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, +054 HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG}) +055public class WALCellCodec implements Codec { +056 /** Configuration key for the class to use when encoding cells in the WAL */ +057 public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec"; +058 +059 protected final CompressionContext compression; +060 protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() { +061 @Override +062 public byte[] uncompress(ByteString data, Dictionary dict) throws IOException { +063 return WALCellCodec.uncompressByteString(data, dict); +064 } +065 }; +066 +067 /** +068 * <b>All subclasses must implement a no argument constructor</b> +069 */ +070 public WALCellCodec() { +071 this.compression = null; +072 } +073 +074 /** +075 * Default constructor - <b>all subclasses must implement a constructor with this signature </b> +076 * if they are to be dynamically loaded from the {@link Configuration}. +077 * @param conf configuration to configure <tt>this</tt> +078 * @param compression compression the codec should support, can be <tt>null</tt> to indicate no +079 * compression +080 */ +081 public WALCellCodec(Configuration conf, CompressionContext compression) { +082 this.compression = compression; +083 } +084 +085 static String getWALCellCodecClass(Configuration conf) { +086 return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); +087 } +088 +089 /** +090 * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and +091 * CompressionContext, if {@code cellCodecClsName} is specified. +092 * Otherwise Cell Codec classname is read from {@link Configuration}. +093 * Fully prepares the codec for use. +094 * @param conf {@link Configuration} to read for the user-specified codec. If none is specified, +095 * uses a {@link WALCellCodec}. +096 * @param cellCodecClsName name of codec +097 * @param compression compression the codec should use +098 * @return a {@link WALCellCodec} ready for use. +099 * @throws UnsupportedOperationException if the codec cannot be instantiated +100 */ +101 +102 public static WALCellCodec create(Configuration conf, String cellCodecClsName, +103 CompressionContext compression) throws UnsupportedOperationException { +104 if (cellCodecClsName == null) { +105 cellCodecClsName = getWALCellCodecClass(conf); +106 } +107 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[] +108 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression }); +109 } +110 +111 /** +112 * Create and setup a {@link WALCellCodec} from the +113 * CompressionContext. +114 * Cell Codec classname is read from {@link Configuration}. +115 * Fully prepares the codec for use. +116 * @param conf {@link Configuration} to read for the user-specified codec. If none is specified, +117 * uses a {@link WALCellCodec}. +118 * @param compression compression the codec should use +119 * @return a {@link WALCellCodec} ready for use. +120 * @throws UnsupportedOperationException if the codec cannot be instantiated +121 */ +122 public static WALCellCodec create(Configuration conf, +123 CompressionContext compression) throws UnsupportedOperationException { +124 String cellCodecClsName = getWALCellCodecClass(conf); +125 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[] +126 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression }); +127 } +128 +129 public interface ByteStringCompressor { +130 ByteString compress(byte[] data, Dictionary dict) throws IOException; +131 } +132 +133 public interface ByteStringUncompressor { +134 byte[] uncompress(ByteString data, Dictionary dict) throws IOException; +135 } +136 +137 // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here. +138 // Dictionary could be gotten by enum; initially, based on enum, context would create +139 // an array of dictionaries. +140 static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor { +141 public ByteString toByteString() { +142 return ByteString.copyFrom(this.buf, 0, this.count); +143 } +144 +145 @Override +146 public ByteString compress(byte[] data, Dictionary dict) throws IOException { +147 writeCompressed(data, dict); +148 ByteString result = ByteString.copyFrom(this.buf, 0, this.count); +149 reset(); // Only resets the count - we reuse the byte array. +150 return result; +151 } +152 +153 private void writeCompressed(byte[] data, Dictionary dict) throws IOException { +154 assert dict != null; +155 short dictIdx = dict.findEntry(data, 0, data.length); +156 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { +157 write(Dictionary.NOT_IN_DICTIONARY); +158 StreamUtils.writeRawVInt32(this, data.length); +159 write(data, 0, data.length); +160 } else { +161 StreamUtils.writeShort(this, dictIdx); +162 } +163 } +164 } +165 +166 private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException { +167 InputStream in = bs.newInput(); +168 byte status = (byte)in.read(); +169 if (status == Dictionary.NOT_IN_DICTIONARY) { +170 byte[] arr = new byte[StreamUtils.readRawVarint32(in)]; +171 int bytesRead = in.read(arr); +172 if (bytesRead != arr.length) { +173 throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead); +174 } +175 if (dict != null) dict.addEntry(arr, 0, arr.length); +176 return arr; +177 } else { +178 // Status here is the higher-order byte of index of the dictionary entry. +179 short dictIdx = StreamUtils.toShort(status, (byte)in.read()); +180 byte[] entry = dict.getEntry(dictIdx); +181 if (entry == null) { +182 throw new IOException("Missing dictionary entry for index " + dictIdx); +183 } +184 return entry; +185 } +186 } +187 +188 static class CompressedKvEncoder extends BaseEncoder { +189 private final CompressionContext compression; +190 public CompressedKvEncoder(OutputStream out, CompressionContext compression) { +191 super(out); +192 this.compression = compression; +193 } +194 +195 @Override +196 public void write(Cell cell) throws IOException { +197 // We first write the KeyValue infrastructure as VInts. +198 StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell)); +199 StreamUtils.writeRawVInt32(out, cell.getValueLength()); +200 // To support tags +201 int tagsLength = cell.getTagsLength(); +202 StreamUtils.writeRawVInt32(out, tagsLength); +203 +204 // Write row, qualifier, and family; use dictionary +205 // compression as they're likely to have duplicates. +206 write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict); +207 write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), +208 compression.familyDict); +209 write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), +210 compression.qualifierDict); +211 +212 // Write timestamp, type and value as uncompressed. +213 StreamUtils.writeLong(out, cell.getTimestamp()); +214 out.write(cell.getTypeByte()); +215 out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); +216 if (tagsLength > 0) { +217 if (compression.tagCompressionContext != null) { +218 // Write tags using Dictionary compression +219 compression.tagCompressionContext.compressTags(out, cell.getTagsArray(), +220 cell.getTagsOffset(), tagsLength); +221 } else { +222 // Tag compression is disabled within the WAL compression. Just write the tags bytes as +223 // it is. +224 out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); +225 } +226 } +227 } +228 +229 private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException { +230 short dictIdx = Dictionary.NOT_IN_DICTIONARY; +231 if (dict != null) { +232 dictIdx = dict.findEntry(data, offset, length); +233 } +234 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { +235 out.write(Dictionary.NOT_IN_DICTIONARY); +236 StreamUtils.writeRawVInt32(out, length); +237 out.write(data, offset, length); +238 } else { +239 StreamUtils.writeShort(out, dictIdx); +240 } +241 } +242 } +243 +244 static class CompressedKvDecoder extends BaseDecoder { +245 private final CompressionContext compression; +246 public CompressedKvDecoder(InputStream in, CompressionContext compression) { +247 super(in); +248 this.compression = compression; +249 } +250 +251 @Override +252 protected Cell parseCell() throws IOException { +253 int keylength = StreamUtils.readRawVarint32(in); +254 int vlength = StreamUtils.readRawVarint32(in); +255 +256 int tagsLength = StreamUtils.readRawVarint32(in); +257 int length = 0; +258 if(tagsLength == 0) { +259 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; +260 } else { +261 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength; +262 } +263 +264 byte[] backingArray = new byte[length]; +265 int pos = 0; +266 pos = Bytes.putInt(backingArray, pos, keylength); +267 pos = Bytes.putInt(backingArray, pos, vlength); +268 +269 // the row +270 int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict); +271 checkLength(elemLen, Short.MAX_VALUE); +272 pos = Bytes.putShort(backingArray, pos, (short)elemLen); +273 pos += elemLen; +274 +275 // family +276 elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict); +277 checkLength(elemLen, Byte.MAX_VALUE); +278 pos = Bytes.putByte(backingArray, pos, (byte)elemLen); +279 pos += elemLen; +280 +281 // qualifier +282 elemLen = readIntoArray(backingArray, pos, compression.qualifierDict); +283 pos += elemLen; +284 +285 // timestamp, type and value +286 int tsTypeValLen = length - pos; +287 if (tagsLength > 0) { +288 tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; +289 } +290 IOUtils.readFully(in, backingArray, pos, tsTypeValLen); +291 pos += tsTypeValLen; +292 +293 // tags +294 if (tagsLength > 0) { +295 pos = Bytes.putAsShort(backingArray, pos, tagsLength); +296 if (compression.tagCompressionContext != null) { +297 compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength); +298 } else { +299 IOUtils.readFully(in, backingArray, pos, tagsLength); +300 } +301 } +302 return new KeyValue(backingArray, 0, length); +303 } +304 +305 private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException { +306 byte status = (byte)in.read(); +307 if (status == Dictionary.NOT_IN_DICTIONARY) { +308 // status byte indicating that data to be read is not in dictionary. +309 // if this isn't in the dictionary, we need to add to the dictionary. +310 int length = StreamUtils.readRawVarint32(in); +311 IOUtils.readFully(in, to, offset, length); +312 dict.addEntry(to, offset, length); +313 return length; +314 } else { +315 // the status byte also acts as the higher order byte of the dictionary entry. +316 short dictIdx = StreamUtils.toShort(status, (byte)in.read()); +317 byte[] entry = dict.getEntry(dictIdx); +318 if (entry == null) { +319 throw new IOException("Missing dictionary entry for index " + dictIdx); +320 } +321 // now we write the uncompressed value. +322 Bytes.putBytes(to, offset, entry, 0, entry.length); +323 return entry.length; +324 } +325 } +326 +327 private static void checkLength(int len, int max) throws IOException { +328 if (len < 0 || len > max) { +329 throw new IOException("Invalid length for compresesed portion of keyvalue: " + len); +330 } +331 } +332 } +333 +334 public static class EnsureKvEncoder extends BaseEncoder { +335 public EnsureKvEncoder(OutputStream out) { +336 super(out); +337 } +338 @Override +339 public void write(Cell cell) throws IOException { +340 checkFlushed(); +341 // Make sure to write tags into WAL +342 KeyValueUtil.oswrite(cell, this.out, true); +343 } +344 } +345 +346 @Override +347 public Decoder getDecoder(InputStream is) { +348 return (compression == null) +349 ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression); +350 } +351 +352 @Override +353 public Decoder getDecoder(ByteBuffer buf) { +354 return getDecoder(new ByteBufferInputStream(buf)); +355 } +356 +357 @Override +358 public Encoder getEncoder(OutputStream os) { +359 return (compression == null) +360 ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); +361 } +362 +363 public ByteStringCompressor getByteStringCompressor() { +364 // TODO: ideally this should also encapsulate compressionContext +365 return new BaosAndCompressor(); +366 } +367 +368 public ByteStringUncompressor getByteStringUncompressor() { +369 // TODO: ideally this should also encapsulate compressionContext +370 return this.statelessUncompressor; +371 } +372}