hbase-commits mailing list archives

From anoopsamj...@apache.org
Subject svn commit: r1537377 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/ hbase-common/src/main/java/org/apache/had...
Date Thu, 31 Oct 2013 04:59:01 GMT
Author: anoopsamjohn
Date: Thu Oct 31 04:59:00 2013
New Revision: 1537377

URL: http://svn.apache.org/r1537377
Log:
HBASE-9045 Support Dictionary based Tag compression in HFiles

Added:
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
(original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
Thu Oct 31 04:59:00 2013
@@ -94,6 +94,7 @@ public class HColumnDescriptor implement
   public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
   public static final String MIN_VERSIONS = "MIN_VERSIONS";
   public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
+  public static final String COMPRESS_TAGS = "COMPRESS_TAGS";
 
   /**
    * Default compression type.
@@ -187,6 +188,11 @@ public class HColumnDescriptor implement
    */
   public static final boolean DEFAULT_EVICT_BLOCKS_ON_CLOSE = false;
 
+  /**
+   * Default setting for compressing tags along with any type of DataBlockEncoding
+   */
+  public static final boolean DEFAULT_COMPRESS_TAGS = true;
+
   private final static Map<String, String> DEFAULT_VALUES
     = new HashMap<String, String>();
   private final static Set<ImmutableBytesWritable> RESERVED_KEYWORDS
@@ -675,6 +681,30 @@ public class HColumnDescriptor implement
   }
 
   /**
+   * Set whether the tags should be compressed along with DataBlockEncoding. When no
+   * DataBlockEncoding is in use, this has no effect.
+   * 
+   * @param compressTags
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setCompressTags(boolean compressTags) {
+    return setValue(COMPRESS_TAGS, String.valueOf(compressTags));
+  }
+
+  /**
+   * @return Whether KV tags should be compressed along with DataBlockEncoding. When no
+   *         DataBlockEncoding is in use, this has no effect.
+   */
+  public boolean shouldCompressTags() {
+    String compressTagsStr = getValue(COMPRESS_TAGS);
+    boolean compressTags = DEFAULT_COMPRESS_TAGS;
+    if (compressTagsStr != null) {
+      compressTags = Boolean.valueOf(compressTagsStr);
+    }
+    return compressTags;
+  }
+
+  /**
    * @return Compression type setting.
    */
   public Compression.Algorithm getCompactionCompressionType() {
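
The HColumnDescriptor change above turns tag compression into a per-family switch. A minimal, hypothetical client-side sketch (family and table names are illustrative and not part of this commit; the usual org.apache.hadoop.hbase client imports are assumed, and the setting only matters when the family also uses a DataBlockEncoding):

    HColumnDescriptor hcd = new HColumnDescriptor("f");
    hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // without an encoding, COMPRESS_TAGS has no effect
    hcd.setCompressTags(true);                              // DEFAULT_COMPRESS_TAGS is already true
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t"));
    htd.addFamily(hcd);
    // On read-back, shouldCompressTags() falls back to DEFAULT_COMPRESS_TAGS when the value is unset.
    boolean compress = hcd.shouldCompressTags();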

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
(original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
Thu Oct 31 04:59:00 2013
@@ -23,17 +23,19 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
 
 /**
  * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
- * will be used for compressing tags while writing into WALs.
+ * will be used for compressing tags while writing into HFiles and WALs.
  */
 @InterfaceAudience.Private
 public class TagCompressionContext {
@@ -52,7 +54,7 @@ public class TagCompressionContext {
   }
 
   /**
-   * Compress tags one by one and writes the OutputStream. 
+   * Compress tags one by one and writes to the OutputStream.
    * @param out Stream to which the compressed tags to be written
    * @param in Source where tags are available
    * @param offset Offset for the tags bytes
@@ -73,6 +75,24 @@ public class TagCompressionContext {
   }
 
   /**
+   * Compress tags one by one and writes to the OutputStream.
+   * @param out Stream to which the compressed tags to be written
+   * @param in Source buffer where tags are available
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void compressTags(OutputStream out, ByteBuffer in, short length) throws IOException {
+    if (in.hasArray()) {
+      compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
+      ByteBufferUtils.skip(in, length);
+    } else {
+      byte[] tagBuf = new byte[length];
+      in.get(tagBuf);
+      compressTags(out, tagBuf, 0, length);
+    }
+  }
+
+  /**
    * Uncompress tags from the InputStream and writes to the destination array.
    * @param src Stream where the compressed tags are available
    * @param dest Destination array where to write the uncompressed tags
@@ -105,6 +125,58 @@ public class TagCompressionContext {
     }
   }
 
+  /**
+   * Uncompress tags from the input ByteBuffer and writes to the destination array.
+   * @param src Buffer where the compressed tags are available
+   * @param dest Destination array where to write the uncompressed tags
+   * @param offset Offset in destination where tags to be written
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+      throws IOException {
+    int endOffset = offset + length;
+    while (offset < endOffset) {
+      byte status = src.get();
+      short tagLen;
+      if (status == Dictionary.NOT_IN_DICTIONARY) {
+        // We are writing short as tagLen. So can downcast this without any risk.
+        tagLen = (short) StreamUtils.readRawVarint32(src);
+        offset = Bytes.putShort(dest, offset, tagLen);
+        src.get(dest, offset, tagLen);
+        tagDict.addEntry(dest, offset, tagLen);
+        offset += tagLen;
+      } else {
+        short dictIdx = StreamUtils.toShort(status, src.get());
+        byte[] entry = tagDict.getEntry(dictIdx);
+        if (entry == null) {
+          throw new IOException("Missing dictionary entry for index " + dictIdx);
+        }
+        tagLen = (short) entry.length;
+        offset = Bytes.putShort(dest, offset, tagLen);
+        System.arraycopy(entry, 0, dest, offset, tagLen);
+        offset += tagLen;
+      }
+    }
+  }
+
+  /**
+   * Uncompress tags from the InputStream and writes to the destination buffer.
+   * @param src Stream where the compressed tags are available
+   * @param dest Destination buffer where to write the uncompressed tags
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void uncompressTags(InputStream src, ByteBuffer dest, short length) throws IOException {
+    if (dest.hasArray()) {
+      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
+    } else {
+      byte[] tagBuf = new byte[length];
+      uncompressTags(src, tagBuf, 0, length);
+      dest.put(tagBuf);
+    }
+  }
+
   private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
     if (tagDict != null) {
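
For reference, a rough round trip with the new ByteBuffer overloads (it mirrors the TestTagCompressionContext test added later in this commit; kv is assumed to be a KeyValue that carries tags):

    TagCompressionContext ctx = new TagCompressionContext(LRUDictionary.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    short tagsLen = kv.getTagsLength();
    // Compress straight from the KeyValue's backing array wrapped as a ByteBuffer.
    ctx.compressTags(out, ByteBuffer.wrap(kv.getTagsArray(), kv.getTagsOffset(), tagsLen), tagsLen);

    ctx.clear(); // start decompression with an empty dictionary, as the reader side would
    byte[] uncompressed = new byte[tagsLen];
    ctx.uncompressTags(ByteBuffer.wrap(out.toByteArray()), uncompressed, 0, tagsLen);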

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
(original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
Thu Oct 31 04:59:00 2013
@@ -26,8 +26,10 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
@@ -50,6 +52,14 @@ abstract class BufferedDataBlockEncoder 
 
     HFileBlockDefaultDecodingContext decodingCtx =
         (HFileBlockDefaultDecodingContext) blkDecodingCtx;
+    if (decodingCtx.getHFileContext().shouldCompressTags()) {
+      try {
+        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+        decodingCtx.setTagCompressionContext(tagCompressionContext);
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize TagCompressionContext", e);
+      }
+    }
     return internalDecodeKeyValues(source, 0, 0, decodingCtx);
   }
 
@@ -58,11 +68,12 @@ abstract class BufferedDataBlockEncoder 
     protected int keyLength;
     protected int valueLength;
     protected int lastCommonPrefix;
-    protected int tagLength = 0;
-    protected int tagOffset = -1;
+    protected int tagsLength = 0;
+    protected int tagsOffset = -1;
 
     /** We need to store a copy of the key. */
     protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
+    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
 
     protected long memstoreTS;
     protected int nextKvOffset;
@@ -88,6 +99,19 @@ abstract class BufferedDataBlockEncoder 
       }
     }
 
+    protected void ensureSpaceForTags() {
+      if (tagsLength > tagsBuffer.length) {
+        // rare case, but we need to handle arbitrary length of tags
+        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
+        while (tagsLength > newTagsBufferLength) {
+          newTagsBufferLength *= 2;
+        }
+        byte[] newTagsBuffer = new byte[newTagsBufferLength];
+        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
+        tagsBuffer = newTagsBuffer;
+      }
+    }
+
     /**
      * Copy the state from the next one into this instance (the previous state
      * placeholder). Used to save the previous state when we are advancing the
@@ -127,6 +151,7 @@ abstract class BufferedDataBlockEncoder 
     protected ByteBuffer currentBuffer;
     protected STATE current = createSeekerState(); // always valid
     protected STATE previous = createSeekerState(); // may not be valid
+    protected TagCompressionContext tagCompressionContext = null;
 
     public BufferedEncodedSeeker(KVComparator comparator,
         HFileBlockDecodingContext decodingCtx) {
@@ -137,6 +162,13 @@ abstract class BufferedDataBlockEncoder 
         this.samePrefixComparator = null;
       }
       this.decodingCtx = decodingCtx;
+      if (decodingCtx.getHFileContext().shouldCompressTags()) {
+        try {
+          tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+        } catch (Exception e) {
+          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
+        }
+      }
     }
     
     protected boolean includesMvcc() {
@@ -183,17 +215,25 @@ abstract class BufferedDataBlockEncoder 
       kvBuffer.put(currentBuffer.array(),
           currentBuffer.arrayOffset() + current.valueOffset,
           current.valueLength);
-      if (current.tagLength > 0) {
-        kvBuffer.putShort((short) current.tagLength);
-        kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagOffset,
-            current.tagLength);
+      if (current.tagsLength > 0) {
+        kvBuffer.putShort((short) current.tagsLength);
+        if (current.tagsOffset != -1) {
+          // the offset of the tags bytes in the underlying buffer is marked. So the temp
+          // buffer, tagsBuffer, was not used.
+          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
+              current.tagsLength);
+        } else {
+          // When tagsOffset is marked as -1, tag compression was present and so the tags were
+          // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
+          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
+        }
       }
       return kvBuffer;
     }
 
     protected ByteBuffer createKVBuffer() {
       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
-          current.valueLength, current.tagLength);
+          current.valueLength, current.tagsLength);
       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
       return kvBuffer;
     }
@@ -225,9 +265,23 @@ abstract class BufferedDataBlockEncoder 
     }
 
     public void decodeTags() {
-      current.tagLength = ByteBufferUtils.readCompressedInt(currentBuffer);
-      current.tagOffset = currentBuffer.position();
-      ByteBufferUtils.skip(currentBuffer, current.tagLength);
+      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+      if (tagCompressionContext != null) {
+        // Tag compression is in use. Uncompress the tags into tagsBuffer.
+        current.ensureSpaceForTags();
+        try {
+          tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
+              current.tagsLength);
+        } catch (IOException e) {
+          throw new RuntimeException("Exception while uncompressing tags", e);
+        }
+        current.tagsOffset = -1;
+      } else {
+        // When tag compression is not used, avoid a temp copy of the tag bytes into tagsBuffer.
+        // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer().
+        current.tagsOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+      }
     }
 
     @Override
@@ -320,9 +374,19 @@ abstract class BufferedDataBlockEncoder 
   protected final void afterEncodingKeyValue(ByteBuffer in,
       DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
     if (encodingCtx.getHFileContext().shouldIncludeTags()) {
-      int tagsLength = in.getShort();
+      short tagsLength = in.getShort();
       ByteBufferUtils.putCompressedInt(out, tagsLength);
-      ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+      // There are some tags to be written
+      if (tagsLength > 0) {
+        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
+        // When tag compression is enabled, tagCompressionContext will have a non-null value. Write
+        // the tags using Dictionary compression in such a case.
+        if (tagCompressionContext != null) {
+          tagCompressionContext.compressTags(out, in, tagsLength);
+        } else {
+          ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+        }
+      }
     }
     if (encodingCtx.getHFileContext().shouldIncludeMvcc()) {
       // Copy memstore timestamp from the byte buffer to the output stream.
@@ -340,9 +404,18 @@ abstract class BufferedDataBlockEncoder 
   protected final void afterDecodingKeyValue(DataInputStream source,
       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
     if (decodingCtx.getHFileContext().shouldIncludeTags()) {
-      int tagsLength = ByteBufferUtils.readCompressedInt(source);
-      dest.putShort((short)tagsLength);
-      ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
+      short tagsLength = (short) ByteBufferUtils.readCompressedInt(source);
+      dest.putShort(tagsLength);
+      if (tagsLength > 0) {
+        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
+        // When tag compression is used in this file, tagCompressionContext will have a
+        // non-null value.
+        if (tagCompressionContext != null) {
+          tagCompressionContext.uncompressTags(source, dest, tagsLength);
+        } else {
+          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
+        }
+      }
     }
     if (decodingCtx.getHFileContext().shouldIncludeMvcc()) {
       long memstoreTS = -1;
@@ -398,6 +471,14 @@ abstract class BufferedDataBlockEncoder 
     DataOutputStream dataOut =
         ((HFileBlockDefaultEncodingContext) encodingCtx)
         .getOutputStreamForEncoder();
+    if (encodingCtx.getHFileContext().shouldCompressTags()) {
+      try {
+        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+        encodingCtx.setTagCompressionContext(tagCompressionContext);
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize TagCompressionContext", e);
+      }
+    }
     internalEncodeKeyValues(dataOut, in, encodingCtx);
     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
       encodingCtx.postEncoding(BlockType.ENCODED_DATA);

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
(original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
Thu Oct 31 04:59:00 2013
@@ -67,8 +67,8 @@ public class CopyKeyDataBlockEncoder ext
         current.valueOffset = currentBuffer.position();
         ByteBufferUtils.skip(currentBuffer, current.valueLength);
         if (includesTags()) {
-          current.tagLength = currentBuffer.getShort();
-          ByteBufferUtils.skip(currentBuffer, current.tagLength);
+          current.tagsLength = currentBuffer.getShort();
+          ByteBufferUtils.skip(currentBuffer, current.tagsLength);
         }
         if (includesMvcc()) {
           current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
(original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
Thu Oct 31 04:59:00 2013
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 
@@ -38,6 +39,7 @@ public class HFileBlockDefaultDecodingCo
     HFileBlockDecodingContext {
 
   private final HFileContext fileContext;
+  private TagCompressionContext tagCompressionContext;
   
   public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
     this.fileContext = fileContext;
@@ -58,4 +60,12 @@ public class HFileBlockDefaultDecodingCo
   public HFileContext getHFileContext() {
     return this.fileContext;
   }
+
+  public TagCompressionContext getTagCompressionContext() {
+    return tagCompressionContext;
+  }
+
+  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
+    this.tagCompressionContext = tagCompressionContext;
+  }
 }

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
(original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
Thu Oct 31 04:59:00 2013
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -62,6 +63,7 @@ public class HFileBlockDefaultEncodingCo
   private byte[] dummyHeader;
 
   private HFileContext fileContext;
+  private TagCompressionContext tagCompressionContext;
 
   /**
    * @param encoding encoding used
@@ -193,4 +195,12 @@ public class HFileBlockDefaultEncodingCo
   public HFileContext getHFileContext() {
     return this.fileContext;
   }
+
+  public TagCompressionContext getTagCompressionContext() {
+    return tagCompressionContext;
+  }
+
+  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
+    this.tagCompressionContext = tagCompressionContext;
+  }
 }

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
(original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
Thu Oct 31 04:59:00 2013
@@ -122,6 +122,10 @@ public class HFileContext implements Hea
     return compressTags;
   }
 
+  public void setCompressTags(boolean compressTags) {
+    this.compressTags = compressTags;
+  }
+
   public ChecksumType getChecksumType() {
     return checksumType;
   }

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java?rev=1537377&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
(added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
Thu Oct 31 04:59:00 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestTagCompressionContext {
+
+  private static final byte[] ROW = Bytes.toBytes("r1");
+  private static final byte[] CF = Bytes.toBytes("f");
+  private static final byte[] Q = Bytes.toBytes("q");
+  private static final byte[] V = Bytes.toBytes("v");
+
+  @Test
+  public void testCompressUncompressTags1() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    KeyValue kv1 = createKVWithTags(2);
+    short tagsLength1 = kv1.getTagsLength();
+    ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
+    context.compressTags(baos, ib, tagsLength1);
+    KeyValue kv2 = createKVWithTags(3);
+    short tagsLength2 = kv2.getTagsLength();
+    ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
+    context.compressTags(baos, ib, tagsLength2);
+
+    context.clear();
+
+    byte[] dest = new byte[tagsLength1];
+    ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
+    context.uncompressTags(ob, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(ob, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
+  public void testCompressUncompressTags2() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    KeyValue kv1 = createKVWithTags(1);
+    short tagsLength1 = kv1.getTagsLength();
+    context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
+    KeyValue kv2 = createKVWithTags(3);
+    short tagsLength2 = kv2.getTagsLength();
+    context.compressTags(baos, kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
+
+    context.clear();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    byte[] dest = new byte[tagsLength1];
+    context.uncompressTags(bais, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(bais, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  private KeyValue createKVWithTags(int noOfTags) {
+    List<Tag> tags = new ArrayList<Tag>();
+    for (int i = 0; i < noOfTags; i++) {
+      tags.add(new Tag((byte) i, "tagValue" + i));
+    }
+    KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+    return kv;
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Oct 31 04:59:00 2013
@@ -620,6 +620,7 @@ public class HFile {
     static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
     static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
     static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
+    static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
     public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
     private final SortedMap<byte [], byte []> map = new TreeMap<byte [], byte []>(Bytes.BYTES_COMPARATOR);
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
Thu Oct 31 04:59:00 2013
@@ -64,6 +64,10 @@ public class HFileReaderV3 extends HFile
     // max tag length is not present in the HFile means tags were not at all written to file.
     if (tmp != null) {
       hfileContext.setIncludesTags(true);
+      tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
+      if (tmp != null && Bytes.toBoolean(tmp)) {
+        hfileContext.setCompressTags(true);
+      }
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
Thu Oct 31 04:59:00 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -178,6 +179,9 @@ public class HFileWriterV3 extends HFile
       // When tags are not being written in this file, MAX_TAGS_LEN is excluded
       // from the FileInfo
       fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+      boolean tagsCompressed = (hFileContext.getEncodingOnDisk() != DataBlockEncoding.NONE)
+          && hFileContext.shouldCompressTags();
+      fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
Thu Oct 31 04:59:00 2013
@@ -828,6 +828,7 @@ public class HStore implements Store {
     HFileContext hFileContext = new HFileContextBuilder()
                                 .withIncludesMvcc(includeMVCCReadpoint)
                                 .withIncludesTags(includesTag)
+                                .withCompressTags(family.shouldCompressTags())
                                 .withCompressionAlgo(compression)
                                 .withChecksumType(checksumType)
                                 .withBytesPerCheckSum(bytesPerChecksum)
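
The per-family flag reaches the store file writer through the HFileContext built here. A condensed sketch of the builder call above (the remaining with*() parameters and the terminating build() call, assumed from the builder API, are omitted from the hunk):

    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesTags(includesTag)
        .withCompressTags(family.shouldCompressTags()) // per-family COMPRESS_TAGS setting
        .withCompressionAlgo(compression)
        .build();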

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1537377&r1=1537376&r2=1537377&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
Thu Oct 31 04:59:00 2013
@@ -71,6 +71,7 @@ public class TestEncodedSeekers {
   private final DataBlockEncoding encoding;
   private final boolean encodeOnDisk;
   private final boolean includeTags;
+  private final boolean compressTags;
 
   /** Enable when debugging */
   private static final boolean VERBOSE = false;
@@ -81,22 +82,27 @@ public class TestEncodedSeekers {
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       for (boolean includeTags : new boolean[] { false, true }) {
         for (boolean encodeOnDisk : new boolean[] { false, true }) {
-          paramList.add(new Object[] { encoding, encodeOnDisk, includeTags });
+          for (boolean compressTags : new boolean[] { false, true }) {
+            paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
+          }
         }
       }
     }
     return paramList;
   }
 
-  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags) {
+  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
+      boolean compressTags) {
     this.encoding = encoding;
     this.encodeOnDisk = encodeOnDisk;
     this.includeTags = includeTags;
+    this.compressTags = compressTags;
   }
 
   @Test
   public void testEncodedSeeker() throws IOException {
-    System.err.println("Testing encoded seekers for encoding " + encoding);
+    System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk
: "
+        + encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
     if(includeTags) {
       testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
     }
@@ -108,7 +114,8 @@ public class TestEncodedSeekers {
         setDataBlockEncoding(encoding).
         setEncodeOnDisk(encodeOnDisk).
         setBlocksize(BLOCK_SIZE).
-        setBloomFilterType(BloomType.NONE);
+        setBloomFilterType(BloomType.NONE).
+        setCompressTags(compressTags);
     HRegion region = testUtil.createTestRegion(TABLE_NAME, hcd);
 
     //write the data, but leave some in the memstore


