hbase-commits mailing list archives

From ramkris...@apache.org
Subject [3/3] hbase git commit: HBASE-18649 Deprecate KV Usage in MR to move to Cells in 3.0 (Ram)
Date Wed, 04 Oct 2017 09:14:58 GMT
HBASE-18649 Deprecate KV Usage in MR to move to Cells in 3.0 (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66fb60d4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66fb60d4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66fb60d4

Branch: refs/heads/branch-2
Commit: 66fb60d4a4828d181c4ebb7fd908f188e65141a4
Parents: 4475ba8
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Wed Oct 4 14:44:19 2017 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Wed Oct 4 14:44:19 2017 +0530

----------------------------------------------------------------------
 .../mapreduce/MapReduceHFileSplitterJob.java    |   33 +-
 .../java/org/apache/hadoop/hbase/CellUtil.java  |  114 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java |    3 +-
 .../io/encoding/PrefixKeyDeltaEncoder.java      |    3 +-
 .../hadoop/hbase/util/ByteBufferUtils.java      |   23 +
 .../org/apache/hadoop/hbase/TestCellUtil.java   |  117 ++
 .../hbase/mapreduce/CellSerialization.java      |   96 ++
 .../hadoop/hbase/mapreduce/CellSortReducer.java |   60 +
 .../hadoop/hbase/mapreduce/CopyTable.java       |    2 +-
 .../hbase/mapreduce/HFileOutputFormat2.java     |   26 +-
 .../apache/hadoop/hbase/mapreduce/Import.java   |  303 +++-
 .../hadoop/hbase/mapreduce/ImportTsv.java       |    2 +-
 .../hbase/mapreduce/KeyValueSerialization.java  |    8 +-
 .../hbase/mapreduce/KeyValueSortReducer.java    |    3 +
 .../hadoop/hbase/mapreduce/PutSortReducer.java  |    2 +-
 .../hbase/mapreduce/TableMapReduceUtil.java     |    2 +-
 .../hadoop/hbase/mapreduce/TextSortReducer.java |    2 +-
 .../hadoop/hbase/mapreduce/WALPlayer.java       |   54 +-
 .../apache/hadoop/hbase/util/MapReduceCell.java |  270 ++++
 .../TestCellBasedHFileOutputFormat2.java        | 1496 ++++++++++++++++++
 .../mapreduce/TestCellBasedImportExport2.java   |  801 ++++++++++
 .../mapreduce/TestCellBasedWALPlayer2.java      |  232 +++
 .../hbase/mapreduce/TestHFileOutputFormat2.java |    1 +
 .../hbase/mapreduce/TestImportExport.java       |    1 +
 .../hadoop/hbase/mapreduce/TestWALPlayer.java   |    1 +
 25 files changed, 3555 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
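For jobs that previously emitted KeyValue from their mappers, migration is mostly a matter of swapping the MapReduce wiring classes. The sketch below is not part of this commit; the class name CellBulkLoadJobSketch, the table name, and the paths are placeholders, and the mapper (omitted) is assumed to emit (ImmutableBytesWritable, MapReduceCell) pairs the way HFileCellMapper does in the first file of the diff below. It simply shows the Cell-based wiring that replaces KeyValue/KeyValueSortReducer.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
    import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.util.MapReduceCell;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CellBulkLoadJobSketch {
      public static Job createJob(Configuration conf, String tabName, String inputDirs,
          String hfileOutPath) throws Exception {
        Job job = Job.getInstance(conf, "cell-bulkload-sketch");
        job.setJarByClass(CellBulkLoadJobSketch.class);
        FileInputFormat.addInputPaths(job, inputDirs);
        job.setInputFormatClass(HFileInputFormat.class);
        // A mapper emitting (ImmutableBytesWritable, MapReduceCell) pairs must be set here,
        // shaped like HFileCellMapper in the MapReduceHFileSplitterJob diff below.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceCell.class);  // was KeyValue.class
        job.setReducerClass(CellSortReducer.class);       // was KeyValueSortReducer.class
        FileOutputFormat.setOutputPath(job, new Path(hfileOutPath));
        TableName tableName = TableName.valueOf(tabName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(tableName);
            RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
          HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        }
        return job;
      }
    }

As the HFileOutputFormat2 hunk further down shows, configureIncrementalLoad now picks CellSortReducer automatically when the map output value class is either KeyValue or MapReduceCell, so the explicit setReducerClass call above is mainly for clarity.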


http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 97ece3d..51a6b1d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -24,22 +24,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MapReduceCell;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -47,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
@@ -70,24 +70,15 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
 
   /**
    * A mapper that just writes out cells. This one can be used together with
-   * {@link KeyValueSortReducer}
+   * {@link CellSortReducer}
    */
   static class HFileCellMapper extends
-      Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+      Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
 
     @Override
-    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
-        InterruptedException {
-      // Convert value to KeyValue if subclass
-      if (!value.getClass().equals(KeyValue.class)) {
-        value =
-            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
-                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
-                value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
-                value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
-                value.getValueOffset(), value.getValueLength());
-      }
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+    public void map(NullWritable key, Cell value, Context context)
+        throws IOException, InterruptedException {
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), new MapReduceCell(value));
     }
 
     @Override
@@ -119,14 +110,14 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
+      job.setReducerClass(CellSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
+      job.setMapOutputValueClass(MapReduceCell.class);
       try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
       }
       LOG.debug("success configuring load incremental job");
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index dc5df30..7a3695f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.KeyValue.COLUMN_FAMILY_DELIMITER;
 import static org.apache.hadoop.hbase.KeyValue.getDelimiter;
 import static org.apache.hadoop.hbase.KeyValue.COLUMN_FAMILY_DELIM_ARRAY;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -1465,9 +1466,12 @@ public final class CellUtil {
   }
 
   /**
-   * Estimate based on keyvalue's serialization format.
+   * Estimate based on the KeyValue serialization format in the RPC layer. Note that an extra
+   * SIZEOF_INT is added to the size here to indicate the actual length of the cell for cases where
+   * cells are serialized in a contiguous format (for example, in RPCs).
    * @param cell
-   * @return Estimate of the <code>cell</code> size in bytes.
+   * @return Estimate of the <code>cell</code> size in bytes plus an extra SIZEOF_INT indicating the
+   *         actual cell length.
    */
   public static int estimatedSerializedSizeOf(final Cell cell) {
     if (cell instanceof ExtendedCell) {
@@ -1762,8 +1766,10 @@ public final class CellUtil {
    * timestamp&gt;&lt;1 byte type&gt;
    * @param cell
    * @param out
+   * @deprecated Use {@link #writeFlatKey(Cell, DataOutput)}
    * @throws IOException
    */
+  @Deprecated
   public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException {
     short rowLen = cell.getRowLength();
     byte fLen = cell.getFamilyLength();
@@ -1772,6 +1778,43 @@ public final class CellUtil {
     // component of cell
     if (cell instanceof ByteBufferCell) {
       out.writeShort(rowLen);
+      ByteBufferUtils.copyBufferToStream((DataOutput) out,
+        ((ByteBufferCell) cell).getRowByteBuffer(), ((ByteBufferCell) cell).getRowPosition(),
+        rowLen);
+      out.writeByte(fLen);
+      ByteBufferUtils.copyBufferToStream((DataOutput) out,
+        ((ByteBufferCell) cell).getFamilyByteBuffer(), ((ByteBufferCell) cell).getFamilyPosition(),
+        fLen);
+      ByteBufferUtils.copyBufferToStream((DataOutput) out,
+        ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), qLen);
+    } else {
+      out.writeShort(rowLen);
+      out.write(cell.getRowArray(), cell.getRowOffset(), rowLen);
+      out.writeByte(fLen);
+      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen);
+      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qLen);
+    }
+    out.writeLong(cell.getTimestamp());
+    out.writeByte(cell.getTypeByte());
+  }
+
+  /**
+   * Writes the Cell's key part as it would have serialized in a KeyValue. The format is &lt;2 bytes
+   * rk len&gt;&lt;rk&gt;&lt;1 byte cf len&gt;&lt;cf&gt;&lt;qualifier&gt;&lt;8 bytes
+   * timestamp&gt;&lt;1 byte type&gt;
+   * @param cell
+   * @param out
+   * @throws IOException
+   */
+  public static void writeFlatKey(Cell cell, DataOutput out) throws IOException {
+    short rowLen = cell.getRowLength();
+    byte fLen = cell.getFamilyLength();
+    int qLen = cell.getQualifierLength();
+    // Using just one if/else loop instead of every time checking before writing every
+    // component of cell
+    if (cell instanceof ByteBufferCell) {
+      out.writeShort(rowLen);
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
         ((ByteBufferCell) cell).getRowPosition(), rowLen);
       out.writeByte(fLen);
@@ -1790,6 +1833,69 @@ public final class CellUtil {
     out.writeByte(cell.getTypeByte());
   }
 
+  /**
+   * Deep clones the given cell if the cell supports deep cloning
+   * @param cell the cell to be cloned
+   * @return the cloned cell
+   * @throws CloneNotSupportedException
+   */
+  public static Cell deepClone(Cell cell) throws CloneNotSupportedException {
+    if (cell instanceof ExtendedCell) {
+      return ((ExtendedCell) cell).deepClone();
+    }
+    throw new CloneNotSupportedException();
+  }
+
+  /**
+   * Writes the cell to the given OutputStream
+   * @param cell the cell to be written
+   * @param out the outputstream
+   * @param withTags if tags are to be written or not
+   * @return the total bytes written
+   * @throws IOException
+   */
+  public static int writeCell(Cell cell, OutputStream out, boolean withTags) throws IOException {
+    if (cell instanceof ExtendedCell) {
+      return ((ExtendedCell) cell).write(out, withTags);
+    } else {
+      ByteBufferUtils.putInt(out, CellUtil.estimatedSerializedSizeOfKey(cell));
+      ByteBufferUtils.putInt(out, cell.getValueLength());
+      CellUtil.writeFlatKey(cell, out);
+      CellUtil.writeValue(out, cell, cell.getValueLength());
+      int tagsLength = cell.getTagsLength();
+      if (withTags) {
+        byte[] len = new byte[Bytes.SIZEOF_SHORT];
+        Bytes.putAsShort(len, 0, tagsLength);
+        out.write(len);
+        if (tagsLength > 0) {
+          CellUtil.writeTags(out, cell, tagsLength);
+        }
+      }
+      int lenWritten = (2 * Bytes.SIZEOF_INT) + CellUtil.estimatedSerializedSizeOfKey(cell)
+          + cell.getValueLength();
+      if (withTags) {
+        lenWritten += Bytes.SIZEOF_SHORT + tagsLength;
+      }
+      return lenWritten;
+    }
+  }
+
+  /**
+   * Writes a cell to the buffer at the given offset
+   * @param cell the cell to be written
+   * @param buf the buffer to which the cell has to be written
+   * @param offset the offset at which the cell should be written
+   */
+  public static void writeCellToBuffer(Cell cell, ByteBuffer buf, int offset) {
+    if (cell instanceof ExtendedCell) {
+      ((ExtendedCell) cell).write(buf, offset);
+    } else {
+      // Using the KVUtil
+      byte[] bytes = KeyValueUtil.copyToNewByteArray(cell);
+      ByteBufferUtils.copyFromArrayToBuffer(buf, offset, bytes, 0, bytes.length);
+    }
+  }
+
   public static int writeFlatKey(Cell cell, OutputStream out) throws IOException {
     short rowLen = cell.getRowLength();
     byte fLen = cell.getFamilyLength();
@@ -1844,7 +1950,7 @@ public final class CellUtil {
   public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength,
       int commonPrefix) throws IOException {
     if (cell instanceof ByteBufferCell) {
-      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+      ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getRowByteBuffer(),
         ((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix);
     } else {
       out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix);
@@ -1894,7 +2000,7 @@ public final class CellUtil {
   public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell,
       int qlength, int commonPrefix) throws IOException {
     if (cell instanceof ByteBufferCell) {
-      ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+      ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
         ((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix);
     } else {
       out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix,

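As a quick illustration of the new writeCell path, the standalone sketch below (the class name WriteCellSketch and the row/family/qualifier/value literals are placeholders) serializes a cell to a stream and rebuilds it as a KeyValue, mirroring the testWriteCell case added to TestCellUtil further down. A raw KeyValue takes the ExtendedCell shortcut, while the test wraps it in a plain Cell to exercise the length-prefixed fallback; either way the serialized bytes stay KeyValue-compatible.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WriteCellSketch {
      public static void main(String[] args) throws IOException {
        KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf1"),
            Bytes.toBytes("qual1"), 1234L, Bytes.toBytes("val1"));
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        // Writes <key len><value len><flat key><value>[<tags len><tags>] for plain Cells,
        // or delegates to ExtendedCell.write() when the cell supports it (as KeyValue does).
        int written = CellUtil.writeCell(kv, os, true);
        // The serialized form is KeyValue-compatible, so it can be rebuilt directly.
        KeyValue roundTripped = new KeyValue(os.toByteArray());
        System.out.println("wrote " + written + " bytes, equals=" + CellUtil.equals(kv, roundTripped));
      }
    }
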
http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index ac81049..03cf768 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -262,7 +263,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
       ByteBufferUtils.putCompressedInt(out, kLength);
       ByteBufferUtils.putCompressedInt(out, vLength);
       ByteBufferUtils.putCompressedInt(out, 0);
-      CellUtil.writeFlatKey(cell, out);
+      CellUtil.writeFlatKey(cell, (DataOutput)out);
       // Write the value part
       CellUtil.writeValue(out, cell, cell.getValueLength());
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 6f529db..8edb305 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -59,7 +60,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
       ByteBufferUtils.putCompressedInt(out, klength);
       ByteBufferUtils.putCompressedInt(out, vlength);
       ByteBufferUtils.putCompressedInt(out, 0);
-      CellUtil.writeFlatKey(cell, out);
+      CellUtil.writeFlatKey(cell, (DataOutput)out);
     } else {
       // find a common prefix and skip it
       int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 7ef578d..3fc1a7b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -193,6 +194,28 @@ public final class ByteBufferUtils {
     }
   }
 
+  /**
+   * Copy data from a buffer to an output stream. Does not update the position
+   * in the buffer.
+   * @param out the output stream to write bytes to
+   * @param in the buffer to read bytes from
+   * @param offset the offset in the buffer (from the buffer's array offset)
+   *      to start copying bytes from
+   * @param length the number of bytes to copy
+   */
+  public static void copyBufferToStream(DataOutput out, ByteBuffer in, int offset, int length)
+      throws IOException {
+    if (out instanceof ByteBufferWriter) {
+      ((ByteBufferWriter) out).write(in, offset, length);
+    } else if (in.hasArray()) {
+      out.write(in.array(), in.arrayOffset() + offset, length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        out.write(toByte(in, offset + i));
+      }
+    }
+  }
+
   public static int putLong(OutputStream out, final long value,
       final int fitInBytes) throws IOException {
     long tmpValue = value;

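A tiny, hypothetical usage sketch of the new DataOutput overload follows (the class name and buffer contents are placeholders). Note the explicit (DataOutput) cast: a DataOutputStream matches both the OutputStream and the DataOutput overloads, so the call would otherwise be ambiguous, which is also why the CellUtil call sites above add the same cast.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.util.ByteBufferUtils;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CopyBufferToStreamSketch {
      public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocateDirect(16);
        buf.put(Bytes.toBytes("row1:cf1:q1"));
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        // Copies 4 bytes starting at offset 0 without touching the buffer's position.
        ByteBufferUtils.copyBufferToStream((DataOutput) out, buf, 0, 4);
        System.out.println(Bytes.toString(bos.toByteArray())); // "row1"
      }
    }
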
http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java
index ac9fc45..3bd1b66 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestCellUtil.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -488,4 +490,119 @@ public class TestCellUtil {
     bbCell = new ByteBufferKeyValue(buffer, 0, buffer.remaining());
     assertEquals(bd, CellUtil.getValueAsBigDecimal(bbCell));
   }
+
+  @Test
+  public void testWriteCell() throws IOException {
+    byte[] r = Bytes.toBytes("row1");
+    byte[] f = Bytes.toBytes("cf1");
+    byte[] q1 = Bytes.toBytes("qual1");
+    byte[] q2 = Bytes.toBytes("qual2");
+    byte[] v = Bytes.toBytes("val1");
+    byte[] tags = Bytes.toBytes("tag1");
+    KeyValue kv = new KeyValue(r, f, q1, 0, q1.length, 1234L, Type.Put, v, 0, v.length, tags);
+    NonExtendedCell nonExtCell = new NonExtendedCell(kv);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    int writeCell = CellUtil.writeCell(nonExtCell, os, true);
+    byte[] byteArray = os.toByteArray();
+    KeyValue res = new KeyValue(byteArray);
+    assertTrue(CellUtil.equals(kv, res));
+  }
+
+  private static class NonExtendedCell implements Cell {
+    private KeyValue kv;
+
+    public NonExtendedCell(KeyValue kv) {
+      this.kv = kv;
+    }
+
+    @Override
+    public byte[] getRowArray() {
+      return this.kv.getRowArray();
+    }
+
+    @Override
+    public int getRowOffset() {
+      return this.kv.getRowOffset();
+    }
+
+    @Override
+    public short getRowLength() {
+      return this.kv.getRowLength();
+    }
+
+    @Override
+    public byte[] getFamilyArray() {
+      return this.kv.getFamilyArray();
+    }
+
+    @Override
+    public int getFamilyOffset() {
+      return this.kv.getFamilyOffset();
+    }
+
+    @Override
+    public byte getFamilyLength() {
+      return this.kv.getFamilyLength();
+    }
+
+    @Override
+    public byte[] getQualifierArray() {
+      return this.kv.getQualifierArray();
+    }
+
+    @Override
+    public int getQualifierOffset() {
+      return this.kv.getQualifierOffset();
+    }
+
+    @Override
+    public int getQualifierLength() {
+      return this.kv.getQualifierLength();
+    }
+
+    @Override
+    public long getTimestamp() {
+      return this.kv.getTimestamp();
+    }
+
+    @Override
+    public byte getTypeByte() {
+      return this.kv.getTypeByte();
+    }
+
+    @Override
+    public long getSequenceId() {
+      return this.kv.getSequenceId();
+    }
+
+    @Override
+    public byte[] getValueArray() {
+      return this.kv.getValueArray();
+    }
+
+    @Override
+    public int getValueOffset() {
+      return this.kv.getValueOffset();
+    }
+
+    @Override
+    public int getValueLength() {
+      return this.kv.getValueLength();
+    }
+
+    @Override
+    public byte[] getTagsArray() {
+      return this.kv.getTagsArray();
+    }
+
+    @Override
+    public int getTagsOffset() {
+      return this.kv.getTagsOffset();
+    }
+
+    @Override
+    public int getTagsLength() {
+      return this.kv.getTagsLength();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java
new file mode 100644
index 0000000..6f4419e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSerialization.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * Used to specify the type of serialization for the mappers and reducers.
+ */
+@InterfaceAudience.Public
+public class CellSerialization implements Serialization<Cell> {
+  @Override
+  public boolean accept(Class<?> c) {
+    return Cell.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public CellDeserializer getDeserializer(Class<Cell> t) {
+    return new CellDeserializer();
+  }
+
+  @Override
+  public CellSerializer getSerializer(Class<Cell> c) {
+    return new CellSerializer();
+  }
+
+  public static class CellDeserializer implements Deserializer<Cell> {
+    private DataInputStream dis;
+
+    @Override
+    public void close() throws IOException {
+      this.dis.close();
+    }
+
+    @Override
+    public KeyValue deserialize(Cell ignore) throws IOException {
+      // I can't overwrite the passed in KV, not from a proto kv, not just yet.  TODO
+      return KeyValueUtil.create(this.dis);
+    }
+
+    @Override
+    public void open(InputStream is) throws IOException {
+      this.dis = new DataInputStream(is);
+    }
+  }
+
+  public static class CellSerializer implements Serializer<Cell> {
+    private DataOutputStream dos;
+
+    @Override
+    public void close() throws IOException {
+      this.dos.close();
+    }
+
+    @Override
+    public void open(OutputStream os) throws IOException {
+      this.dos = new DataOutputStream(os);
+    }
+
+    @Override
+    public void serialize(Cell kv) throws IOException {
+      dos.writeInt(CellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
+      CellUtil.writeCell(kv, dos, true);
+    }
+  }
+}

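To make the new serialization available to a job, it is appended to the io.serializations list on the job configuration. A minimal sketch is shown below (the class name RegisterCellSerializationSketch is a placeholder, and it assumes only Cell values need to cross the shuffle); the HFileOutputFormat2 and ImportTsv hunks further down register CellSerialization the same way, alongside MutationSerialization and ResultSerialization.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.CellSerialization;

    public class RegisterCellSerializationSketch {
      /** Appends CellSerialization to the job configuration's io.serializations list. */
      public static void register(Configuration conf) {
        conf.setStrings("io.serializations", conf.get("io.serializations"),
            CellSerialization.class.getName());
      }
    }
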
http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java
new file mode 100644
index 0000000..c33ee15
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellSortReducer.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.MapReduceCell;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Emits sorted Cells.
+ * Reads in all Cells from passed Iterator, sorts them, then emits
+ * Cells in sorted order.  If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat2
+ */
+@InterfaceAudience.Public
+public class CellSortReducer
+    extends Reducer<ImmutableBytesWritable, Cell, ImmutableBytesWritable, Cell> {
+  protected void reduce(ImmutableBytesWritable row, Iterable<Cell> kvs,
+      Reducer<ImmutableBytesWritable, Cell, ImmutableBytesWritable, Cell>.Context context)
+  throws java.io.IOException, InterruptedException {
+    TreeSet<Cell> map = new TreeSet<>(CellComparator.COMPARATOR);
+    for (Cell kv : kvs) {
+      try {
+        map.add(CellUtil.deepClone(kv));
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e);
+      }
+    }
+    context.setStatus("Read " + map.getClass());
+    int index = 0;
+    for (Cell kv: map) {
+      context.write(row, new MapReduceCell(kv));
+      if (++index % 100 == 0) context.setStatus("Wrote " + index);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 679d991..81af165 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -139,7 +139,7 @@ public class CopyTable extends Configured implements Tool {
     job.setNumReduceTasks(0);
 
     if (bulkload) {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null,
         null, job);
 
       // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index e757742..20b2d42 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MapReduceCell;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -90,7 +91,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -233,14 +233,13 @@ public class HFileOutputFormat2
       private final Map<byte[], WriterLength> writers =
               new TreeMap<>(Bytes.BYTES_COMPARATOR);
       private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-      private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
+      private final long now = EnvironmentEdgeManager.currentTime();
       private boolean rollRequested = false;
 
       @Override
       public void write(ImmutableBytesWritable row, V cell)
           throws IOException {
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
+        Cell kv = cell;
         // null input == user explicitly wants to flush
         if (row == null && kv == null) {
           rollWriters();
@@ -248,7 +247,7 @@ public class HFileOutputFormat2
         }
 
         byte[] rowKey = CellUtil.cloneRow(kv);
-        long length = kv.getLength();
+        int length = (CellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
         byte[] family = CellUtil.cloneFamily(kv);
         byte[] tableNameBytes = null;
         if (writeMultipleTables) {
@@ -337,7 +336,8 @@ public class HFileOutputFormat2
         }
 
         // we now have the proper WAL writer. full steam ahead
-        kv.updateLatestStamp(this.now);
+        // TODO : Currently in SettableTimeStamp but this will also move to ExtendedCell
+        CellUtil.updateLatestStamp(cell, this.now);
         wl.writer.append(kv);
         wl.written += length;
 
@@ -578,10 +578,11 @@ public class HFileOutputFormat2
     configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
   }
 
-  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
+  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
+      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
     job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
+    job.setOutputValueClass(MapReduceCell.class);
     job.setOutputFormatClass(cls);
 
     if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
@@ -595,8 +596,9 @@ public class HFileOutputFormat2
     // Based on the configured map output class, set the correct reducer to properly
     // sort the incoming values.
     // TODO it would be nice to pick one or the other of these formats.
-    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(KeyValueSortReducer.class);
+    if (KeyValue.class.equals(job.getMapOutputValueClass())
+        || MapReduceCell.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(CellSortReducer.class);
     } else if (Put.class.equals(job.getMapOutputValueClass())) {
       job.setReducerClass(PutSortReducer.class);
     } else if (Text.class.equals(job.getMapOutputValueClass())) {
@@ -607,7 +609,7 @@ public class HFileOutputFormat2
 
     conf.setStrings("io.serializations", conf.get("io.serializations"),
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
+        CellSerialization.class.getName());
 
     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
       LOG.info("bulkload locality sensitive enabled");
@@ -655,7 +657,7 @@ public class HFileOutputFormat2
     Configuration conf = job.getConfiguration();
 
     job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
+    job.setOutputValueClass(MapReduceCell.class);
     job.setOutputFormatClass(HFileOutputFormat2.class);
 
     ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 28962bb..7af7738 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MapReduceCell;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.RawComparator;
@@ -96,12 +97,33 @@ public class Import extends Configured implements Tool {
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  public static class CellWritableComparablePartitioner
+      extends Partitioner<CellWritableComparable, Cell> {
+    private static CellWritableComparable[] START_KEYS = null;
+    @Override
+    public int getPartition(CellWritableComparable key, Cell value,
+        int numPartitions) {
+      for (int i = 0; i < START_KEYS.length; ++i) {
+        if (key.compareTo(START_KEYS[i]) <= 0) {
+          return i;
+        }
+      }
+      return START_KEYS.length;
+    }
+
+  }
+
+  /**
+   * @deprecated Use {@link CellWritableComparablePartitioner}. Will be removed
+   * from 3.0 onwards
+   */
+  @Deprecated
   public static class KeyValueWritableComparablePartitioner
       extends Partitioner<KeyValueWritableComparable, KeyValue> {
     private static KeyValueWritableComparable[] START_KEYS = null;
+
     @Override
-    public int getPartition(KeyValueWritableComparable key, KeyValue value,
-        int numPartitions) {
+    public int getPartition(KeyValueWritableComparable key, KeyValue value, int numPartitions) {
       for (int i = 0; i < START_KEYS.length; ++i) {
         if (key.compareTo(START_KEYS[i]) <= 0) {
           return i;
@@ -109,7 +131,6 @@ public class Import extends Configured implements Tool {
       }
       return START_KEYS.length;
     }
-
   }
 
   public static class KeyValueWritableComparable
@@ -119,8 +140,7 @@ public class Import extends Configured implements Tool {
 
     static {
       // register this comparator
-      WritableComparator.define(KeyValueWritableComparable.class,
-          new KeyValueWritableComparator());
+      WritableComparator.define(KeyValueWritableComparable.class, new KeyValueWritableComparator());
     }
 
     public KeyValueWritableComparable() {
@@ -141,10 +161,10 @@ public class Import extends Configured implements Tool {
     }
 
     @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
-      justification="This is wrong, yes, but we should be purging Writables, not fixing them")
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
     public int compareTo(KeyValueWritableComparable o) {
-      return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
+      return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable) o).kv);
     }
 
     public static class KeyValueWritableComparator extends WritableComparator {
@@ -166,18 +186,93 @@ public class Import extends Configured implements Tool {
 
   }
 
+  public static class CellWritableComparable
+      implements WritableComparable<CellWritableComparable> {
+
+    private Cell kv = null;
+
+    static {
+      // register this comparator
+      WritableComparator.define(CellWritableComparable.class,
+          new CellWritableComparator());
+    }
+
+    public CellWritableComparable() {
+    }
+
+    public CellWritableComparable(Cell kv) {
+      this.kv = kv;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(CellUtil.estimatedSerializedSizeOfKey(kv));
+      out.writeInt(0);
+      CellUtil.writeFlatKey(kv, out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      kv = KeyValue.create(in);
+    }
+
+    @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification="This is wrong, yes, but we should be purging Writables, not fixing them")
+    public int compareTo(CellWritableComparable o) {
+      return CellComparator.COMPARATOR.compare(this.kv, ((CellWritableComparable)o).kv);
+    }
+
+    public static class CellWritableComparator extends WritableComparator {
+
+      @Override
+      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        try {
+          CellWritableComparable kv1 = new CellWritableComparable();
+          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+          CellWritableComparable kv2 = new CellWritableComparable();
+          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+          return compare(kv1, kv2);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+    }
+
+  }
+
+  /**
+   * @deprecated Use {@link CellReducer}. Will be removed from 3.0 onwards
+   */
+  @Deprecated
   public static class KeyValueReducer
-      extends
-      Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
-    protected void reduce(
-        KeyValueWritableComparable row,
-        Iterable<KeyValue> kvs,
-        Reducer<KeyValueWritableComparable,
-          KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+      extends Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
+    protected void reduce(KeyValueWritableComparable row, Iterable<KeyValue> kvs,
+        Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
         throws java.io.IOException, InterruptedException {
       int index = 0;
       for (KeyValue kv : kvs) {
         context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
+        if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, "
+            + "and the row key being written is " + Bytes.toString(kv.getRowArray()));
+      }
+    }
+  }
+
+  public static class CellReducer
+      extends
+      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
+    protected void reduce(
+        CellWritableComparable row,
+        Iterable<Cell> kvs,
+        Reducer<CellWritableComparable,
+          Cell, ImmutableBytesWritable, Cell>.Context context)
+        throws java.io.IOException, InterruptedException {
+      int index = 0;
+      for (Cell kv : kvs) {
+        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
+          new MapReduceCell(kv));
         if (++index % 100 == 0)
           context.setStatus("Wrote " + index + " KeyValues, "
               + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray()));
@@ -185,13 +280,123 @@ public class Import extends Configured implements Tool {
     }
   }
 
+  /**
+   * @deprecated Use {@link CellSortImporter}. Will be removed from 3.0 onwards
+   */
+  @Deprecated
   public static class KeyValueSortImporter
       extends TableMapper<KeyValueWritableComparable, KeyValue> {
     private Map<byte[], byte[]> cfRenameMap;
     private Filter filter;
+    private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);
+
+    /**
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+            "Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null || !filter.filterRowKey(
+          CellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
+            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
+      int reduceNum = context.getNumReduceTasks();
+      Configuration conf = context.getConfiguration();
+      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        byte[][] startKeys = regionLocator.getStartKeys();
+        if (startKeys.length != reduceNum) {
+          throw new IOException("Region split after job initialization");
+        }
+        KeyValueWritableComparable[] startKeyWraps =
+            new KeyValueWritableComparable[startKeys.length - 1];
+        for (int i = 1; i < startKeys.length; ++i) {
+          startKeyWraps[i - 1] =
+              new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
+        }
+        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
+      }
+    }
+  }
+
+  /**
+   * A mapper that just writes out KeyValues.
+   * @deprecated Use {@link CellImporter}. Will be removed from 3.0 onwards
+   */
+  @Deprecated
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification = "Writables are going away and this has been this way forever")
+  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private Filter filter;
     private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
 
     /**
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+            "Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null || !filter.filterRowKey(
+          CellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
+    }
+  }
+
+  public static class CellSortImporter
+      extends TableMapper<CellWritableComparable, Cell> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private Filter filter;
+    private static final Log LOG = LogFactory.getLog(CellImporter.class);
+
+    /**
      * @param row  The current table row key.
      * @param value  The columns.
      * @param context  The current context.
@@ -213,9 +418,8 @@ public class Import extends Configured implements Tool {
             kv = filterKv(filter, kv);
             // skip if we filtered it out
             if (kv == null) continue;
-            // TODO get rid of ensureKeyValue
-            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
-            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
+            Cell ret = convertKv(kv, cfRenameMap);
+            context.write(new CellWritableComparable(ret), ret);
           }
         }
       } catch (InterruptedException e) {
@@ -236,13 +440,13 @@ public class Import extends Configured implements Tool {
         if (startKeys.length != reduceNum) {
           throw new IOException("Region split after job initialization");
         }
-        KeyValueWritableComparable[] startKeyWraps =
-            new KeyValueWritableComparable[startKeys.length - 1];
+        CellWritableComparable[] startKeyWraps =
+            new CellWritableComparable[startKeys.length - 1];
         for (int i = 1; i < startKeys.length; ++i) {
           startKeyWraps[i - 1] =
-              new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
+              new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
         }
-        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
+        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
       }
     }
   }
@@ -252,10 +456,10 @@ public class Import extends Configured implements Tool {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
       justification="Writables are going away and this has been this way forever")
-  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
+  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
     private Map<byte[], byte[]> cfRenameMap;
     private Filter filter;
-    private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
+    private static final Log LOG = LogFactory.getLog(CellImporter.class);
 
     /**
      * @param row  The current table row key.
@@ -279,8 +483,7 @@ public class Import extends Configured implements Tool {
             kv = filterKv(filter, kv);
             // skip if we filtered it out
             if (kv == null) continue;
-            // TODO get rid of ensureKeyValue
-            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+            context.write(row, new MapReduceCell(convertKv(kv, cfRenameMap)));
           }
         }
       } catch (InterruptedException e) {
@@ -505,21 +708,21 @@ public class Import extends Configured implements Tool {
     if(cfRenameMap != null) {
       // If there's a rename mapping for this CF, create a new KeyValue
       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
-      if(newCfName != null) {
-          kv = new KeyValue(kv.getRowArray(), // row buffer
-                  kv.getRowOffset(),        // row offset
-                  kv.getRowLength(),        // row length
-                  newCfName,                // CF buffer
-                  0,                        // CF offset
-                  newCfName.length,         // CF length
-                  kv.getQualifierArray(),   // qualifier buffer
-                  kv.getQualifierOffset(),  // qualifier offset
-                  kv.getQualifierLength(),  // qualifier length
-                  kv.getTimestamp(),        // timestamp
-                  KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
-                  kv.getValueArray(),       // value buffer
-                  kv.getValueOffset(),      // value offset
-                  kv.getValueLength());     // value length
+      if (newCfName != null) {
+        kv = new KeyValue(kv.getRowArray(), // row buffer
+            kv.getRowOffset(),              // row offset
+            kv.getRowLength(),              // row length
+            newCfName,                      // CF buffer
+            0,                              // CF offset
+            newCfName.length,               // CF length
+            kv.getQualifierArray(),         // qualifier buffer
+            kv.getQualifierOffset(),        // qualifier offset
+            kv.getQualifierLength(),        // qualifier length
+            kv.getTimestamp(),              // timestamp
+            KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
+            kv.getValueArray(),             // value buffer
+            kv.getValueOffset(),            // value offset
+            kv.getValueLength());           // value length
       }
     }
     return kv;
@@ -626,35 +829,35 @@ public class Import extends Configured implements Tool {
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
         HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-        job.setMapperClass(KeyValueSortImporter.class);
-        job.setReducerClass(KeyValueReducer.class);
+        job.setMapperClass(CellSortImporter.class);
+        job.setReducerClass(CellReducer.class);
         Path outputDir = new Path(hfileOutPath);
         FileOutputFormat.setOutputPath(job, outputDir);
-        job.setMapOutputKeyClass(KeyValueWritableComparable.class);
-        job.setMapOutputValueClass(KeyValue.class);
+        job.setMapOutputKeyClass(CellWritableComparable.class);
+        job.setMapOutputValueClass(MapReduceCell.class);
         job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
-            KeyValueWritableComparable.KeyValueWritableComparator.class,
+            CellWritableComparable.CellWritableComparator.class,
             RawComparator.class);
         Path partitionsPath =
             new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
         FileSystem fs = FileSystem.get(job.getConfiguration());
         fs.deleteOnExit(partitionsPath);
-        job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
+        job.setPartitionerClass(CellWritableComparablePartitioner.class);
         job.setNumReduceTasks(regionLocator.getStartKeys().length);
         TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
             org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
       }
     } else if (hfileOutPath != null) {
       LOG.info("writing to hfiles for bulk load.");
-      job.setMapperClass(KeyValueImporter.class);
+      job.setMapperClass(CellImporter.class);
       try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)){
-        job.setReducerClass(KeyValueSortReducer.class);
+        job.setReducerClass(CellSortReducer.class);
         Path outputDir = new Path(hfileOutPath);
         FileOutputFormat.setOutputPath(job, outputDir);
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
+        job.setMapOutputValueClass(MapReduceCell.class);
         HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
         TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
             org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 4a1dea8..d672803 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -584,7 +584,7 @@ public class ImportTsv extends Configured implements Tool {
           job.getConfiguration().setStrings("io.serializations",
               job.getConfiguration().get("io.serializations"),
               MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-              KeyValueSerialization.class.getName());
+              CellSerialization.class.getName());
         }
         TableMapReduceUtil.addDependencyJars(job);
         TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
index d0cc00e..3207712 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
@@ -29,7 +29,13 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
-
+/**
+ * Used to specify the type of serialization for the mappers
+ * and reducers.
+ * @deprecated Use {@link CellSerialization}. Will be
+ * removed in HBase 3.0.
+ */
+@Deprecated
 @InterfaceAudience.Public
 public class KeyValueSerialization implements Serialization<KeyValue> {
   @Override
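
Code outside HBase that registered KeyValueSerialization by hand can swap in CellSerialization the same way the ImportTsv hunk above and the TableMapReduceUtil hunk below do. A minimal sketch, assuming CellSerialization carries the same public audience as the class it deprecates (SerializationMigrationSketch is a hypothetical name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CellSerialization;
import org.apache.hadoop.mapreduce.Job;

public class SerializationMigrationSketch {
  /** Appends the Cell-based serialization to io.serializations, replacing KeyValueSerialization. */
  static void registerCellSerialization(Job job) {
    Configuration conf = job.getConfiguration();
    conf.setStrings("io.serializations",
        conf.get("io.serializations"),
        CellSerialization.class.getName());   // was KeyValueSerialization.class.getName()
  }
}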

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
index 824f23d..3ba5198 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.Reducer;
  * KeyValues in sorted order.  If lots of columns per row, it will use lots of
  * memory sorting.
  * @see HFileOutputFormat2
+ * @deprecated Use {@link CellSortReducer}. Will be removed
+ * in HBase 3.0.
  */
+@Deprecated
 @InterfaceAudience.Public
 public class KeyValueSortReducer
     extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
index 49902f4..bb935c3 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.util.StringUtils;
  * Puts in sorted order.  If lots of columns per row, it will use lots of
  * memory sorting.
  * @see HFileOutputFormat2
- * @see KeyValueSortReducer
+ * @see CellSortReducer
  */
 @InterfaceAudience.Public
 public class PutSortReducer extends

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index c99af4e..cdecf14 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -208,7 +208,7 @@ public class TableMapReduceUtil {
     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
     conf.setStrings("io.serializations", conf.get("io.serializations"),
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
+        CellSerialization.class.getName());
     if (addDependencyJars) {
       addDependencyJars(job);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 14da314..2aaa4eb 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.util.StringUtils;
 /**
  * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit.
  * @see HFileOutputFormat2
- * @see KeyValueSortReducer
+ * @see CellSortReducer
  * @see PutSortReducer
  */
 @InterfaceAudience.Public

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index e1d8d28..02c4640 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -44,8 +43,9 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MapReduceCell;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A tool to replay WAL files as a M/R job.
@@ -95,7 +96,9 @@ public class WALPlayer extends Configured implements Tool {
   /**
    * A mapper that just writes out KeyValues.
    * This one can be used together with {@link KeyValueSortReducer}
+   * @deprecated Use {@link WALCellMapper}. Will be removed in HBase 3.0.
    */
+  @Deprecated
   static class WALKeyValueMapper
     extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
     private byte[] table;
@@ -133,6 +136,47 @@ public class WALPlayer extends Configured implements Tool {
     }
 
   }
+  /**
+   * A mapper that just writes out Cells.
+   * This one can be used together with {@link CellSortReducer}
+   */
+  static class WALCellMapper
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
+    private byte[] table;
+
+    @Override
+    public void map(WALKey key, WALEdit value,
+      Context context)
+    throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename().getName())) {
+          for (Cell cell : value.getCells()) {
+            if (WALEdit.isMetaEditFamily(cell)) {
+              continue;
+            }
+            context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)),
+              new MapReduceCell(cell));
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // only a single table is supported when HFiles are generated with HFileOutputFormat
+      String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tables == null || tables.length != 1) {
+        // this can only happen when WALCellMapper is used directly by a class other than WALPlayer
+        throw new IOException("Exactly one table must be specified for the bulk HFile case.");
+      }
+      table = Bytes.toBytes(tables[0]);
+
+    }
+
+  }
 
   /**
    * A mapper that writes out {@link Mutation} to be directly applied to
@@ -299,11 +343,11 @@ public class WALPlayer extends Configured implements Tool {
         throw new IOException("Exactly one table must be specified for the bulk export option");
       }
       TableName tableName = TableName.valueOf(tables[0]);
-      job.setMapperClass(WALKeyValueMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
+      job.setMapperClass(WALCellMapper.class);
+      job.setReducerClass(CellSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
+      job.setMapOutputValueClass(MapReduceCell.class);
       try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
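
With the mapper and reducer swapped as above, the Cell-based bulk path runs whenever WALPlayer is given a bulk output directory. A rough driver sketch, assuming the bulk-output property name is wal.bulk.output and using placeholder paths and table name (WALPlayerBulkSketch is a hypothetical class):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WALPlayerBulkSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Assumed bulk-output property; when set, WALPlayer wires up WALCellMapper,
    // CellSortReducer and MapReduceCell as shown in the hunk above.
    conf.set("wal.bulk.output", "/tmp/hfiles-from-wals");       // placeholder output dir
    // Arguments: <WAL input dir> <table>; both are placeholders here.
    int rc = ToolRunner.run(conf, new WALPlayer(),
        new String[] { "/hbase/oldWALs", "example_table" });
    System.exit(rc);
  }
}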

http://git-wip-us.apache.org/repos/asf/hbase/blob/66fb60d4/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java
new file mode 100644
index 0000000..c0f74a5
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapReduceCell.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.ByteBufferCell;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A wrapper for a cell to be used with mapreduce, as the output value class for mappers/reducers.
+ */
+@InterfaceAudience.Private
+public class MapReduceCell extends ByteBufferCell implements ExtendedCell {
+
+  private final Cell cell;
+
+  public MapReduceCell(Cell cell) {
+    this.cell = cell;
+  }
+
+  @Override
+  public byte[] getRowArray() {
+    return this.cell.getRowArray();
+  }
+
+  @Override
+  public int getRowOffset() {
+    return this.cell.getRowOffset();
+  }
+
+  @Override
+  public short getRowLength() {
+    return this.cell.getRowLength();
+  }
+
+  @Override
+  public byte[] getFamilyArray() {
+    return this.cell.getFamilyArray();
+  }
+
+  @Override
+  public int getFamilyOffset() {
+    return this.cell.getFamilyOffset();
+  }
+
+  @Override
+  public byte getFamilyLength() {
+    return this.cell.getFamilyLength();
+  }
+
+  @Override
+  public byte[] getQualifierArray() {
+    return this.cell.getQualifierArray();
+  }
+
+  @Override
+  public int getQualifierOffset() {
+    return this.cell.getQualifierOffset();
+  }
+
+  @Override
+  public int getQualifierLength() {
+    return this.cell.getQualifierLength();
+  }
+
+  @Override
+  public long getTimestamp() {
+    return this.cell.getTimestamp();
+  }
+
+  @Override
+  public byte getTypeByte() {
+    return this.cell.getTypeByte();
+  }
+
+  @Override
+  public long getSequenceId() {
+    return this.cell.getSequenceId();
+  }
+
+  @Override
+  public byte[] getValueArray() {
+    return this.cell.getValueArray();
+  }
+
+  @Override
+  public int getValueOffset() {
+    return this.cell.getValueOffset();
+  }
+
+  @Override
+  public int getValueLength() {
+    return this.cell.getValueLength();
+  }
+
+  @Override
+  public byte[] getTagsArray() {
+    return this.cell.getTagsArray();
+  }
+
+  @Override
+  public int getTagsOffset() {
+    return this.cell.getTagsOffset();
+  }
+
+  @Override
+  public int getTagsLength() {
+    return this.cell.getTagsLength();
+  }
+
+  @Override
+  public ByteBuffer getRowByteBuffer() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getRowByteBuffer();
+    } else {
+      return ByteBuffer.wrap(CellUtil.cloneRow(this.cell));
+    }
+  }
+
+  @Override
+  public int getRowPosition() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getRowPosition();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public ByteBuffer getFamilyByteBuffer() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getFamilyByteBuffer();
+    } else {
+      return ByteBuffer.wrap(CellUtil.cloneFamily(this.cell));
+    }
+  }
+
+  @Override
+  public int getFamilyPosition() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getFamilyPosition();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public ByteBuffer getQualifierByteBuffer() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getQualifierByteBuffer();
+    } else {
+      return ByteBuffer.wrap(CellUtil.cloneQualifier(this.cell));
+    }
+  }
+
+  @Override
+  public int getQualifierPosition() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getQualifierPosition();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public ByteBuffer getValueByteBuffer() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getValueByteBuffer();
+    } else {
+      return ByteBuffer.wrap(CellUtil.cloneValue(this.cell));
+    }
+  }
+
+  @Override
+  public int getValuePosition() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getValuePosition();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public ByteBuffer getTagsByteBuffer() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getTagsByteBuffer();
+    } else {
+      return ByteBuffer.wrap(CellUtil.cloneTags(this.cell));
+    }
+  }
+
+  @Override
+  public int getTagsPosition() {
+    if (cell instanceof ByteBufferCell) {
+      return ((ByteBufferCell) this.cell).getTagsPosition();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return this.cell.toString();
+  }
+
+  @Override
+  public void setSequenceId(long seqId) throws IOException {
+    CellUtil.setSequenceId(cell, seqId);
+  }
+
+  @Override
+  public void setTimestamp(long ts) throws IOException {
+    CellUtil.setTimestamp(cell, ts);
+  }
+
+  @Override
+  public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
+    CellUtil.setTimestamp(cell, ts, tsOffset);
+  }
+
+  @Override
+  public long heapSize() {
+    return CellUtil.estimatedHeapSizeOf(cell);
+  }
+
+  @Override
+  public int write(OutputStream out, boolean withTags) throws IOException {
+    return CellUtil.writeCell(cell, out, withTags);
+  }
+
+  @Override
+  public int getSerializedSize(boolean withTags) {
+    return CellUtil.estimatedSerializedSizeOf(cell) - Bytes.SIZEOF_INT;
+  }
+
+  @Override
+  public void write(ByteBuffer buf, int offset) {
+    CellUtil.writeCellToBuffer(cell, buf, offset);
+  }
+
+  @Override
+  public ExtendedCell deepClone() {
+    try {
+      return (ExtendedCell) CellUtil.deepClone(cell);
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
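
MapReduceCell delegates every accessor straight to the wrapped cell and only copies data when a ByteBuffer view is requested of a cell that is not itself a ByteBufferCell. A small, self-contained sketch of that fallback behavior (MapReduceCellSketch and the KeyValue built here are purely illustrative):

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceCell;

public class MapReduceCellSketch {
  public static void main(String[] args) {
    // An ordinary on-heap KeyValue, i.e. not a ByteBufferCell.
    Cell kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    MapReduceCell wrapped = new MapReduceCell(kv);

    // Array accessors delegate directly to the wrapped cell...
    System.out.println(Bytes.toString(wrapped.getRowArray(),
        wrapped.getRowOffset(), wrapped.getRowLength()));

    // ...while the ByteBuffer accessors fall back to wrapping a copy of the row,
    // with the position reported as 0, because the backing cell has no buffer view.
    ByteBuffer rowBuf = wrapped.getRowByteBuffer();
    System.out.println(Bytes.toString(rowBuf.array(), wrapped.getRowPosition(),
        wrapped.getRowLength()));
  }
}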

