hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/2] hbase git commit: HBASE-18995 Move methods that are for internal usage from CellUtil to Private util class (Ramkrishna Vasudevan); ADDENDUM add file I forgot to add
Date Sat, 28 Oct 2017 03:36:04 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 250a8bc7a -> efb5d7b24


http://git-wip-us.apache.org/repos/asf/hbase/blob/efb5d7b2/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
new file mode 100644
index 0000000..d70d974
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java
@@ -0,0 +1,2776 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utility methods helpful slinging {@link Cell} instances. It has more powerful and
+ * rich set of APIs than those in {@link CellUtil} for internal usage.
+ */
+@InterfaceAudience.Private
+// TODO : Make Tag IA.LimitedPrivate and move some of the Util methods to CP exposed Util class
+public class PrivateCellUtil {
+
+  /**
+   * Private constructor to keep this class from being instantiated.
+   */
+  private PrivateCellUtil() {
+
+  }
+
+  /******************* ByteRange *******************************/
+
+  public static ByteRange fillRowRange(Cell cell, ByteRange range) {
+    return range.set(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+  }
+
+  public static ByteRange fillFamilyRange(Cell cell, ByteRange range) {
+    return range.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+  }
+
+  public static ByteRange fillQualifierRange(Cell cell, ByteRange range) {
+    return range.set(cell.getQualifierArray(), cell.getQualifierOffset(),
+      cell.getQualifierLength());
+  }
+
+  public static ByteRange fillValueRange(Cell cell, ByteRange range) {
+    return range.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+  }
+
+  public static ByteRange fillTagRange(Cell cell, ByteRange range) {
+    return range.set(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+  }
+
+  /**
+   * Returns tag value in a new byte array. If server-side, use {@link Tag#getValueArray()} with
+   * appropriate {@link Tag#getValueOffset()} and {@link Tag#getValueLength()} instead to save on
+   * allocations.
+   * @param cell
+   * @return tag value in a new byte array.
+   */
+  public static byte[] getTagsArray(Cell cell) {
+    byte[] output = new byte[cell.getTagsLength()];
+    copyTagsTo(cell, output, 0);
+    return output;
+  }
+
+  public static byte[] cloneTags(Cell cell) {
+    byte[] output = new byte[cell.getTagsLength()];
+    copyTagsTo(cell, output, 0);
+    return output;
+  }
+
+  /**
+   * Copies the tags info into the tag portion of the cell
+   * @param cell
+   * @param destination
+   * @param destinationOffset
+   * @return position after tags
+   */
+  public static int copyTagsTo(Cell cell, byte[] destination, int destinationOffset) {
+    int tlen = cell.getTagsLength();
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyFromBufferToArray(destination,
+        ((ByteBufferCell) cell).getTagsByteBuffer(), ((ByteBufferCell) cell).getTagsPosition(),
+        destinationOffset, tlen);
+    } else {
+      System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), destination, destinationOffset,
+        tlen);
+    }
+    return destinationOffset + tlen;
+  }
+
+  /**
+   * Copies the tags info into the tag portion of the cell
+   * @param cell
+   * @param destination
+   * @param destinationOffset
+   * @return the position after tags
+   */
+  public static int copyTagsTo(Cell cell, ByteBuffer destination, int destinationOffset) {
+    int tlen = cell.getTagsLength();
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyFromBufferToBuffer(((ByteBufferCell) cell).getTagsByteBuffer(),
+        destination, ((ByteBufferCell) cell).getTagsPosition(), destinationOffset, tlen);
+    } else {
+      ByteBufferUtils.copyFromArrayToBuffer(destination, destinationOffset, cell.getTagsArray(),
+        cell.getTagsOffset(), tlen);
+    }
+    return destinationOffset + tlen;
+  }
+
+  /********************* misc *************************************/
+
+  public static byte getRowByte(Cell cell, int index) {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) cell).getRowByteBuffer()
+          .get(((ByteBufferCell) cell).getRowPosition() + index);
+    }
+    return cell.getRowArray()[cell.getRowOffset() + index];
+  }
+
+  public static byte getQualifierByte(Cell cell, int index) {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) cell).getQualifierByteBuffer()
+          .get(((ByteBufferCell) cell).getQualifierPosition() + index);
+    }
+    return cell.getQualifierArray()[cell.getQualifierOffset() + index];
+  }
+
+  public static ByteBuffer getValueBufferShallowCopy(Cell cell) {
+    ByteBuffer buffer =
+        ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+    return buffer;
+  }
+
+  /**
+   * @return A new cell which is having the extra tags also added to it.
+   */
+  public static Cell createCell(Cell cell, List<Tag> tags) {
+    return createCell(cell, TagUtil.fromList(tags));
+  }
+
+  /**
+   * @return A new cell which is having the extra tags also added to it.
+   */
+  public static Cell createCell(Cell cell, byte[] tags) {
+    if (cell instanceof ByteBufferCell) {
+      return new TagRewriteByteBufferCell((ByteBufferCell) cell, tags);
+    }
+    return new TagRewriteCell(cell, tags);
+  }
+
+  public static Cell createCell(Cell cell, byte[] value, byte[] tags) {
+    if (cell instanceof ByteBufferCell) {
+      return new ValueAndTagRewriteByteBufferCell((ByteBufferCell) cell, value, tags);
+    }
+    return new ValueAndTagRewriteCell(cell, value, tags);
+  }
+
+  /**
+   * This can be used when a Cell has to change with addition/removal of one or more tags. This is
+   * an efficient way to do so in which only the tags bytes part need to recreated and copied. All
+   * other parts, refer to the original Cell.
+   */
+  static class TagRewriteCell implements ExtendedCell {
+    protected Cell cell;
+    protected byte[] tags;
+    private static final long HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + 2 * ClassSize.REFERENCE;
+
+    /**
+     * @param cell The original Cell which it rewrites
+     * @param tags the tags bytes. The array suppose to contain the tags bytes alone.
+     */
+    public TagRewriteCell(Cell cell, byte[] tags) {
+      assert cell instanceof ExtendedCell;
+      assert tags != null;
+      this.cell = cell;
+      this.tags = tags;
+      // tag offset will be treated as 0 and length this.tags.length
+      if (this.cell instanceof TagRewriteCell) {
+        // Cleaning the ref so that the byte[] can be GCed
+        ((TagRewriteCell) this.cell).tags = null;
+      }
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return cell.getRowArray();
+    }
+
+    @Override
+    public int getRowOffset() {
+      return cell.getRowOffset();
+    }
+
+    @Override
+    public short getRowLength() {
+      return cell.getRowLength();
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return cell.getFamilyArray();
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return cell.getFamilyOffset();
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return cell.getFamilyLength();
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return cell.getQualifierArray();
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return cell.getQualifierOffset();
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return cell.getQualifierLength();
+    }
+
+    @Override
+    public long getTimestamp() {
+      return cell.getTimestamp();
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return cell.getTypeByte();
+    }
+
+    @Override
+    public long getSequenceId() {
+      return cell.getSequenceId();
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return cell.getValueArray();
+    }
+
+    @Override
+    public int getValueOffset() {
+      return cell.getValueOffset();
+    }
+
+    @Override
+    public int getValueLength() {
+      return cell.getValueLength();
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return this.tags;
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getTagsLength() {
+      if (null == this.tags) {
+        // Nulled out tags array optimization in constructor
+        return 0;
+      }
+      return this.tags.length;
+    }
+
+    @Override
+    public long heapSize() {
+      long sum = HEAP_SIZE_OVERHEAD + estimatedHeapSizeOf(cell);
+      if (this.tags != null) {
+        sum += ClassSize.sizeOf(this.tags);
+      }
+      return sum;
+    }
+
+    @Override
+    public void setTimestamp(long ts) throws IOException {
+      // The incoming cell is supposed to be SettableTimestamp type.
+      PrivateCellUtil.setTimestamp(cell, ts);
+    }
+
+    @Override
+    public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
+      // The incoming cell is supposed to be SettableTimestamp type.
+      PrivateCellUtil.setTimestamp(cell, ts, tsOffset);
+    }
+
+    @Override
+    public void setSequenceId(long seqId) throws IOException {
+      // The incoming cell is supposed to be SettableSequenceId type.
+      PrivateCellUtil.setSequenceId(cell, seqId);
+    }
+
+    @Override
+    public int write(OutputStream out, boolean withTags) throws IOException {
+      int len = ((ExtendedCell) this.cell).write(out, false);
+      if (withTags && this.tags != null) {
+        // Write the tagsLength 2 bytes
+        out.write((byte) (0xff & (this.tags.length >> 8)));
+        out.write((byte) (0xff & this.tags.length));
+        out.write(this.tags);
+        len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
+      }
+      return len;
+    }
+
+    @Override
+    public int getSerializedSize(boolean withTags) {
+      int len = ((ExtendedCell) this.cell).getSerializedSize(false);
+      if (withTags && this.tags != null) {
+        len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
+      }
+      return len;
+    }
+
+    @Override
+    public void write(ByteBuffer buf, int offset) {
+      offset = KeyValueUtil.appendTo(this.cell, buf, offset, false);
+      int tagsLen = this.tags == null ? 0 : this.tags.length;
+      if (tagsLen > 0) {
+        offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
+        ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.tags, 0, tagsLen);
+      }
+    }
+
+    @Override
+    public ExtendedCell deepClone() {
+      Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
+      return new TagRewriteCell(clonedBaseCell, this.tags);
+    }
+  }
+
+  static class TagRewriteByteBufferCell extends ByteBufferCell implements ExtendedCell {
+
+    protected ByteBufferCell cell;
+    protected byte[] tags;
+    private static final long HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + 2 * ClassSize.REFERENCE;
+
+    /**
+     * @param cell The original ByteBufferCell which it rewrites
+     * @param tags the tags bytes. The array suppose to contain the tags bytes alone.
+     */
+    public TagRewriteByteBufferCell(ByteBufferCell cell, byte[] tags) {
+      assert cell instanceof ExtendedCell;
+      assert tags != null;
+      this.cell = cell;
+      this.tags = tags;
+      // tag offset will be treated as 0 and length this.tags.length
+      if (this.cell instanceof TagRewriteByteBufferCell) {
+        // Cleaning the ref so that the byte[] can be GCed
+        ((TagRewriteByteBufferCell) this.cell).tags = null;
+      }
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return this.cell.getRowArray();
+    }
+
+    @Override
+    public int getRowOffset() {
+      return this.cell.getRowOffset();
+    }
+
+    @Override
+    public short getRowLength() {
+      return this.cell.getRowLength();
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return this.cell.getFamilyArray();
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return this.cell.getFamilyOffset();
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return this.cell.getFamilyLength();
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return this.cell.getQualifierArray();
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return this.cell.getQualifierOffset();
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return this.cell.getQualifierLength();
+    }
+
+    @Override
+    public long getTimestamp() {
+      return this.cell.getTimestamp();
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return this.cell.getTypeByte();
+    }
+
+    @Override
+    public long getSequenceId() {
+      return this.cell.getSequenceId();
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return this.cell.getValueArray();
+    }
+
+    @Override
+    public int getValueOffset() {
+      return this.cell.getValueOffset();
+    }
+
+    @Override
+    public int getValueLength() {
+      return this.cell.getValueLength();
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return this.tags;
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getTagsLength() {
+      if (null == this.tags) {
+        // Nulled out tags array optimization in constructor
+        return 0;
+      }
+      return this.tags.length;
+    }
+
+    @Override
+    public void setSequenceId(long seqId) throws IOException {
+      PrivateCellUtil.setSequenceId(this.cell, seqId);
+    }
+
+    @Override
+    public void setTimestamp(long ts) throws IOException {
+      PrivateCellUtil.setTimestamp(this.cell, ts);
+    }
+
+    @Override
+    public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
+      PrivateCellUtil.setTimestamp(this.cell, ts, tsOffset);
+    }
+
+    @Override
+    public long heapSize() {
+      long sum = HEAP_SIZE_OVERHEAD + estimatedHeapSizeOf(cell);
+      // this.tags is on heap byte[]
+      if (this.tags != null) {
+        sum += ClassSize.sizeOf(this.tags);
+      }
+      return sum;
+    }
+
+    @Override
+    public int write(OutputStream out, boolean withTags) throws IOException {
+      int len = ((ExtendedCell) this.cell).write(out, false);
+      if (withTags && this.tags != null) {
+        // Write the tagsLength 2 bytes
+        out.write((byte) (0xff & (this.tags.length >> 8)));
+        out.write((byte) (0xff & this.tags.length));
+        out.write(this.tags);
+        len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
+      }
+      return len;
+    }
+
+    @Override
+    public int getSerializedSize(boolean withTags) {
+      int len = ((ExtendedCell) this.cell).getSerializedSize(false);
+      if (withTags && this.tags != null) {
+        len += KeyValue.TAGS_LENGTH_SIZE + this.tags.length;
+      }
+      return len;
+    }
+
+    @Override
+    public void write(ByteBuffer buf, int offset) {
+      offset = KeyValueUtil.appendTo(this.cell, buf, offset, false);
+      int tagsLen = this.tags == null ? 0 : this.tags.length;
+      if (tagsLen > 0) {
+        offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
+        ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.tags, 0, tagsLen);
+      }
+    }
+
+    @Override
+    public ExtendedCell deepClone() {
+      Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
+      if (clonedBaseCell instanceof ByteBufferCell) {
+        return new TagRewriteByteBufferCell((ByteBufferCell) clonedBaseCell, this.tags);
+      }
+      return new TagRewriteCell(clonedBaseCell, this.tags);
+    }
+
+    @Override
+    public ByteBuffer getRowByteBuffer() {
+      return this.cell.getRowByteBuffer();
+    }
+
+    @Override
+    public int getRowPosition() {
+      return this.cell.getRowPosition();
+    }
+
+    @Override
+    public ByteBuffer getFamilyByteBuffer() {
+      return this.cell.getFamilyByteBuffer();
+    }
+
+    @Override
+    public int getFamilyPosition() {
+      return this.cell.getFamilyPosition();
+    }
+
+    @Override
+    public ByteBuffer getQualifierByteBuffer() {
+      return this.cell.getQualifierByteBuffer();
+    }
+
+    @Override
+    public int getQualifierPosition() {
+      return this.cell.getQualifierPosition();
+    }
+
+    @Override
+    public ByteBuffer getValueByteBuffer() {
+      return this.cell.getValueByteBuffer();
+    }
+
+    @Override
+    public int getValuePosition() {
+      return this.cell.getValuePosition();
+    }
+
+    @Override
+    public ByteBuffer getTagsByteBuffer() {
+      return this.tags == null ? HConstants.EMPTY_BYTE_BUFFER : ByteBuffer.wrap(this.tags);
+    }
+
+    @Override
+    public int getTagsPosition() {
+      return 0;
+    }
+  }
+
+  static class ValueAndTagRewriteCell extends TagRewriteCell {
+
+    protected byte[] value;
+
+    public ValueAndTagRewriteCell(Cell cell, byte[] value, byte[] tags) {
+      super(cell, tags);
+      this.value = value;
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return this.value;
+    }
+
+    @Override
+    public int getValueOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getValueLength() {
+      return this.value == null ? 0 : this.value.length;
+    }
+
+    @Override
+    public long heapSize() {
+      long sum = ClassSize.REFERENCE + super.heapSize();
+      if (this.value != null) {
+        sum += ClassSize.sizeOf(this.value);
+      }
+      return sum;
+    }
+
+    @Override
+    public int write(OutputStream out, boolean withTags) throws IOException {
+      return write(out, withTags, this.cell, this.value, this.tags);
+    }
+
+    // Made into a static method so as to reuse the logic within ValueAndTagRewriteByteBufferCell
+    static int write(OutputStream out, boolean withTags, Cell cell, byte[] value, byte[] tags)
+        throws IOException {
+      int valLen = value == null ? 0 : value.length;
+      ByteBufferUtils.putInt(out, KeyValueUtil.keyLength(cell));// Key length
+      ByteBufferUtils.putInt(out, valLen);// Value length
+      int len = 2 * Bytes.SIZEOF_INT;
+      len += writeFlatKey(cell, out);// Key
+      if (valLen > 0) out.write(value);// Value
+      len += valLen;
+      if (withTags && tags != null) {
+        // Write the tagsLength 2 bytes
+        out.write((byte) (0xff & (tags.length >> 8)));
+        out.write((byte) (0xff & tags.length));
+        out.write(tags);
+        len += KeyValue.TAGS_LENGTH_SIZE + tags.length;
+      }
+      return len;
+    }
+
+    @Override
+    public int getSerializedSize(boolean withTags) {
+      return super.getSerializedSize(withTags) - this.cell.getValueLength() + this.value.length;
+    }
+
+    @Override
+    public void write(ByteBuffer buf, int offset) {
+      write(buf, offset, this.cell, this.value, this.tags);
+    }
+
+    // Made into a static method so as to reuse the logic within ValueAndTagRewriteByteBufferCell
+    static void write(ByteBuffer buf, int offset, Cell cell, byte[] value, byte[] tags) {
+      offset = ByteBufferUtils.putInt(buf, offset, KeyValueUtil.keyLength(cell));// Key length
+      offset = ByteBufferUtils.putInt(buf, offset, value.length);// Value length
+      offset = KeyValueUtil.appendKeyTo(cell, buf, offset);
+      ByteBufferUtils.copyFromArrayToBuffer(buf, offset, value, 0, value.length);
+      offset += value.length;
+      int tagsLen = tags == null ? 0 : tags.length;
+      if (tagsLen > 0) {
+        offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
+        ByteBufferUtils.copyFromArrayToBuffer(buf, offset, tags, 0, tagsLen);
+      }
+    }
+
+    @Override
+    public ExtendedCell deepClone() {
+      Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
+      return new ValueAndTagRewriteCell(clonedBaseCell, this.value, this.tags);
+    }
+  }
+
+  static class ValueAndTagRewriteByteBufferCell extends TagRewriteByteBufferCell {
+
+    protected byte[] value;
+
+    public ValueAndTagRewriteByteBufferCell(ByteBufferCell cell, byte[] value, byte[] tags) {
+      super(cell, tags);
+      this.value = value;
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return this.value;
+    }
+
+    @Override
+    public int getValueOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getValueLength() {
+      return this.value == null ? 0 : this.value.length;
+    }
+
+    @Override
+    public ByteBuffer getValueByteBuffer() {
+      return ByteBuffer.wrap(this.value);
+    }
+
+    @Override
+    public int getValuePosition() {
+      return 0;
+    }
+
+    @Override
+    public long heapSize() {
+      long sum = ClassSize.REFERENCE + super.heapSize();
+      if (this.value != null) {
+        sum += ClassSize.sizeOf(this.value);
+      }
+      return sum;
+    }
+
+    @Override
+    public int write(OutputStream out, boolean withTags) throws IOException {
+      return ValueAndTagRewriteCell.write(out, withTags, this.cell, this.value, this.tags);
+    }
+
+    @Override
+    public int getSerializedSize(boolean withTags) {
+      return super.getSerializedSize(withTags) - this.cell.getValueLength() + this.value.length;
+    }
+
+    @Override
+    public void write(ByteBuffer buf, int offset) {
+      ValueAndTagRewriteCell.write(buf, offset, this.cell, this.value, this.tags);
+    }
+
+    @Override
+    public ExtendedCell deepClone() {
+      Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
+      if (clonedBaseCell instanceof ByteBufferCell) {
+        return new ValueAndTagRewriteByteBufferCell((ByteBufferCell) clonedBaseCell, this.value,
+            this.tags);
+      }
+      return new ValueAndTagRewriteCell(clonedBaseCell, this.value, this.tags);
+    }
+  }
+
+  public static boolean matchingRows(final Cell left, final byte[] buf, final int offset,
+      final int length) {
+    if (left instanceof ByteBufferCell) {
+      return ByteBufferUtils.equals(((ByteBufferCell) left).getRowByteBuffer(),
+        ((ByteBufferCell) left).getRowPosition(), left.getRowLength(), buf, offset, length);
+    }
+    return Bytes.equals(left.getRowArray(), left.getRowOffset(), left.getRowLength(), buf, offset,
+      length);
+  }
+
+  public static boolean matchingFamily(final Cell left, final byte[] buf, final int offset,
+      final int length) {
+    if (left instanceof ByteBufferCell) {
+      return ByteBufferUtils.equals(((ByteBufferCell) left).getFamilyByteBuffer(),
+        ((ByteBufferCell) left).getFamilyPosition(), left.getFamilyLength(), buf, offset, length);
+    }
+    return Bytes.equals(left.getFamilyArray(), left.getFamilyOffset(), left.getFamilyLength(), buf,
+      offset, length);
+  }
+
+  /**
+   * Finds if the qualifier part of the cell and the KV serialized byte[] are equal
+   * @param left
+   * @param buf the serialized keyvalue format byte[]
+   * @param offset the offset of the qualifier in the byte[]
+   * @param length the length of the qualifier in the byte[]
+   * @return true if the qualifier matches, false otherwise
+   */
+  public static boolean matchingQualifier(final Cell left, final byte[] buf, final int offset,
+      final int length) {
+    if (buf == null) {
+      return left.getQualifierLength() == 0;
+    }
+    if (left instanceof ByteBufferCell) {
+      return ByteBufferUtils.equals(((ByteBufferCell) left).getQualifierByteBuffer(),
+        ((ByteBufferCell) left).getQualifierPosition(), left.getQualifierLength(), buf, offset,
+        length);
+    }
+    return Bytes.equals(left.getQualifierArray(), left.getQualifierOffset(),
+      left.getQualifierLength(), buf, offset, length);
+  }
+
+  public static boolean matchingColumn(final Cell left, final byte[] fam, final int foffset,
+      final int flength, final byte[] qual, final int qoffset, final int qlength) {
+    if (!matchingFamily(left, fam, foffset, flength)) return false;
+    return matchingQualifier(left, qual, qoffset, qlength);
+  }
+
+  public static boolean matchingValue(final Cell left, final Cell right, int lvlength,
+      int rvlength) {
+    if (left instanceof ByteBufferCell && right instanceof ByteBufferCell) {
+      return ByteBufferUtils.equals(((ByteBufferCell) left).getValueByteBuffer(),
+        ((ByteBufferCell) left).getValuePosition(), lvlength,
+        ((ByteBufferCell) right).getValueByteBuffer(), ((ByteBufferCell) right).getValuePosition(),
+        rvlength);
+    }
+    if (left instanceof ByteBufferCell) {
+      return ByteBufferUtils.equals(((ByteBufferCell) left).getValueByteBuffer(),
+        ((ByteBufferCell) left).getValuePosition(), lvlength, right.getValueArray(),
+        right.getValueOffset(), rvlength);
+    }
+    if (right instanceof ByteBufferCell) {
+      return ByteBufferUtils.equals(((ByteBufferCell) right).getValueByteBuffer(),
+        ((ByteBufferCell) right).getValuePosition(), rvlength, left.getValueArray(),
+        left.getValueOffset(), lvlength);
+    }
+    return Bytes.equals(left.getValueArray(), left.getValueOffset(), lvlength,
+      right.getValueArray(), right.getValueOffset(), rvlength);
+  }
+
+  public static boolean matchingType(Cell a, Cell b) {
+    return a.getTypeByte() == b.getTypeByte();
+  }
+
+  /**
+   * @return True if a delete type, a {@link KeyValue.Type#Delete} or a {KeyValue.Type#DeleteFamily}
+   *         or a {@link KeyValue.Type#DeleteColumn} KeyValue type.
+   */
+  public static boolean isDelete(final byte type) {
+    return Type.Delete.getCode() <= type && type <= Type.DeleteFamily.getCode();
+  }
+
+  /**
+   * @return True if this cell is a {@link KeyValue.Type#Delete} type.
+   */
+  public static boolean isDeleteType(Cell cell) {
+    return cell.getTypeByte() == Type.Delete.getCode();
+  }
+
+  public static boolean isDeleteFamily(final Cell cell) {
+    return cell.getTypeByte() == Type.DeleteFamily.getCode();
+  }
+
+  public static boolean isDeleteFamilyVersion(final Cell cell) {
+    return cell.getTypeByte() == Type.DeleteFamilyVersion.getCode();
+  }
+
+  public static boolean isDeleteColumns(final Cell cell) {
+    return cell.getTypeByte() == Type.DeleteColumn.getCode();
+  }
+
+  public static boolean isDeleteColumnVersion(final Cell cell) {
+    return cell.getTypeByte() == Type.Delete.getCode();
+  }
+
+  /**
+   * @return True if this cell is a delete family or column type.
+   */
+  public static boolean isDeleteColumnOrFamily(Cell cell) {
+    int t = cell.getTypeByte();
+    return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
+  }
+
+  private static Iterator<Tag> tagsIterator(final ByteBuffer tags, final int offset,
+      final int length) {
+    return new Iterator<Tag>() {
+      private int pos = offset;
+      private int endOffset = offset + length - 1;
+
+      @Override
+      public boolean hasNext() {
+        return this.pos < endOffset;
+      }
+
+      @Override
+      public Tag next() {
+        if (hasNext()) {
+          int curTagLen = ByteBufferUtils.readAsInt(tags, this.pos, Tag.TAG_LENGTH_SIZE);
+          Tag tag = new ByteBufferTag(tags, pos, curTagLen + Tag.TAG_LENGTH_SIZE);
+          this.pos += Bytes.SIZEOF_SHORT + curTagLen;
+          return tag;
+        }
+        return null;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Util method to iterate through the tags in the given cell.
+   * @param cell The Cell over which tags iterator is needed.
+   * @return iterator for the tags
+   */
+  public static Iterator<Tag> tagsIterator(final Cell cell) {
+    final int tagsLength = cell.getTagsLength();
+    // Save an object allocation where we can
+    if (tagsLength == 0) {
+      return TagUtil.EMPTY_TAGS_ITR;
+    }
+    if (cell instanceof ByteBufferCell) {
+      return tagsIterator(((ByteBufferCell) cell).getTagsByteBuffer(),
+        ((ByteBufferCell) cell).getTagsPosition(), tagsLength);
+    }
+    return tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+  }
+
+  private static Iterator<Tag> tagsIterator(final byte[] tags, final int offset, final int length) {
+    return new Iterator<Tag>() {
+      private int pos = offset;
+      private int endOffset = offset + length - 1;
+
+      @Override
+      public boolean hasNext() {
+        return this.pos < endOffset;
+      }
+
+      @Override
+      public Tag next() {
+        if (hasNext()) {
+          int curTagLen = Bytes.readAsInt(tags, this.pos, Tag.TAG_LENGTH_SIZE);
+          Tag tag = new ArrayBackedTag(tags, pos, curTagLen + TAG_LENGTH_SIZE);
+          this.pos += Bytes.SIZEOF_SHORT + curTagLen;
+          return tag;
+        }
+        return null;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * @param cell The Cell
+   * @return Tags in the given Cell as a List
+   */
+  public static List<Tag> getTags(Cell cell) {
+    List<Tag> tags = new ArrayList<>();
+    Iterator<Tag> tagsItr = tagsIterator(cell);
+    while (tagsItr.hasNext()) {
+      tags.add(tagsItr.next());
+    }
+    return tags;
+  }
+
+  /**
+   * Retrieve Cell's first tag, matching the passed in type
+   * @param cell The Cell
+   * @param type Type of the Tag to retrieve
+   * @return null if there is no tag of the passed in tag type
+   */
+  public static Tag getTag(Cell cell, byte type) {
+    boolean bufferBacked = cell instanceof ByteBufferCell;
+    int length = cell.getTagsLength();
+    int offset = bufferBacked ? ((ByteBufferCell) cell).getTagsPosition() : cell.getTagsOffset();
+    int pos = offset;
+    while (pos < offset + length) {
+      int tagLen;
+      if (bufferBacked) {
+        ByteBuffer tagsBuffer = ((ByteBufferCell) cell).getTagsByteBuffer();
+        tagLen = ByteBufferUtils.readAsInt(tagsBuffer, pos, TAG_LENGTH_SIZE);
+        if (ByteBufferUtils.toByte(tagsBuffer, pos + TAG_LENGTH_SIZE) == type) {
+          return new ByteBufferTag(tagsBuffer, pos, tagLen + TAG_LENGTH_SIZE);
+        }
+      } else {
+        tagLen = Bytes.readAsInt(cell.getTagsArray(), pos, TAG_LENGTH_SIZE);
+        if (cell.getTagsArray()[pos + TAG_LENGTH_SIZE] == type) {
+          return new ArrayBackedTag(cell.getTagsArray(), pos, tagLen + TAG_LENGTH_SIZE);
+        }
+      }
+      pos += TAG_LENGTH_SIZE + tagLen;
+    }
+    return null;
+  }
+
+  /**
+   * Returns true if the first range start1...end1 overlaps with the second range start2...end2,
+   * assuming the byte arrays represent row keys
+   */
+  public static boolean overlappingKeys(final byte[] start1, final byte[] end1, final byte[] start2,
+      final byte[] end2) {
+    return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1, end2) < 0)
+        && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2, end1) < 0);
+  }
+
+  /**
+   * Write rowkey excluding the common part.
+   * @param cell
+   * @param rLen
+   * @param commonPrefix
+   * @param out
+   * @throws IOException
+   */
+  public static void writeRowKeyExcludingCommon(Cell cell, short rLen, int commonPrefix,
+      DataOutputStream out) throws IOException {
+    if (commonPrefix == 0) {
+      out.writeShort(rLen);
+    } else if (commonPrefix == 1) {
+      out.writeByte((byte) rLen);
+      commonPrefix--;
+    } else {
+      commonPrefix -= KeyValue.ROW_LENGTH_SIZE;
+    }
+    if (rLen > commonPrefix) {
+      writeRowSkippingBytes(out, cell, rLen, commonPrefix);
+    }
+  }
+
+  /**
+   * Writes the row from the given cell to the output stream excluding the common prefix
+   * @param out The dataoutputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param rlength the row length
+   * @throws IOException
+   */
+  public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength,
+      int commonPrefix) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream((DataOutput) out,
+        ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix);
+    } else {
+      out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix);
+    }
+  }
+
+  /**
+   * Find length of common prefix in keys of the cells, considering key as byte[] if serialized in
+   * {@link KeyValue}. The key format is &lt;2 bytes rk len&gt;&lt;rk&gt;&lt;1 byte cf
+   * len&gt;&lt;cf&gt;&lt;qualifier&gt;&lt;8 bytes timestamp&gt;&lt;1 byte type&gt;
+   * @param c1 the cell
+   * @param c2 the cell
+   * @param bypassFamilyCheck when true assume the family bytes same in both cells. Pass it as true
+   *          when dealing with Cells in same CF so as to avoid some checks
+   * @param withTsType when true check timestamp and type bytes also.
+   * @return length of common prefix
+   */
+  public static int findCommonPrefixInFlatKey(Cell c1, Cell c2, boolean bypassFamilyCheck,
+      boolean withTsType) {
+    // Compare the 2 bytes in RK length part
+    short rLen1 = c1.getRowLength();
+    short rLen2 = c2.getRowLength();
+    int commonPrefix = KeyValue.ROW_LENGTH_SIZE;
+    if (rLen1 != rLen2) {
+      // early out when the RK length itself is not matching
+      return ByteBufferUtils.findCommonPrefix(Bytes.toBytes(rLen1), 0, KeyValue.ROW_LENGTH_SIZE,
+        Bytes.toBytes(rLen2), 0, KeyValue.ROW_LENGTH_SIZE);
+    }
+    // Compare the RKs
+    int rkCommonPrefix = 0;
+    if (c1 instanceof ByteBufferCell && c2 instanceof ByteBufferCell) {
+      rkCommonPrefix = ByteBufferUtils.findCommonPrefix(((ByteBufferCell) c1).getRowByteBuffer(),
+        ((ByteBufferCell) c1).getRowPosition(), rLen1, ((ByteBufferCell) c2).getRowByteBuffer(),
+        ((ByteBufferCell) c2).getRowPosition(), rLen2);
+    } else {
+      // There cannot be a case where one cell is BBCell and other is KeyValue. This flow comes
+      // either
+      // in flush or compactions. In flushes both cells are KV and in case of compaction it will be
+      // either
+      // KV or BBCell
+      rkCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getRowArray(), c1.getRowOffset(), rLen1,
+        c2.getRowArray(), c2.getRowOffset(), rLen2);
+    }
+    commonPrefix += rkCommonPrefix;
+    if (rkCommonPrefix != rLen1) {
+      // Early out when RK is not fully matching.
+      return commonPrefix;
+    }
+    // Compare 1 byte CF length part
+    byte fLen1 = c1.getFamilyLength();
+    if (bypassFamilyCheck) {
+      // This flag will be true when caller is sure that the family will be same for both the cells
+      // Just make commonPrefix to increment by the family part
+      commonPrefix += KeyValue.FAMILY_LENGTH_SIZE + fLen1;
+    } else {
+      byte fLen2 = c2.getFamilyLength();
+      if (fLen1 != fLen2) {
+        // early out when the CF length itself is not matching
+        return commonPrefix;
+      }
+      // CF lengths are same so there is one more byte common in key part
+      commonPrefix += KeyValue.FAMILY_LENGTH_SIZE;
+      // Compare the CF names
+      int fCommonPrefix;
+      if (c1 instanceof ByteBufferCell && c2 instanceof ByteBufferCell) {
+        fCommonPrefix = ByteBufferUtils.findCommonPrefix(
+          ((ByteBufferCell) c1).getFamilyByteBuffer(), ((ByteBufferCell) c1).getFamilyPosition(),
+          fLen1, ((ByteBufferCell) c2).getFamilyByteBuffer(),
+          ((ByteBufferCell) c2).getFamilyPosition(), fLen2);
+      } else {
+        fCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getFamilyArray(), c1.getFamilyOffset(),
+          fLen1, c2.getFamilyArray(), c2.getFamilyOffset(), fLen2);
+      }
+      commonPrefix += fCommonPrefix;
+      if (fCommonPrefix != fLen1) {
+        return commonPrefix;
+      }
+    }
+    // Compare the Qualifiers
+    int qLen1 = c1.getQualifierLength();
+    int qLen2 = c2.getQualifierLength();
+    int qCommon;
+    if (c1 instanceof ByteBufferCell && c2 instanceof ByteBufferCell) {
+      qCommon = ByteBufferUtils.findCommonPrefix(((ByteBufferCell) c1).getQualifierByteBuffer(),
+        ((ByteBufferCell) c1).getQualifierPosition(), qLen1,
+        ((ByteBufferCell) c2).getQualifierByteBuffer(),
+        ((ByteBufferCell) c2).getQualifierPosition(), qLen2);
+    } else {
+      qCommon = ByteBufferUtils.findCommonPrefix(c1.getQualifierArray(), c1.getQualifierOffset(),
+        qLen1, c2.getQualifierArray(), c2.getQualifierOffset(), qLen2);
+    }
+    commonPrefix += qCommon;
+    if (!withTsType || Math.max(qLen1, qLen2) != qCommon) {
+      return commonPrefix;
+    }
+    // Compare the timestamp parts
+    int tsCommonPrefix = ByteBufferUtils.findCommonPrefix(Bytes.toBytes(c1.getTimestamp()), 0,
+      KeyValue.TIMESTAMP_SIZE, Bytes.toBytes(c2.getTimestamp()), 0, KeyValue.TIMESTAMP_SIZE);
+    commonPrefix += tsCommonPrefix;
+    if (tsCommonPrefix != KeyValue.TIMESTAMP_SIZE) {
+      return commonPrefix;
+    }
+    // Compare the type
+    if (c1.getTypeByte() == c2.getTypeByte()) {
+      commonPrefix += KeyValue.TYPE_SIZE;
+    }
+    return commonPrefix;
+  }
+
+  /**
+   * Used to compare two cells based on the column hint provided. This is specifically used when we
+   * need to optimize the seeks based on the next indexed key. This is an advanced usage API
+   * specifically needed for some optimizations.
+   * @param nextIndexedCell the next indexed cell
+   * @param currentCell the cell to be compared
+   * @param foff the family offset of the currentCell
+   * @param flen the family length of the currentCell
+   * @param colHint the column hint provided - could be null
+   * @param coff the offset of the column hint if provided, if not offset of the currentCell's
+   *          qualifier
+   * @param clen the length of the column hint if provided, if not length of the currentCell's
+   *          qualifier
+   * @param ts the timestamp to be seeked
+   * @param type the type to be seeked
+   * @return an int based on the given column hint TODO : To be moved out of here because this is a
+   *         special API used in scan optimization.
+   */
+  // compare a key against row/fam/qual/ts/type
+  public static final int compareKeyBasedOnColHint(CellComparator comparator, Cell nextIndexedCell,
+      Cell currentCell, int foff, int flen, byte[] colHint, int coff, int clen, long ts,
+      byte type) {
+    int compare = comparator.compareRows(nextIndexedCell, currentCell);
+    if (compare != 0) {
+      return compare;
+    }
+    // If the column is not specified, the "minimum" key type appears the
+    // latest in the sorted order, regardless of the timestamp. This is used
+    // for specifying the last key/value in a given row, because there is no
+    // "lexicographically last column" (it would be infinitely long). The
+    // "maximum" key type does not need this behavior.
+    if (nextIndexedCell.getFamilyLength() + nextIndexedCell.getQualifierLength() == 0
+        && nextIndexedCell.getTypeByte() == Type.Minimum.getCode()) {
+      // left is "bigger", i.e. it appears later in the sorted order
+      return 1;
+    }
+    if (flen + clen == 0 && type == Type.Minimum.getCode()) {
+      return -1;
+    }
+
+    compare = comparator.compareFamilies(nextIndexedCell, currentCell);
+    if (compare != 0) {
+      return compare;
+    }
+    if (colHint == null) {
+      compare = comparator.compareQualifiers(nextIndexedCell, currentCell);
+    } else {
+      compare = CellUtil.compareQualifiers(nextIndexedCell, colHint, coff, clen);
+    }
+    if (compare != 0) {
+      return compare;
+    }
+    // Next compare timestamps.
+    compare = comparator.compareTimestamps(nextIndexedCell.getTimestamp(), ts);
+    if (compare != 0) {
+      return compare;
+    }
+
+    // Compare types. Let the delete types sort ahead of puts; i.e. types
+    // of higher numbers sort before those of lesser numbers. Maximum (255)
+    // appears ahead of everything, and minimum (0) appears after
+    // everything.
+    return (0xff & type) - (0xff & nextIndexedCell.getTypeByte());
+  }
+
+  /**
+   * Compares only the key portion of a cell. It does not include the sequence id/mvcc of the cell
+   * @param left
+   * @param right
+   * @return an int greater than 0 if left &gt; than right lesser than 0 if left &lt; than right
+   *         equal to 0 if left is equal to right
+   */
+  public static final int compareKeyIgnoresMvcc(CellComparator comparator, Cell left, Cell right) {
+    return ((CellComparatorImpl) comparator).compare(left, right, true);
+  }
+
+  /**
+   * Compare cell's row against given comparator
+   * @param cell
+   * @param comparator
+   * @return result comparing cell's row
+   */
+  public static int compareRow(Cell cell, ByteArrayComparable comparator) {
+    if (cell instanceof ByteBufferCell) {
+      return comparator.compareTo(((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), cell.getRowLength());
+    }
+    return comparator.compareTo(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+  }
+
+  /**
+   * Compare cell's column family against given comparator
+   * @param cell
+   * @param comparator
+   * @return result comparing cell's column family
+   */
+  public static int compareFamily(Cell cell, ByteArrayComparable comparator) {
+    if (cell instanceof ByteBufferCell) {
+      return comparator.compareTo(((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), cell.getFamilyLength());
+    }
+    return comparator.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(),
+      cell.getFamilyLength());
+  }
+
+  /**
+   * Compare cell's qualifier against given comparator
+   * @param cell
+   * @param comparator
+   * @return result comparing cell's qualifier
+   */
+  public static int compareQualifier(Cell cell, ByteArrayComparable comparator) {
+    if (cell instanceof ByteBufferCell) {
+      return comparator.compareTo(((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), cell.getQualifierLength());
+    }
+    return comparator.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+      cell.getQualifierLength());
+  }
+
+  /**
+   * Compare cell's value against given comparator
+   * @param cell
+   * @param comparator
+   * @return result comparing cell's value
+   */
+  public static int compareValue(Cell cell, ByteArrayComparable comparator) {
+    if (cell instanceof ByteBufferCell) {
+      return comparator.compareTo(((ByteBufferCell) cell).getValueByteBuffer(),
+        ((ByteBufferCell) cell).getValuePosition(), cell.getValueLength());
+    }
+    return comparator.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+  }
+
+  /**
+   * These cells are used in reseeks/seeks to improve the read performance. They are not real cells
+   * that are returned back to the clients
+   */
+  private static abstract class EmptyCell implements Cell, SettableSequenceId {
+
+    @Override
+    public void setSequenceId(long seqId) {
+      // Fake cells don't need seqId, so leaving it as a noop.
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getRowOffset() {
+      return 0;
+    }
+
+    @Override
+    public short getRowLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return 0;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return 0;
+    }
+
+    @Override
+    public long getSequenceId() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getValueOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getValueLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getTagsLength() {
+      return 0;
+    }
+  }
+
+  /**
+   * These cells are used in reseeks/seeks to improve the read performance. They are not real cells
+   * that are returned back to the clients
+   */
+  private static abstract class EmptyByteBufferCell extends ByteBufferCell
+      implements SettableSequenceId {
+
+    @Override
+    public void setSequenceId(long seqId) {
+      // Fake cells don't need seqId, so leaving it as a noop.
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return CellUtil.cloneRow(this);
+    }
+
+    @Override
+    public int getRowOffset() {
+      return 0;
+    }
+
+    @Override
+    public short getRowLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return CellUtil.cloneFamily(this);
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return 0;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return CellUtil.cloneQualifier(this);
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return 0;
+    }
+
+    @Override
+    public long getSequenceId() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return CellUtil.cloneValue(this);
+    }
+
+    @Override
+    public int getValueOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getValueLength() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return CellUtil.cloneTags(this);
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return 0;
+    }
+
+    @Override
+    public int getTagsLength() {
+      return 0;
+    }
+
+    @Override
+    public ByteBuffer getRowByteBuffer() {
+      return HConstants.EMPTY_BYTE_BUFFER;
+    }
+
+    @Override
+    public int getRowPosition() {
+      return 0;
+    }
+
+    @Override
+    public ByteBuffer getFamilyByteBuffer() {
+      return HConstants.EMPTY_BYTE_BUFFER;
+    }
+
+    @Override
+    public int getFamilyPosition() {
+      return 0;
+    }
+
+    @Override
+    public ByteBuffer getQualifierByteBuffer() {
+      return HConstants.EMPTY_BYTE_BUFFER;
+    }
+
+    @Override
+    public int getQualifierPosition() {
+      return 0;
+    }
+
+    @Override
+    public ByteBuffer getTagsByteBuffer() {
+      return HConstants.EMPTY_BYTE_BUFFER;
+    }
+
+    @Override
+    public int getTagsPosition() {
+      return 0;
+    }
+
+    @Override
+    public ByteBuffer getValueByteBuffer() {
+      return HConstants.EMPTY_BYTE_BUFFER;
+    }
+
+    @Override
+    public int getValuePosition() {
+      return 0;
+    }
+  }
+
+  private static class FirstOnRowCell extends EmptyCell {
+    private final byte[] rowArray;
+    private final int roffset;
+    private final short rlength;
+
+    public FirstOnRowCell(final byte[] row, int roffset, short rlength) {
+      this.rowArray = row;
+      this.roffset = roffset;
+      this.rlength = rlength;
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return this.rowArray;
+    }
+
+    @Override
+    public int getRowOffset() {
+      return this.roffset;
+    }
+
+    @Override
+    public short getRowLength() {
+      return this.rlength;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return HConstants.LATEST_TIMESTAMP;
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return Type.Maximum.getCode();
+    }
+  }
+
+  private static class FirstOnRowByteBufferCell extends EmptyByteBufferCell {
+    private final ByteBuffer rowBuff;
+    private final int roffset;
+    private final short rlength;
+
+    public FirstOnRowByteBufferCell(final ByteBuffer row, int roffset, short rlength) {
+      this.rowBuff = row;
+      this.roffset = roffset;
+      this.rlength = rlength;
+    }
+
+    @Override
+    public ByteBuffer getRowByteBuffer() {
+      return this.rowBuff;
+    }
+
+    @Override
+    public int getRowPosition() {
+      return this.roffset;
+    }
+
+    @Override
+    public short getRowLength() {
+      return this.rlength;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return HConstants.LATEST_TIMESTAMP;
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return Type.Maximum.getCode();
+    }
+  }
+
+  private static class LastOnRowByteBufferCell extends EmptyByteBufferCell {
+    private final ByteBuffer rowBuff;
+    private final int roffset;
+    private final short rlength;
+
+    public LastOnRowByteBufferCell(final ByteBuffer row, int roffset, short rlength) {
+      this.rowBuff = row;
+      this.roffset = roffset;
+      this.rlength = rlength;
+    }
+
+    @Override
+    public ByteBuffer getRowByteBuffer() {
+      return this.rowBuff;
+    }
+
+    @Override
+    public int getRowPosition() {
+      return this.roffset;
+    }
+
+    @Override
+    public short getRowLength() {
+      return this.rlength;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return HConstants.OLDEST_TIMESTAMP;
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return Type.Minimum.getCode();
+    }
+  }
+
+  private static class FirstOnRowColByteBufferCell extends FirstOnRowByteBufferCell {
+    private final ByteBuffer famBuff;
+    private final int famOffset;
+    private final byte famLength;
+    private final ByteBuffer colBuff;
+    private final int colOffset;
+    private final int colLength;
+
+    public FirstOnRowColByteBufferCell(final ByteBuffer row, int roffset, short rlength,
+        final ByteBuffer famBuff, final int famOffset, final byte famLength, final ByteBuffer col,
+        final int colOffset, final int colLength) {
+      super(row, roffset, rlength);
+      this.famBuff = famBuff;
+      this.famOffset = famOffset;
+      this.famLength = famLength;
+      this.colBuff = col;
+      this.colOffset = colOffset;
+      this.colLength = colLength;
+    }
+
+    @Override
+    public ByteBuffer getFamilyByteBuffer() {
+      return this.famBuff;
+    }
+
+    @Override
+    public int getFamilyPosition() {
+      return this.famOffset;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return famLength;
+    }
+
+    @Override
+    public ByteBuffer getQualifierByteBuffer() {
+      return this.colBuff;
+    }
+
+    @Override
+    public int getQualifierPosition() {
+      return this.colOffset;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return this.colLength;
+    }
+  }
+
+  private static class FirstOnRowColCell extends FirstOnRowCell {
+    private final byte[] fArray;
+    private final int foffset;
+    private final byte flength;
+    private final byte[] qArray;
+    private final int qoffset;
+    private final int qlength;
+
+    public FirstOnRowColCell(byte[] rArray, int roffset, short rlength, byte[] fArray, int foffset,
+        byte flength, byte[] qArray, int qoffset, int qlength) {
+      super(rArray, roffset, rlength);
+      this.fArray = fArray;
+      this.foffset = foffset;
+      this.flength = flength;
+      this.qArray = qArray;
+      this.qoffset = qoffset;
+      this.qlength = qlength;
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return this.fArray;
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return this.foffset;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return this.flength;
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return this.qArray;
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return this.qoffset;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return this.qlength;
+    }
+  }
+
+  private static class FirstOnRowColTSCell extends FirstOnRowColCell {
+
+    private long ts;
+
+    public FirstOnRowColTSCell(byte[] rArray, int roffset, short rlength, byte[] fArray,
+        int foffset, byte flength, byte[] qArray, int qoffset, int qlength, long ts) {
+      super(rArray, roffset, rlength, fArray, foffset, flength, qArray, qoffset, qlength);
+      this.ts = ts;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return this.ts;
+    }
+  }
+
+  private static class FirstOnRowColTSByteBufferCell extends FirstOnRowColByteBufferCell {
+
+    private long ts;
+
+    public FirstOnRowColTSByteBufferCell(ByteBuffer rBuffer, int roffset, short rlength,
+        ByteBuffer fBuffer, int foffset, byte flength, ByteBuffer qBuffer, int qoffset, int qlength,
+        long ts) {
+      super(rBuffer, roffset, rlength, fBuffer, foffset, flength, qBuffer, qoffset, qlength);
+      this.ts = ts;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return this.ts;
+    }
+  }
+
+  private static class LastOnRowCell extends EmptyCell {
+    private final byte[] rowArray;
+    private final int roffset;
+    private final short rlength;
+
+    public LastOnRowCell(byte[] row, int roffset, short rlength) {
+      this.rowArray = row;
+      this.roffset = roffset;
+      this.rlength = rlength;
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return this.rowArray;
+    }
+
+    @Override
+    public int getRowOffset() {
+      return this.roffset;
+    }
+
+    @Override
+    public short getRowLength() {
+      return this.rlength;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return HConstants.OLDEST_TIMESTAMP;
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return Type.Minimum.getCode();
+    }
+  }
+
+  private static class LastOnRowColCell extends LastOnRowCell {
+    private final byte[] fArray;
+    private final int foffset;
+    private final byte flength;
+    private final byte[] qArray;
+    private final int qoffset;
+    private final int qlength;
+
+    public LastOnRowColCell(byte[] rArray, int roffset, short rlength, byte[] fArray, int foffset,
+        byte flength, byte[] qArray, int qoffset, int qlength) {
+      super(rArray, roffset, rlength);
+      this.fArray = fArray;
+      this.foffset = foffset;
+      this.flength = flength;
+      this.qArray = qArray;
+      this.qoffset = qoffset;
+      this.qlength = qlength;
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return this.fArray;
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return this.foffset;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return this.flength;
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return this.qArray;
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return this.qoffset;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return this.qlength;
+    }
+  }
+
+  private static class LastOnRowColByteBufferCell extends LastOnRowByteBufferCell {
+    private final ByteBuffer fBuffer;
+    private final int foffset;
+    private final byte flength;
+    private final ByteBuffer qBuffer;
+    private final int qoffset;
+    private final int qlength;
+
+    public LastOnRowColByteBufferCell(ByteBuffer rBuffer, int roffset, short rlength,
+        ByteBuffer fBuffer, int foffset, byte flength, ByteBuffer qBuffer, int qoffset,
+        int qlength) {
+      super(rBuffer, roffset, rlength);
+      this.fBuffer = fBuffer;
+      this.foffset = foffset;
+      this.flength = flength;
+      this.qBuffer = qBuffer;
+      this.qoffset = qoffset;
+      this.qlength = qlength;
+    }
+
+    @Override
+    public ByteBuffer getFamilyByteBuffer() {
+      return this.fBuffer;
+    }
+
+    @Override
+    public int getFamilyPosition() {
+      return this.foffset;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return this.flength;
+    }
+
+    @Override
+    public ByteBuffer getQualifierByteBuffer() {
+      return this.qBuffer;
+    }
+
+    @Override
+    public int getQualifierPosition() {
+      return this.qoffset;
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return this.qlength;
+    }
+  }
+
+  private static class FirstOnRowDeleteFamilyCell extends EmptyCell {
+    private final byte[] row;
+    private final byte[] fam;
+
+    public FirstOnRowDeleteFamilyCell(byte[] row, byte[] fam) {
+      this.row = row;
+      this.fam = fam;
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return this.row;
+    }
+
+    @Override
+    public short getRowLength() {
+      return (short) this.row.length;
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return this.fam;
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return (byte) this.fam.length;
+    }
+
+    @Override
+    public long getTimestamp() {
+      return HConstants.LATEST_TIMESTAMP;
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return Type.DeleteFamily.getCode();
+    }
+  }
+
+  /**
+   * Writes the Cell's key part as it would have serialized in a KeyValue. The format is &lt;2 bytes
+   * rk len&gt;&lt;rk&gt;&lt;1 byte cf len&gt;&lt;cf&gt;&lt;qualifier&gt;&lt;8 bytes
+   * timestamp&gt;&lt;1 byte type&gt;
+   * @param cell
+   * @param out
+   * @throws IOException
+   */
+  public static void writeFlatKey(Cell cell, DataOutput out) throws IOException {
+    short rowLen = cell.getRowLength();
+    byte fLen = cell.getFamilyLength();
+    int qLen = cell.getQualifierLength();
+    // Using just one if/else loop instead of every time checking before writing every
+    // component of cell
+    if (cell instanceof ByteBufferCell) {
+      out.writeShort(rowLen);
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), rowLen);
+      out.writeByte(fLen);
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), fLen);
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), qLen);
+    } else {
+      out.writeShort(rowLen);
+      out.write(cell.getRowArray(), cell.getRowOffset(), rowLen);
+      out.writeByte(fLen);
+      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
+      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qLen);
+    }
+    out.writeLong(cell.getTimestamp());
+    out.writeByte(cell.getTypeByte());
+  }
+
+  /**
+   * Deep clones the given cell if the cell supports deep cloning
+   * @param cell the cell to be cloned
+   * @return the cloned cell
+   * @throws CloneNotSupportedException
+   */
+  public static Cell deepClone(Cell cell) throws CloneNotSupportedException {
+    if (cell instanceof ExtendedCell) {
+      return ((ExtendedCell) cell).deepClone();
+    }
+    throw new CloneNotSupportedException();
+  }
+
+  /**
+   * Writes the cell to the given OutputStream
+   * @param cell the cell to be written
+   * @param out the outputstream
+   * @param withTags if tags are to be written or not
+   * @return the total bytes written
+   * @throws IOException
+   */
+  public static int writeCell(Cell cell, OutputStream out, boolean withTags) throws IOException {
+    if (cell instanceof ExtendedCell) {
+      return ((ExtendedCell) cell).write(out, withTags);
+    } else {
+      ByteBufferUtils.putInt(out, estimatedSerializedSizeOfKey(cell));
+      ByteBufferUtils.putInt(out, cell.getValueLength());
+      writeFlatKey(cell, out);
+      writeValue(out, cell, cell.getValueLength());
+      int tagsLength = cell.getTagsLength();
+      if (withTags) {
+        byte[] len = new byte[Bytes.SIZEOF_SHORT];
+        Bytes.putAsShort(len, 0, tagsLength);
+        out.write(len);
+        if (tagsLength > 0) {
+          writeTags(out, cell, tagsLength);
+        }
+      }
+      int lenWritten = (2 * Bytes.SIZEOF_INT) + estimatedSerializedSizeOfKey(cell)
+          + cell.getValueLength();
+      if (withTags) {
+        lenWritten += Bytes.SIZEOF_SHORT + tagsLength;
+      }
+      return lenWritten;
+    }
+  }
+
+  /**
+   * Writes a cell to the buffer at the given offset
+   * @param cell the cell to be written
+   * @param buf the buffer to which the cell has to be wrriten
+   * @param offset the offset at which the cell should be written
+   */
+  public static void writeCellToBuffer(Cell cell, ByteBuffer buf, int offset) {
+    if (cell instanceof ExtendedCell) {
+      ((ExtendedCell) cell).write(buf, offset);
+    } else {
+      // Using the KVUtil
+      byte[] bytes = KeyValueUtil.copyToNewByteArray(cell);
+      ByteBufferUtils.copyFromArrayToBuffer(buf, offset, bytes, 0, bytes.length);
+    }
+  }
+
+  public static int writeFlatKey(Cell cell, OutputStream out) throws IOException {
+    short rowLen = cell.getRowLength();
+    byte fLen = cell.getFamilyLength();
+    int qLen = cell.getQualifierLength();
+    // Using just one if/else loop instead of every time checking before writing every
+    // component of cell
+    if (cell instanceof ByteBufferCell) {
+      StreamUtils.writeShort(out, rowLen);
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), rowLen);
+      out.write(fLen);
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), fLen);
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), qLen);
+    } else {
+      StreamUtils.writeShort(out, rowLen);
+      out.write(cell.getRowArray(), cell.getRowOffset(), rowLen);
+      out.write(fLen);
+      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
+      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qLen);
+    }
+    StreamUtils.writeLong(out, cell.getTimestamp());
+    out.write(cell.getTypeByte());
+    return Bytes.SIZEOF_SHORT + rowLen + Bytes.SIZEOF_BYTE + fLen + qLen + Bytes.SIZEOF_LONG
+        + Bytes.SIZEOF_BYTE;
+  }
+
+  /**
+   * Sets the given seqId to the cell. Marked as audience Private as of 1.2.0. Setting a Cell
+   * sequenceid is an internal implementation detail not for general public use.
+   * @param cell
+   * @param seqId
+   * @throws IOException when the passed cell is not of type {@link SettableSequenceId}
+   */
+  public static void setSequenceId(Cell cell, long seqId) throws IOException {
+    if (cell instanceof SettableSequenceId) {
+      ((SettableSequenceId) cell).setSequenceId(seqId);
+    } else {
+      throw new IOException(new UnsupportedOperationException(
+          "Cell is not of type " + SettableSequenceId.class.getName()));
+    }
+  }
+
+  /**
+   * Sets the given timestamp to the cell.
+   * @param cell
+   * @param ts
+   * @throws IOException when the passed cell is not of type {@link SettableTimestamp}
+   */
+  public static void setTimestamp(Cell cell, long ts) throws IOException {
+    if (cell instanceof SettableTimestamp) {
+      ((SettableTimestamp) cell).setTimestamp(ts);
+    } else {
+      throw new IOException(new UnsupportedOperationException(
+          "Cell is not of type " + SettableTimestamp.class.getName()));
+    }
+  }
+
+  /**
+   * Sets the given timestamp to the cell.
+   * @param cell
+   * @param ts buffer containing the timestamp value
+   * @param tsOffset offset to the new timestamp
+   * @throws IOException when the passed cell is not of type {@link SettableTimestamp}
+   */
+  public static void setTimestamp(Cell cell, byte[] ts, int tsOffset) throws IOException {
+    if (cell instanceof SettableTimestamp) {
+      ((SettableTimestamp) cell).setTimestamp(ts, tsOffset);
+    } else {
+      throw new IOException(new UnsupportedOperationException(
+          "Cell is not of type " + SettableTimestamp.class.getName()));
+    }
+  }
+
+  /**
+   * Sets the given timestamp to the cell iff current timestamp is
+   * {@link HConstants#LATEST_TIMESTAMP}.
+   * @param cell
+   * @param ts
+   * @return True if cell timestamp is modified.
+   * @throws IOException when the passed cell is not of type {@link SettableTimestamp}
+   */
+  public static boolean updateLatestStamp(Cell cell, long ts) throws IOException {
+    if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+      setTimestamp(cell, ts);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Sets the given timestamp to the cell iff current timestamp is
+   * {@link HConstants#LATEST_TIMESTAMP}.
+   * @param cell
+   * @param ts buffer containing the timestamp value
+   * @param tsOffset offset to the new timestamp
+   * @return True if cell timestamp is modified.
+   * @throws IOException when the passed cell is not of type {@link SettableTimestamp}
+   */
+  public static boolean updateLatestStamp(Cell cell, byte[] ts, int tsOffset) throws IOException {
+    if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+      setTimestamp(cell, ts, tsOffset);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Writes the row from the given cell to the output stream
+   * @param out The outputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param rlength the row length
+   * @throws IOException
+   */
+  public static void writeRow(OutputStream out, Cell cell, short rlength) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), rlength);
+    } else {
+      out.write(cell.getRowArray(), cell.getRowOffset(), rlength);
+    }
+  }
+
+  /**
+   * Writes the family from the given cell to the output stream
+   * @param out The outputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param flength the family length
+   * @throws IOException
+   */
+  public static void writeFamily(OutputStream out, Cell cell, byte flength) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), flength);
+    } else {
+      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), flength);
+    }
+  }
+
+  /**
+   * Writes the qualifier from the given cell to the output stream
+   * @param out The outputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param qlength the qualifier length
+   * @throws IOException
+   */
+  public static void writeQualifier(OutputStream out, Cell cell, int qlength) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), qlength);
+    } else {
+      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qlength);
+    }
+  }
+
+  /**
+   * Writes the qualifier from the given cell to the output stream excluding the common prefix
+   * @param out The dataoutputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param qlength the qualifier length
+   * @throws IOException
+   */
+  public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell, int qlength,
+      int commonPrefix) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream((DataOutput) out,
+        ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix);
+    } else {
+      out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix,
+        qlength - commonPrefix);
+    }
+  }
+
+  /**
+   * Writes the value from the given cell to the output stream
+   * @param out The outputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param vlength the value length
+   * @throws IOException
+   */
+  public static void writeValue(OutputStream out, Cell cell, int vlength) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getValueByteBuffer(),
+        ((ByteBufferCell) cell).getValuePosition(), vlength);
+    } else {
+      out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
+    }
+  }
+
+  /**
+   * Writes the tag from the given cell to the output stream
+   * @param out The outputstream to which the data has to be written
+   * @param cell The cell whose contents has to be written
+   * @param tagsLength the tag length
+   * @throws IOException
+   */
+  public static void writeTags(OutputStream out, Cell cell, int tagsLength) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
+        ((ByteBufferCell) cell).getTagsPosition(), tagsLength);
+    } else {
+      out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+    }
+  }
+
+  /**
+   * special case for Cell.equals
+   */
+  public static boolean equalsIgnoreMvccVersion(Cell a, Cell b) {
+    // row
+    boolean res = CellUtil.matchingRows(a, b);
+    if (!res) return res;
+
+    // family
+    res = CellUtil.matchingColumn(a, b);
+    if (!res) return res;
+
+    // timestamp: later sorts first
+    if (!CellUtil.matchingTimestamp(a, b)) return false;
+
+    // type
+    int c = (0xff & b.getTypeByte()) - (0xff & a.getTypeByte());
+    if (c != 0) return false;
+    else return true;
+  }
+
+  /**
+   * Converts the rowkey bytes of the given cell into an int value
+   * @param cell
+   * @return rowkey as int
+   */
+  public static int getRowAsInt(Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return ByteBufferUtils.toInt(((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition());
+    }
+    return Bytes.toInt(cell.getRowArray(), cell.getRowOffset());
+  }
+
+  /**
+   * Converts the value bytes of the given cell into a long value
+   * @param cell
+   * @return value as long
+   */
+  public static long getValueAsLong(Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return ByteBufferUtils.toLong(((ByteBufferCell) cell).getValueByteBuffer(),
+        ((ByteBufferCell) cell).getValuePosition());
+    }
+    return Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
+  }
+
+  /**
+   * Converts the value bytes of the given cell into a int value
+   * @param cell
+   * @return value as int
+   */
+  public static int getValueAsInt(Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return ByteBufferUtils.toInt(((ByteBufferCell) cell).getValueByteBuffer(),
+        ((ByteBufferCell) cell).getValuePosition());
+    }
+    return Bytes.toInt(cell.getValueArray(), cell.getValueOffset());
+  }
+
+  /**
+   * Converts the value bytes of the given cell into a double value
+   * @param cell
+   * @return value as double
+   */
+  public static double getValueAsDouble(Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return ByteBufferUtils.toDouble(((ByteBufferCell) cell).getValueByteBuffer(),
+        ((ByteBufferCell) cell).getValuePosition());
+    }
+    return Bytes.toDouble(cell.getValueArray(), cell.getValueOffset());
+  }
+
+  /**
+   * Converts the value bytes of the given cell into a BigDecimal
+   * @param cell
+   * @return value as BigDecimal
+   */
+  public static BigDecimal getValueAsBigDecimal(Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return ByteBufferUtils.toBigDecimal(((ByteBufferCell) cell).getValueByteBuffer(),
+        ((ByteBufferCell) cell).getValuePosition(), cell.getValueLength());
+    }
+    return Bytes.toBigDecimal(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+  }
+
+  /**
+   * Compresses the tags to the given outputstream using the TagcompressionContext
+   * @param out the outputstream to which the compression should happen
+   * @param cell the cell which has tags
+   * @param tagCompressionContext the TagCompressionContext
+   * @throws IOException can throw IOException if the compression encounters issue
+   */
+  public static void compressTags(OutputStream out, Cell cell,
+      TagCompressionContext tagCompressionContext) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      tagCompressionContext.compressTags(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
+        ((ByteBufferCell) cell).getTagsPosition(), cell.getTagsLength());
+    } else {
+      tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    }
+  }
+
+  public static void compressRow(OutputStream out, Cell cell, Dictionary dict) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), cell.getRowLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), dict);
+    }
+  }
+
+  public static void compressFamily(OutputStream out, Cell cell, Dictionary dict)
+      throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), cell.getFamilyLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        dict);
+    }
+  }
+
+  public static void compressQualifier(OutputStream out, Cell cell, Dictionary dict)
+      throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), cell.getQualifierLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getQualifierArray(), cell.getQualifierOffset(),
+        cell.getQualifierLength(), dict);
+    }
+  }
+
+  /**
+   * Used when a cell needs to be compared with a key byte[] such as cases of finding the index from
+   * the index block, bloom keys from the bloom blocks This byte[] is expected to be serialized in
+   * the KeyValue serialization format If the KeyValue (Cell's) serialization format changes this
+   * method cannot be used.
+   * @param comparator the cell comparator
+   * @param left the cell to be compared
+   * @param key the serialized key part of a KeyValue
+   * @param offset the offset in the key byte[]
+   * @param length the length of the key byte[]
+   * @return an int greater than 0 if left is greater than right lesser than 0 if left is lesser
+   *         than right equal to 0 if left is equal to right
+   */
+  @VisibleForTesting
+  public static final int compare(CellComparator comparator, Cell left, byte[] key, int offset,
+      int length) {
+    // row
+    short rrowlength = Bytes.toShort(key, offset);
+    int c = comparator.compareRows(left, key, offset + Bytes.SIZEOF_SHORT, rrowlength);
+    if (c != 0) return c;
+
+    // Compare the rest of the two KVs without making any assumptions about
+    // the common prefix. This function will not compare rows anyway, so we
+    // don't need to tell it that the common prefix includes the row.
+    return compareWithoutRow(comparator, left, key, offset, length, rrowlength);
+  }
+
+  /**
+   * Compare columnFamily, qualifier, timestamp, and key type (everything except the row). This
+   * method is used both in the normal comparator and the "same-prefix" comparator. Note that we are
+   * assuming that row portions of both KVs have already been parsed and found identical, and we
+   * don't validate that assumption here.
+   * @param commonPrefix the length of the common prefix of the two key-values being compared,
+   *          including row length and row
+   */
+  static final int compareWithoutRow(CellComparator comparator, Cell left, byte[] right,
+      int roffset, int rlength, short rowlength) {
+    /***
+     * KeyValue Format and commonLength:
+     * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
+     * ------------------|-------commonLength--------|--------------
+     */
+    int commonLength = KeyValue.ROW_LENGTH_SIZE + KeyValue.FAMILY_LENGTH_SIZE + rowlength;
+
+    // commonLength + TIMESTAMP_TYPE_SIZE
+    int commonLengthWithTSAndType = KeyValue.TIMESTAMP_TYPE_SIZE + commonLength;
+    // ColumnFamily + Qualifier length.
+    int lcolumnlength = left.getFamilyLength() + left.getQualifierLength();
+    int rcolumnlength = rlength - commonLengthWithTSAndType;
+
+    byte ltype = left.getTypeByte();
+    byte rtype = right[roffset + (rlength - 1)];
+
+    // If the column is not specified, the "minimum" key type appears the
+    // latest in the sorted order, regardless of the timestamp. This is used
+    // for specifying the last key/value in a given row, because there is no
+    // "lexicographically last column" (it would be infinitely long). The
+    // "maximum" key type does not need this behavior.
+    if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
+      // left is "bigger", i.e. it appears later in the sorted order
+      return 1;
+    }
+    if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
+      return -1;
+    }
+
+    int rfamilyoffset = commonLength + roffset;
+
+    // Column family length.
+    int lfamilylength = left.getFamilyLength();
+    int rfamilylength = right[rfamilyoffset - 1];
+    // If left family size is not equal to right family size, we need not
+    // compare the qualifiers.
+    boolean sameFamilySize = (lfamilylength == rfamilylength);
+    if (!sameFamilySize) {
+      // comparing column family is enough.
+      return CellUtil.compareFamilies(left, right, rfamilyoffset, rfamilylength);
+    }
+    // Compare family & qualifier together.
+    // Families are same. Compare on qualifiers.
+    int comparison = CellUtil.compareColumns(left, right, rfamilyoffset, rfamilylength,
+      rfamilyoffset + rfamilylength, (rcolumnlength - rfamilylength));
+    if (comparison != 0) {
+      return comparison;
+    }
+
+    // //
+    // Next compare timestamps.
+    long rtimestamp = Bytes.toLong(right, roffset + (rlength - KeyValue.TIMESTAMP_TYPE_SIZE));
+    int compare = comparator.compareTimestamps(left.getTimestamp(), rtimestamp);
+    if (compare != 0) {
+      return compare;
+    }
+
+    // Compare types. Let the delete types sort ahead of puts; i.e. types
+    // of higher numbers sort before those of lesser numbers. Maximum (255)
+    // appears ahead of everything, and minimum (0) appears after
+    // everything.
+    return (0xff & rtype) - (0xff & ltype);
+  }
+
+  /**
+   * @return An new cell is located following input cell. If both of type and timestamp are minimum,
+   *         the input cell will be returned directly.
+   */
+  public static Cell createNextOnRowCol(Cell cell) {
+    long ts = cell.getTimestamp();
+    byte type = cell.getTypeByte();
+    if (type != Type.Minimum.getCode()) {
+      type = KeyValue.Type.values()[KeyValue.Type.codeToType(type).ordinal() - 1].getCode();
+    } else if (ts != HConstants.OLDEST_TIMESTAMP) {
+      ts = ts - 1;
+      type = Type.Maximum.getCode();
+    } else {
+      return cell;
+    }
+    return createNextOnRowCol(cell, ts, type);
+  }
+
+  static Cell createNextOnRowCol(Cell cell, long ts, byte type) {
+    if (cell instanceof ByteBufferCell) {
+      return new LastOnRowColByteBufferCell(
+          ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(),
+          cell.getRowLength(), ((ByteBufferCell) cell).getFamilyByteBuffer(),
+          ((ByteBufferCell) cell).getFamilyPosition(), cell.getFamilyLength(),
+          ((ByteBufferCell) cell).getQualifierByteBuffer(),
+          ((ByteBufferCell) cell).getQualifierPosition(), cell.getQualifierLength()) {
+        @Override
+        public long getTimestamp() {
+          return ts;
+        }
+
+        @Override
+        public byte getTypeByte() {
+          return type;
+        }
+      };
+    }
+    return new LastOnRowColCell(cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) {
+      @Override
+      public long getTimestamp() {
+        return ts;
+      }
+
+      @Override
+      public byte getTypeByte() {
+        return type;
+      }
+    };
+  }
+
+  /**
+   * Estimate based on keyvalue's serialization format in the RPC layer. Note that there is an extra
+   * SIZEOF_INT added to the size here that indicates the actual length of the cell for cases where
+   * cell's are serialized in a contiguous format (For eg in RPCs).
+   * @param cell
+   * @return Estimate of the <code>cell</code> size in bytes plus an extra SIZEOF_INT indicating the
+   *         actual cell length.
+   */
+  public static int estimatedSerializedSizeOf(final Cell cell) {
+    if (cell instanceof ExtendedCell) {
+      return ((ExtendedCell) cell).getSerializedSize(true) + Bytes.SIZEOF_INT;
+    }
+
+    return getSumOfCellElementLengths(cell) +
+    // Use the KeyValue's infrastructure size presuming that another implementation would have
+    // same basic cost.
+        KeyValue.ROW_LENGTH_SIZE + KeyValue.FAMILY_LENGTH_SIZE +
+        // Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
+        Bytes.SIZEOF_INT;
+  }
+  
+  /**
+   * @param cell
+   * @return Sum of the lengths of all the elements in a Cell; does not count in any infrastructure
+   */
+  private static int getSumOfCellElementLengths(final Cell cell) {
+    return getSumOfCellKeyElementLengths(cell) + cell.getValueLength() + cell.getTagsLength();
+  }
+
+  /**
+   * @param cell
+   * @return Sum of all elements that make up a key; does not include infrastructure, tags or
+   *         values.
+   */
+  private static int getSumOfCellKeyElementLengths(final Cell cell) {
+    return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength()
+        + KeyValue.TIMESTAMP_TYPE_SIZE;
+  }
+
+  /**
+   * Calculates the serialized key size. We always serialize in the KeyValue's serialization format.
+   * @param cell the cell for which the key size has to be calculated.
+   * @return the key size
+   */
+  public static int estimatedSerializedSizeOfKey(final Cell cell) {
+    if (cell instanceof KeyValue) return ((KeyValue) cell).getKeyLength();
+    return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength()
+        + KeyValue.KEY_INFRASTRUCTURE_SIZE;
+  }
+
+  /**
+   * This is an estimate of the heap space occupied by a cell. When the cell is of type
+   * {@link HeapSize} we call {@link HeapSize#heapSize()} so cell can give a correct value. In other
+   * cases we just consider the bytes occupied by the cell components ie. row, CF, qualifier,
+   * timestamp, type, value and tags.
+   * @param cell
+   * @return estimate of the heap space
+   */
+  public static long estimatedHeapSizeOf(final Cell cell) {
+    if (cell instanceof HeapSize) {
+      return ((HeapSize) cell).heapSize();
+    }
+    // TODO: Add sizing of references that hold the row, family, etc., arrays.
+    return estimatedSerializedSizeOf(cell);
+  }
+
+  /**
+   * This method exists just to encapsulate how we serialize keys. To be replaced by a factory that
+   * we query to figure what the Cell implementation is and then, what serialization engine to use
+   * and further, how to serialize the key for inclusion in hfile index. TODO.
+   * @param cell
+   * @return The key portion of the Cell serialized in the old-school KeyValue way or null if passed
+   *         a null <code>cell</code>
+   */
+  public static byte[] getCellKeySerializedAsKeyValueKey(final Cell cell) {
+    if (cell == null) return null;
+    byte[] b = new byte[KeyValueUtil.keyLength(cell)];
+    KeyValueUtil.appendKeyTo(cell, b, 0);
+    return b;
+  }
+
+  /**
+   * Create a Cell that is smaller than all other possible Cells for the given Cell's row.
+   * @param cell
+   * @return First possible Cell on passed Cell's row.
+   */
+  public static Cell createFirstOnRow(final Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return new FirstOnRowByteBufferCell(
+          ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(),
+          cell.getRowLength());
+    }
+    return new FirstOnRowCell(cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength());
+  }
+
+  public static Cell createFirstOnRow(final byte[] row, int roffset, short rlength) {
+    return new FirstOnRowCell(row, roffset, rlength);
+  }
+
+  public static Cell createFirstOnRow(final byte[] row, final byte[] family, final byte[] col) {
+    return createFirstOnRow(row, 0, (short) row.length, family, 0, (byte) family.length, col, 0,
+      col.length);
+  }
+
+  public static Cell createFirstOnRow(final byte[] row, int roffset, short rlength,
+      final byte[] family, int foffset, byte flength, final byte[] col, int coffset, int clength) {
+    return new FirstOnRowColCell(row, roffset, rlength, family, foffset, flength, col, coffset,
+        clength);
+  }
+
+  public static Cell createFirstOnRow(final byte[] row) {
+    return createFirstOnRow(row, 0, (short) row.length);
+  }
+
+  public static Cell createFirstOnRowFamily(Cell cell, byte[] fArray, int foff, int flen) {
+    if (cell instanceof ByteBufferCell) {
+      return new FirstOnRowColByteBufferCell(
+          ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(),
+          cell.getRowLength(), ByteBuffer.wrap(fArray), foff, (byte) flen,
+          HConstants.EMPTY_BYTE_BUFFER, 0, 0);
+    }
+    return new FirstOnRowColCell(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+        fArray, foff, (byte) flen, HConstants.EMPTY_BYTE_ARRAY, 0, 0);
+  }
+
+  public static Cell createFirstOnRowCol(final Cell cell) {
+    if (cell instanceof ByteBufferCell) {
+      return new FirstOnRowColByteBufferCell(
+          ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(),
+          cell.getRowLength(), HConstants.EMPTY_BYTE_BUF

<TRUNCATED>

Mime
View raw message