hbase-commits mailing list archives

From anoopsamj...@apache.org
Subject [1/2] HBASE-10835 DBE encode path improvements. (Anoop)
Date Thu, 22 May 2014 06:31:19 GMT
Repository: hbase
Updated Branches:
  refs/heads/master cb1428ddc -> 53513dcb4
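
This patch moves the data block encoding (DBE) tests off the old one-shot
DataBlockEncoder.encodeKeyValues(ByteBuffer, ctx) call and onto the streaming
encode path: startBlockEncoding() once per block, encode() once per cell, and
endBlockEncoding() to finish. A minimal sketch of that call sequence, using
the DataBlockEncoder signatures that appear in the hunks below; the byte[]
passed to endBlockEncoding carries the bytes written so far, header included
(the tests grab it via BufferGrabbingByteArrayOutputStream, while this sketch
simply copies with toByteArray()):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;

    // Sketch: encode one block of KeyValues with the streaming DBE API.
    static byte[] encodeBlock(DataBlockEncoding encoding, List<KeyValue> kvs,
        HFileBlockEncodingContext ctx) throws IOException {
      DataBlockEncoder encoder = encoding.getEncoder();
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER); // placeholder header
      DataOutputStream out = new DataOutputStream(baos);
      encoder.startBlockEncoding(ctx, out);
      for (KeyValue kv : kvs) {
        encoder.encode(kv, ctx, out); // cells stream in one at a time
      }
      encoder.endBlockEncoding(ctx, out, baos.toByteArray());
      return baos.toByteArray(); // header + 2-byte encoding id + encoded cells
    }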


http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
index 993c6cb..15c32ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -20,7 +20,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -91,48 +94,6 @@ public class TestDataBlockEncoders {
           HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
     }
   }
-  
-  private byte[] encodeBytes(DataBlockEncoding encoding, ByteBuffer dataset)
-      throws IOException {
-    DataBlockEncoder encoder = encoding.getEncoder();
-    HFileBlockEncodingContext encodingCtx = getEncodingContext(Compression.Algorithm.NONE,
-        encoding);
-
-    encoder.encodeKeyValues(dataset, encodingCtx);
-
-    byte[] encodedBytesWithHeader = encodingCtx.getUncompressedBytesWithHeader();
-    byte[] encodedData = new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET];
-    System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData, 0,
-        encodedData.length);
-    return encodedData;
-  }
-  
-  private void testAlgorithm(ByteBuffer dataset, DataBlockEncoding encoding,
-      List<KeyValue> kvList) throws IOException {
-    // encode
-    byte[] encodedBytes = encodeBytes(encoding, dataset);
-    // decode
-    ByteArrayInputStream bais = new ByteArrayInputStream(encodedBytes);
-    DataInputStream dis = new DataInputStream(bais);
-    ByteBuffer actualDataset;
-    DataBlockEncoder encoder = encoding.getEncoder();
-    HFileContext meta = new HFileContextBuilder()
-                        .withHBaseCheckSum(false)
-                        .withIncludesMvcc(includesMemstoreTS)
-                        .withIncludesTags(includesTags)
-                        .withCompression(Compression.Algorithm.NONE).build();
-    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(meta));
-    dataset.rewind();
-    actualDataset.rewind();
-
-    // this is because in case of prefix tree the decoded stream will not have
-    // the
-    // mvcc in it.
-    // if (encoding != DataBlockEncoding.PREFIX_TREE) {
-    assertEquals("Encoding -> decoding gives different results for " + encoder,
-        Bytes.toStringBinary(dataset), Bytes.toStringBinary(actualDataset));
-    // }
-  }
 
   /**
    * Test data block encoding of empty KeyValue.
@@ -158,8 +119,7 @@ public class TestDataBlockEncoders {
       kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
           metaValue2) }));
     }
-    testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
-        kvList);
+    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
   }
 
   /**
@@ -186,8 +146,7 @@ public class TestDataBlockEncoders {
       kvList.add(new KeyValue(row, family, qualifier, -1l, Type.Put, value));
       kvList.add(new KeyValue(row, family, qualifier, -2l, Type.Put, value));
     }
-    testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
-        kvList);
+    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
   }
 
 
@@ -199,8 +158,7 @@ public class TestDataBlockEncoders {
   @Test
   public void testExecutionOnSample() throws IOException {
     List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
-    testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
-        kvList);
+    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
   }
 
   /**
@@ -209,18 +167,17 @@ public class TestDataBlockEncoders {
   @Test
   public void testSeekingOnSample() throws IOException {
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
-        includesMemstoreTS);
 
     // create all seekers
-    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<DataBlockEncoder.EncodedSeeker>();
+    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = 
+        new ArrayList<DataBlockEncoder.EncodedSeeker>();
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
-      if (encoding.getEncoder() == null) {
+      DataBlockEncoder encoder = encoding.getEncoder();
+      if (encoder == null) {
         continue;
       }
-
-      ByteBuffer encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
-      DataBlockEncoder encoder = encoding.getEncoder();
+      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
+          getEncodingContext(Compression.Algorithm.NONE, encoding));
       HFileContext meta = new HFileContextBuilder()
                           .withHBaseCheckSum(false)
                           .withIncludesMvcc(includesMemstoreTS)
@@ -258,25 +215,35 @@ public class TestDataBlockEncoders {
     }
   }
 
+  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
+      HFileBlockEncodingContext encodingContext) throws IOException {
+    DataBlockEncoder encoder = encoding.getEncoder();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+    DataOutputStream dos = new DataOutputStream(baos);
+    encoder.startBlockEncoding(encodingContext, dos);
+    for (KeyValue kv : kvs) {
+      encoder.encode(kv, encodingContext, dos);
+    }
+    BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
+    baos.writeTo(stream);
+    encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
+    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
+    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
+    return ByteBuffer.wrap(encodedData);
+  }
+
   @Test
-  public void testNextOnSample() {
+  public void testNextOnSample() throws IOException {
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
-        includesMemstoreTS);
 
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       if (encoding.getEncoder() == null) {
         continue;
       }
-
       DataBlockEncoder encoder = encoding.getEncoder();
-      ByteBuffer encodedBuffer = null;
-      try {
-        encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
-      } catch (IOException e) {
-        throw new RuntimeException(String.format("Bug while encoding using '%s'",
-            encoder.toString()), e);
-      }
+      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
+          getEncodingContext(Compression.Algorithm.NONE, encoding));
       HFileContext meta = new HFileContextBuilder()
                           .withHBaseCheckSum(false)
                           .withIncludesMvcc(includesMemstoreTS)
@@ -318,25 +285,19 @@ public class TestDataBlockEncoders {
 
   /**
    * Test whether the decompression of first key is implemented correctly.
+   * @throws IOException
    */
   @Test
-  public void testFirstKeyInBlockOnSample() {
+  public void testFirstKeyInBlockOnSample() throws IOException {
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
-        includesMemstoreTS);
 
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       if (encoding.getEncoder() == null) {
         continue;
       }
       DataBlockEncoder encoder = encoding.getEncoder();
-      ByteBuffer encodedBuffer = null;
-      try {
-        encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
-      } catch (IOException e) {
-        throw new RuntimeException(String.format("Bug while encoding using '%s'",
-            encoder.toString()), e);
-      }
+      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
+          getEncodingContext(Compression.Algorithm.NONE, encoding));
       ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
       KeyValue firstKv = sampleKv.get(0);
       if (0 != Bytes.compareTo(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.limit(),
@@ -360,9 +321,7 @@ public class TestDataBlockEncoders {
     ByteBuffer expectedKey = null;
     ByteBuffer expectedValue = null;
     for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
-      seeker.seekToKeyInBlock(
-          new KeyValue.KeyOnlyKeyValue(keyValue.getBuffer(), keyValue.getKeyOffset(), keyValue
-              .getKeyLength()), seekBefore);
+      seeker.seekToKeyInBlock(keyValue, seekBefore);
       seeker.rewind();
 
       ByteBuffer actualKeyValue = seeker.getKeyValueBuffer();
@@ -388,24 +347,34 @@ public class TestDataBlockEncoders {
       }
     }
   }
-  
-  private void testEncodersOnDataset(ByteBuffer onDataset, List<KeyValue> kvList) throws IOException {
-    ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity());
-    onDataset.rewind();
-    dataset.put(onDataset);
-    onDataset.rewind();
-    dataset.flip();
 
+  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
+      boolean includesTags) throws IOException {
+    ByteBuffer unencodedDataBuf = RedundantKVGenerator.convertKvToByteBuffer(kvList,
+        includesMemstoreTS);
+    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
+        .withIncludesTags(includesTags).build();
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
-      if (encoding.getEncoder() == null) {
+      DataBlockEncoder encoder = encoding.getEncoder();
+      if (encoder == null) {
         continue;
       }
+      HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding,
+          HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+      DataOutputStream dos = new DataOutputStream(baos);
+      encoder.startBlockEncoding(encodingContext, dos);
+      for (KeyValue kv : kvList) {
+        encoder.encode(kv, encodingContext, dos);
+      }
+      BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
+      baos.writeTo(stream);
+      encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
+      byte[] encodedData = baos.toByteArray();
 
-      testAlgorithm(dataset, encoding, kvList);
-
-      // ensure that dataset is unchanged
-      dataset.rewind();
-      assertEquals("Input of two methods is changed", onDataset, dataset);
+      testAlgorithm(encodedData, unencodedDataBuf, encoder);
     }
   }
   
@@ -427,8 +396,26 @@ public class TestDataBlockEncoders {
       kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
       kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
     }
-    testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
-        kvList);
+    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
   }
 
+  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
+      DataBlockEncoder encoder) throws IOException {
+    // decode
+    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
+        encodedData.length - ENCODED_DATA_OFFSET);
+    DataInputStream dis = new DataInputStream(bais);
+    ByteBuffer actualDataset;
+    HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
+        .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTags)
+        .withCompression(Compression.Algorithm.NONE).build();
+    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(meta));
+    actualDataset.rewind();
+
+    // Note: in the case of prefix tree the decoded stream
+    // will not have the mvcc in it.
+    assertEquals("Encoding -> decoding gives different results for " + encoder,
+        Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
+  }
 }
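
For reference, the ENCODED_DATA_OFFSET used by encodeKeyValues() and
testAlgorithm() above is the distance from the start of the serialized block
to the first encoded cell: the HFile block header followed by the two-byte
encoding id. It is the same constant that the TestSeekToBlockWithEncoders
diff below deletes as a now-redundant local copy:

    static final int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE
        + DataBlockEncoding.ID_SIZE;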

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
index 1ab4ca7..034771c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
@@ -98,7 +98,6 @@ public class TestPrefixTreeEncoding {
     formatRowNum = true;
     PrefixTreeCodec encoder = new PrefixTreeCodec();
     int batchId = numBatchesWritten++;
-    ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId, false, includesTag);
     HFileContext meta = new HFileContextBuilder()
                         .withHBaseCheckSum(false)
                         .withIncludesMvcc(false)
@@ -106,10 +105,13 @@ public class TestPrefixTreeEncoding {
                         .withCompression(Algorithm.NONE).build();
     HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
         DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
-    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
+    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
+    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
+    generateFixedTestData(kvset, batchId, false, includesTag, encoder, blkEncodingCtx,
+        userDataStream);
     EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
         encoder.newDataBlockDecodingContext(meta));
-    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
+    byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
     seeker.setCurrentBuffer(readBuffer);
@@ -142,7 +144,8 @@ public class TestPrefixTreeEncoding {
   @Test
   public void testScanWithRandomData() throws Exception {
     PrefixTreeCodec encoder = new PrefixTreeCodec();
-    ByteBuffer dataBuffer = generateRandomTestData(kvset, numBatchesWritten++, includesTag);
+    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
+    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
     HFileContext meta = new HFileContextBuilder()
                         .withHBaseCheckSum(false)
                         .withIncludesMvcc(false)
@@ -151,10 +154,11 @@ public class TestPrefixTreeEncoding {
                         .build();
     HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
         DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
-    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
+    generateRandomTestData(kvset, numBatchesWritten++, includesTag, encoder, blkEncodingCtx,
+        userDataStream);
     EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
         encoder.newDataBlockDecodingContext(meta));
-    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
+    byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
     seeker.setCurrentBuffer(readBuffer);
@@ -178,8 +182,9 @@ public class TestPrefixTreeEncoding {
   @Test
   public void testSeekWithRandomData() throws Exception {
     PrefixTreeCodec encoder = new PrefixTreeCodec();
+    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
+    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
     int batchId = numBatchesWritten++;
-    ByteBuffer dataBuffer = generateRandomTestData(kvset, batchId, includesTag);
     HFileContext meta = new HFileContextBuilder()
                         .withHBaseCheckSum(false)
                         .withIncludesMvcc(false)
@@ -188,10 +193,10 @@ public class TestPrefixTreeEncoding {
                         .build();
     HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
         DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
-    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
+    generateRandomTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
     EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
         encoder.newDataBlockDecodingContext(meta));
-    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
+    byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
     verifySeeking(seeker, readBuffer, batchId);
@@ -201,7 +206,6 @@ public class TestPrefixTreeEncoding {
   public void testSeekWithFixedData() throws Exception {
     PrefixTreeCodec encoder = new PrefixTreeCodec();
     int batchId = numBatchesWritten++;
-    ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId, includesTag);
     HFileContext meta = new HFileContextBuilder()
                         .withHBaseCheckSum(false)
                         .withIncludesMvcc(false)
@@ -210,10 +214,12 @@ public class TestPrefixTreeEncoding {
                         .build();
     HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
         DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
-    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
+    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
+    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
+    generateFixedTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
     EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
         encoder.newDataBlockDecodingContext(meta));
-    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
+    byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
     verifySeeking(seeker, readBuffer, batchId);
@@ -255,15 +261,15 @@ public class TestPrefixTreeEncoding {
     }
   }
 
-  private static ByteBuffer generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
-      int batchId, boolean useTags) throws Exception {
-    return generateFixedTestData(kvset, batchId, true, useTags);
+  private static void generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset, int batchId,
+      boolean useTags, PrefixTreeCodec encoder, HFileBlockEncodingContext blkEncodingCtx,
+      DataOutputStream userDataStream) throws Exception {
+    generateFixedTestData(kvset, batchId, true, useTags, encoder, blkEncodingCtx, userDataStream);
   }
 
-  private static ByteBuffer generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
-      int batchId, boolean partial, boolean useTags) throws Exception {
-    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
-    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
+  private static void generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
+      int batchId, boolean partial, boolean useTags, PrefixTreeCodec encoder,
+      HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
       if (partial && i / 10 % 2 == 1)
         continue;
@@ -279,24 +285,16 @@ public class TestPrefixTreeEncoding {
         }
       }
     }
+    encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
     for (KeyValue kv : kvset) {
-      userDataStream.writeInt(kv.getKeyLength());
-      userDataStream.writeInt(kv.getValueLength());
-      userDataStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-      userDataStream.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-      if (useTags) {
-        userDataStream.writeShort(kv.getTagsLength());
-        userDataStream.write(kv.getBuffer(), kv.getValueOffset() + kv.getValueLength()
-            + Bytes.SIZEOF_SHORT, kv.getTagsLength());
-      }
+      encoder.encode(kv, blkEncodingCtx, userDataStream);
     }
-    return ByteBuffer.wrap(baosInMemory.toByteArray());
+    encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
   }
 
-  private static ByteBuffer generateRandomTestData(ConcurrentSkipListSet<KeyValue> kvset,
-      int batchId, boolean useTags) throws Exception {
-    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
-    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
+  private static void generateRandomTestData(ConcurrentSkipListSet<KeyValue> kvset,
+      int batchId, boolean useTags, PrefixTreeCodec encoder,
+      HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
     Random random = new Random();
     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
       if (random.nextInt(100) < 50)
@@ -315,19 +313,11 @@ public class TestPrefixTreeEncoding {
         }
       }
     }
-
+    encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
     for (KeyValue kv : kvset) {
-      userDataStream.writeInt(kv.getKeyLength());
-      userDataStream.writeInt(kv.getValueLength());
-      userDataStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-      userDataStream.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-      if (useTags) {
-        userDataStream.writeShort(kv.getTagsLength());
-        userDataStream.write(kv.getBuffer(), kv.getValueOffset() + kv.getValueLength()
-            + Bytes.SIZEOF_SHORT, kv.getTagsLength());
-      }
+      encoder.encode(kv, blkEncodingCtx, userDataStream);
     }
-    return ByteBuffer.wrap(baosInMemory.toByteArray());
+    encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
   }
 
   private static byte[] getRowKey(int batchId, int i) {
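
The rewritten generators above no longer hand-serialize key/value/tag lengths
into the stream; they drive the PrefixTreeCodec directly, with the caller
owning the output stream and encoding context. A condensed sketch of the
resulting call shape, using the names from the hunks above:

    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
    for (KeyValue kv : kvset) {
      encoder.encode(kv, blkEncodingCtx, userDataStream);
    }
    // null: this codec needs no uncompressed-bytes-with-header copy here.
    encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
    byte[] onDiskBytes = baosInMemory.toByteArray();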

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
index 75464d2..ab60c8b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
@@ -32,30 +32,12 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.test.RedundantKVGenerator;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(SmallTests.class)
 public class TestSeekToBlockWithEncoders {
 
-  private static int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE
-      + DataBlockEncoding.ID_SIZE;
-
-  private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm algo,
-      DataBlockEncoding encoding) {
-    DataBlockEncoder encoder = encoding.getEncoder();
-    HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(false)
-        .withIncludesTags(false).withCompression(algo).build();
-    if (encoder != null) {
-      return encoder
-          .newDataBlockEncodingContext(encoding, HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
-    } else {
-      return new HFileBlockDefaultEncodingContext(encoding, HConstants.HFILEBLOCK_DUMMY_HEADER,
-          meta);
-    }
-  }
-
   /**
    * Test seeking while file is encoded.
    */
@@ -77,10 +59,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv5 = new KeyValue(Bytes.toBytes("bba"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
-    seekToTheKey(kv4, originalBuffer, toSeek);
+    seekToTheKey(kv4, sampleKv, toSeek);
   }
 
   /**
@@ -104,10 +85,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
-    seekToTheKey(kv1, originalBuffer, toSeek);
+    seekToTheKey(kv1, sampleKv, toSeek);
   }
 
   /**
@@ -131,10 +111,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("bbbce"), Bytes.toBytes("f1"),
         Bytes.toBytes("q1"), Bytes.toBytes("val"));
-    seekToTheKey(kv5, originalBuffer, toSeek);
+    seekToTheKey(kv5, sampleKv, toSeek);
   }
 
   /**
@@ -155,10 +134,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv4 = new KeyValue(Bytes.toBytes("row11baa"), Bytes.toBytes("f1"),
         Bytes.toBytes("q1"), Bytes.toBytes("val"));
     sampleKv.add(kv4);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = KeyValueUtil.createLastOnRow(kv3.getRowArray(), kv3.getRowOffset(),
         kv3.getRowLength(), null, 0, 0, null, 0, 0);
-    seekToTheKey(kv3, originalBuffer, toSeek);
+    seekToTheKey(kv3, sampleKv, toSeek);
   }
 
   @Test
@@ -176,10 +154,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv5 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q2"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q2"),
         Bytes.toBytes("val"));
-    seekToTheKey(kv5, originalBuffer, toSeek);
+    seekToTheKey(kv5, sampleKv, toSeek);
   }
 
   @Test
@@ -200,10 +177,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv6 = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q5"),
         Bytes.toBytes("val"));
     sampleKv.add(kv6);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q5"),
         Bytes.toBytes("val"));
-    seekToTheKey(kv6, originalBuffer, toSeek);
+    seekToTheKey(kv6, sampleKv, toSeek);
   }
 
   @Test
@@ -224,10 +200,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv6 = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("z5"),
         Bytes.toBytes("val"));
     sampleKv.add(kv6);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q5"),
         Bytes.toBytes("val"));
-    seekToTheKey(kv5, originalBuffer, toSeek);
+    seekToTheKey(kv5, sampleKv, toSeek);
   }
 
   @Test
@@ -248,10 +223,9 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv6 = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("qz"),
         Bytes.toBytes("val"));
     sampleKv.add(kv6);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("qz"),
         Bytes.toBytes("val"));
-    seekToTheKey(kv6, originalBuffer, toSeek);
+    seekToTheKey(kv6, sampleKv, toSeek);
   }
 
   @Test
@@ -269,27 +243,28 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv5 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("fam1"), Bytes.toBytes("q2"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
-    ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("fam2"),
         Bytes.toBytes("q2"), Bytes.toBytes("val"));
-    seekToTheKey(kv5, originalBuffer, toSeek);
+    seekToTheKey(kv5, sampleKv, toSeek);
   }
 
-  private void seekToTheKey(KeyValue expected, ByteBuffer originalBuffer, KeyValue toSeek)
+  private void seekToTheKey(KeyValue expected, List<KeyValue> kvs, KeyValue toSeek)
       throws IOException {
     // create all seekers
-    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = 
-        new ArrayList<DataBlockEncoder.EncodedSeeker>();
+    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<DataBlockEncoder.EncodedSeeker>();
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       if (encoding.getEncoder() == null || encoding == DataBlockEncoding.PREFIX_TREE) {
         continue;
       }
 
-      ByteBuffer encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
       DataBlockEncoder encoder = encoding.getEncoder();
       HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
           .withIncludesMvcc(false).withIncludesTags(false)
           .withCompression(Compression.Algorithm.NONE).build();
+      HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
+          HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+      ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
+          encodingContext);
       DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
           encoder.newDataBlockDecodingContext(meta));
       seeker.setCurrentBuffer(encodedBuffer);
@@ -311,17 +286,4 @@ public class TestSeekToBlockWithEncoders {
       seeker.rewind();
     }
   }
-
-  private byte[] encodeBytes(DataBlockEncoding encoding, ByteBuffer dataset) throws IOException {
-    DataBlockEncoder encoder = encoding.getEncoder();
-    HFileBlockEncodingContext encodingCtx = getEncodingContext(Compression.Algorithm.NONE, encoding);
-
-    encoder.encodeKeyValues(dataset, encodingCtx);
-
-    byte[] encodedBytesWithHeader = encodingCtx.getUncompressedBytesWithHeader();
-    byte[] encodedData = new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET];
-    System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData, 0,
-        encodedData.length);
-    return encodedData;
-  }
 }
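
With the local encodeBytes() helper gone, these seek tests encode through the
shared TestDataBlockEncoders.encodeKeyValues() and then exercise the normal
seeker lifecycle. A condensed sketch of that flow, using the names from the
hunks above (toSeek and kvs stand in for the per-test values):

    ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
        encodingContext);
    DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    seeker.setCurrentBuffer(encodedBuffer);
    seeker.seekToKeyInBlock(toSeek, false); // false: land on/after, not before
    ByteBuffer actualKeyValue = seeker.getKeyValueBuffer();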

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 0a2187d..37456a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -55,13 +55,9 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.DoubleOutputStream;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
-import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -83,8 +79,7 @@ public class TestHFileBlock {
 
   private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
 
-  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
-      NONE, GZ };
+  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
@@ -94,10 +89,8 @@ public class TestHFileBlock {
   private static int FIELD_LENGTH = 10;
   private static float CHANCE_TO_REPEAT = 0.6f;
 
-  private static final HBaseTestingUtility TEST_UTIL =
-    new HBaseTestingUtility();
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private FileSystem fs;
-  private int uncompressedSizeV1;
 
   private final boolean includesMemstoreTS;
   private final boolean includesTag;
@@ -122,8 +115,8 @@ public class TestHFileBlock {
       dos.writeInt(i / 100);
   }
 
- static int writeTestKeyValues(OutputStream dos, int seed, boolean includesMemstoreTS, boolean useTag)
-      throws IOException {
+  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
+      boolean useTag) throws IOException {
     List<KeyValue> keyValues = new ArrayList<KeyValue>();
     Random randomizer = new Random(42l + seed); // just any fixed number
 
@@ -177,26 +170,16 @@ public class TestHFileBlock {
 
     // sort it and write to stream
     int totalSize = 0;
-   Collections.sort(keyValues, KeyValue.COMPARATOR);
-    DataOutputStream dataOutputStream = new DataOutputStream(dos);
+    Collections.sort(keyValues, KeyValue.COMPARATOR);
 
     for (KeyValue kv : keyValues) {
-      dataOutputStream.writeInt(kv.getKeyLength());
-      dataOutputStream.writeInt(kv.getValueLength());
-      dataOutputStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-      dataOutputStream.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-      // Write the additonal tag into the stream
-      // always write the taglength
       totalSize += kv.getLength();
-      if (useTag) {
-        dataOutputStream.writeShort(kv.getTagsLength());
-        dataOutputStream.write(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
-      }
       if (includesMemstoreTS) {
         long memstoreTS = randomizer.nextLong();
-        WritableUtils.writeVLong(dataOutputStream, memstoreTS);
+        kv.setMvccVersion(memstoreTS);
         totalSize += WritableUtils.getVIntSize(memstoreTS);
       }
+      hbw.write(kv);
     }
     return totalSize;
   }
@@ -209,7 +192,6 @@ public class TestHFileBlock {
     DataOutputStream dos = new DataOutputStream(os);
     BlockType.META.write(dos); // Let's make this a meta block.
     writeTestBlockContents(dos);
-    uncompressedSizeV1 = dos.size();
     dos.flush();
     algo.returnCompressor(compressor);
     return baos.toByteArray();
@@ -229,7 +211,7 @@ public class TestHFileBlock {
     DataOutputStream dos = hbw.startWriting(blockType);
     writeTestBlockContents(dos);
     dos.flush();
-    byte[] headerAndData = hbw.getHeaderAndDataForTest();
+    hbw.ensureBlockReady();
     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
     hbw.release();
     return hbw;
@@ -383,8 +365,8 @@ public class TestHFileBlock {
           Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
               + algo + "_" + encoding.toString());
           FSDataOutputStream os = fs.create(path);
-          HFileDataBlockEncoder dataBlockEncoder =
-              new HFileDataBlockEncoderImpl(encoding);
+          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
+              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
           HFileContext meta = new HFileContextBuilder()
                               .withCompression(algo)
                               .withIncludesMvcc(includesMemstoreTS)
@@ -392,16 +374,30 @@ public class TestHFileBlock {
                               .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                               .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
                               .build();
-          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder,
-             meta);
+          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
           long totalSize = 0;
           final List<Integer> encodedSizes = new ArrayList<Integer>();
           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
-            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-            writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
-                blockId, includesMemstoreTS, HConstants.HFILEBLOCK_DUMMY_HEADER, includesTag);
+            hbw.startWriting(BlockType.DATA);
+            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
             hbw.writeHeaderAndData(os);
+            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
+            byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array();
+            final int encodedSize = encodedResultWithHeader.length - headerLen;
+            if (encoding != DataBlockEncoding.NONE) {
+              // We need to account for the two-byte encoding algorithm ID that
+              // comes after the 24-byte block header but before encoded KVs.
+              headerLen += DataBlockEncoding.ID_SIZE;
+            }
+            byte[] encodedDataSection =
+                new byte[encodedResultWithHeader.length - headerLen];
+            System.arraycopy(encodedResultWithHeader, headerLen,
+                encodedDataSection, 0, encodedDataSection.length);
+            final ByteBuffer encodedBuf =
+                ByteBuffer.wrap(encodedDataSection);
+            encodedSizes.add(encodedSize);
+            encodedBlocks.add(encodedBuf);
             totalSize += hbw.getOnDiskSizeWithHeader();
           }
           os.close();
@@ -438,8 +434,7 @@ public class TestHFileBlock {
             expectedBuffer.rewind();
 
             // test if content matches, produce nice message
-            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
-                pread);
+            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
           }
           is.close();
         }
@@ -447,60 +442,6 @@ public class TestHFileBlock {
     }
   }
 
-  static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
-       DataOutputStream dos, final List<Integer> encodedSizes,
-      final List<ByteBuffer> encodedBlocks, int blockId, 
-      boolean includesMemstoreTS, byte[] dummyHeader, boolean useTag) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DoubleOutputStream doubleOutputStream =
-        new DoubleOutputStream(dos, baos);
-    writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS, useTag);
-    ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
-    rawBuf.rewind();
-
-    DataBlockEncoder encoder = encoding.getEncoder();
-    int headerLen = dummyHeader.length;
-    byte[] encodedResultWithHeader = null;
-    HFileContext meta = new HFileContextBuilder()
-                        .withCompression(algo)
-                        .withIncludesMvcc(includesMemstoreTS)
-                        .withIncludesTags(useTag)
-                        .build();
-    if (encoder != null) {
-      HFileBlockEncodingContext encodingCtx = encoder.newDataBlockEncodingContext(encoding,
-          dummyHeader, meta);
-      encoder.encodeKeyValues(rawBuf, encodingCtx);
-      encodedResultWithHeader =
-          encodingCtx.getUncompressedBytesWithHeader();
-    } else {
-      HFileBlockDefaultEncodingContext defaultEncodingCtx = new HFileBlockDefaultEncodingContext(
-          encoding, dummyHeader, meta);
-      byte[] rawBufWithHeader =
-          new byte[rawBuf.array().length + headerLen];
-      System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
-          headerLen, rawBuf.array().length);
-      defaultEncodingCtx.compressAfterEncodingWithBlockType(rawBufWithHeader,
-          BlockType.DATA);
-      encodedResultWithHeader =
-        defaultEncodingCtx.getUncompressedBytesWithHeader();
-    }
-    final int encodedSize =
-        encodedResultWithHeader.length - headerLen;
-    if (encoder != null) {
-      // We need to account for the two-byte encoding algorithm ID that
-      // comes after the 24-byte block header but before encoded KVs.
-      headerLen += DataBlockEncoding.ID_SIZE;
-    }
-    byte[] encodedDataSection =
-        new byte[encodedResultWithHeader.length - headerLen];
-    System.arraycopy(encodedResultWithHeader, headerLen,
-        encodedDataSection, 0, encodedDataSection.length);
-    final ByteBuffer encodedBuf =
-        ByteBuffer.wrap(encodedDataSection);
-    encodedSizes.add(encodedSize);
-    encodedBlocks.add(encodedBuf);
-  }
-
   static void assertBuffersEqual(ByteBuffer expectedBuffer,
       ByteBuffer actualBuffer, Compression.Algorithm compression,
       DataBlockEncoding encoding, boolean pread) {
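
The bookkeeping inlined above replaces the removed writeEncodedBlock(): the
uncompressed buffer is laid out as [block header][2-byte encoding id][encoded
KVs], so the encoded section is carved out by offset arithmetic. An equivalent
sketch that wraps instead of copying, under the same layout assumption:

    int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
    byte[] withHeader = hbw.getUncompressedBufferWithHeader().array();
    int encodedSize = withHeader.length - headerLen;
    if (encoding != DataBlockEncoding.NONE) {
      headerLen += DataBlockEncoding.ID_SIZE; // also skip the encoding id
    }
    // slice() yields a zero-based view; no System.arraycopy needed.
    ByteBuffer encodedBuf = ByteBuffer.wrap(withHeader, headerLen,
        withHeader.length - headerLen).slice();
    encodedSizes.add(encodedSize);
    encodedBlocks.add(encodedBuf);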

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
index 1d48509..88fdb77 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.Compressor;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,21 +69,13 @@ import com.google.common.base.Preconditions;
 @Category(SmallTests.class)
 @RunWith(Parameterized.class)
 public class TestHFileBlockCompatibility {
-  // change this value to activate more logs
-  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
 
   private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
-
   private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
       NONE, GZ };
 
-  // The mnior version for pre-checksum files
-  private static int MINOR_VERSION = 0;
-
-  private static final HBaseTestingUtility TEST_UTIL =
-    new HBaseTestingUtility();
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private HFileSystem fs;
-  private int uncompressedSizeV1;
 
   private final boolean includesMemstoreTS;
   private final boolean includesTag;
@@ -109,7 +103,6 @@ public class TestHFileBlockCompatibility {
     DataOutputStream dos = new DataOutputStream(os);
     BlockType.META.write(dos); // Let's make this a meta block.
     TestHFileBlock.writeTestBlockContents(dos);
-    uncompressedSizeV1 = dos.size();
     dos.flush();
     algo.returnCompressor(compressor);
     return baos.toByteArray();
@@ -259,8 +252,8 @@ public class TestHFileBlockCompatibility {
           Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
               + algo + "_" + encoding.toString());
           FSDataOutputStream os = fs.create(path);
-          HFileDataBlockEncoder dataBlockEncoder =
-              new HFileDataBlockEncoderImpl(encoding);
+          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
+              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
           TestHFileBlockCompatibility.Writer hbw =
               new TestHFileBlockCompatibility.Writer(algo,
                   dataBlockEncoder, includesMemstoreTS, includesTag);
@@ -268,12 +261,25 @@ public class TestHFileBlockCompatibility {
           final List<Integer> encodedSizes = new ArrayList<Integer>();
           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
-            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-            TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
-                encodedBlocks, blockId, includesMemstoreTS,
-                TestHFileBlockCompatibility.Writer.DUMMY_HEADER, includesTag);
-
+            hbw.startWriting(BlockType.DATA);
+            TestHFileBlock.writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
             hbw.writeHeaderAndData(os);
+            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
+            byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
+            final int encodedSize = encodedResultWithHeader.length - headerLen;
+            if (encoding != DataBlockEncoding.NONE) {
+              // We need to account for the two-byte encoding algorithm ID that
+              // comes after the 24-byte block header but before encoded KVs.
+              headerLen += DataBlockEncoding.ID_SIZE;
+            }
+            byte[] encodedDataSection =
+                new byte[encodedResultWithHeader.length - headerLen];
+            System.arraycopy(encodedResultWithHeader, headerLen,
+                encodedDataSection, 0, encodedDataSection.length);
+            final ByteBuffer encodedBuf =
+                ByteBuffer.wrap(encodedDataSection);
+            encodedSizes.add(encodedSize);
+            encodedBlocks.add(encodedBuf);
             totalSize += hbw.getOnDiskSizeWithHeader();
           }
           os.close();
@@ -329,7 +335,7 @@ public class TestHFileBlockCompatibility {
    * in this class but the code in HFileBlock.Writer will continually
    * evolve.
    */
-  public static final class Writer {
+  public static final class Writer extends HFileBlock.Writer {
 
     // These constants are as they were in minorVersion 0.
     private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
@@ -408,34 +414,31 @@ public class TestHFileBlockCompatibility {
     /** The offset of the previous block of the same type */
     private long prevOffset;
 
-    private HFileContext meta;
+    private int unencodedDataSizeWritten;
 
     /**
      * @param compressionAlgorithm compression algorithm to use
      * @param dataBlockEncoderAlgo data block encoding algorithm to use
      */
     public Writer(Compression.Algorithm compressionAlgorithm,
-          HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
-      compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
-      this.dataBlockEncoder = dataBlockEncoder != null
-          ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
-
-      meta = new HFileContextBuilder()
-              .withHBaseCheckSum(false)
-              .withIncludesMvcc(includesMemstoreTS)
-              .withIncludesTags(includesTag)
-              .withCompression(compressionAlgorithm)
-              .build();
+        HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
+      this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
+          .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
+          .withCompression(compressionAlgorithm).build());
+    }
+
+    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
+      super(dataBlockEncoder, meta);
+      compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
+      this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
+          : NoOpDataBlockEncoder.INSTANCE;
       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
-      dataBlockEncodingCtx =
-          this.dataBlockEncoder.newDataBlockEncodingContext(
-              DUMMY_HEADER, meta);
+      dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
       baosInMemory = new ByteArrayOutputStream();
 
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
-
     }
 
     /**
@@ -462,9 +465,22 @@ public class TestHFileBlockCompatibility {
 
       // We will compress it later in finishBlock()
       userDataStream = new DataOutputStream(baosInMemory);
+      if (newBlockType == BlockType.DATA) {
+        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
+      }
+      this.unencodedDataSizeWritten = 0;
       return userDataStream;
     }
 
+    public void write(KeyValue kv) throws IOException {
+      expectState(State.WRITING);
+      this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
+      this.unencodedDataSizeWritten += kv.getLength();
+      if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
+        this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getMvccVersion());
+      }
+    }
+
     /**
      * Returns the stream for the user to write to. The block writer takes care
      * of handling compression and buffering for caching on write. Can only be
@@ -481,7 +497,7 @@ public class TestHFileBlockCompatibility {
      * Transitions the block writer from the "writing" state to the "block
      * ready" state.  Does nothing if a block is already finished.
      */
-    private void ensureBlockReady() throws IOException {
+    void ensureBlockReady() throws IOException {
       Preconditions.checkState(state != State.INIT,
           "Unexpected state: " + state);
 
@@ -498,7 +514,12 @@ public class TestHFileBlockCompatibility {
      * uncompressed stream for caching on write, if applicable. Sets block
      * write state to "block ready".
      */
-    private void finishBlock() throws IOException {
+    void finishBlock() throws IOException {
+      if (blockType == BlockType.DATA) {
+        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
+            baosInMemory.toByteArray(), blockType);
+        blockType = dataBlockEncodingCtx.getBlockType();
+      }
       userDataStream.flush();
       // This does an array copy, so it is safe to cache this byte array.
       uncompressedBytesWithHeader = baosInMemory.toByteArray();
@@ -508,13 +529,12 @@ public class TestHFileBlockCompatibility {
       // cache-on-write. In a way, the block is ready, but not yet encoded or
       // compressed.
       state = State.BLOCK_READY;
-      if (blockType == BlockType.DATA) {
-        encodeDataBlockForDisk();
+      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
+        onDiskBytesWithHeader = dataBlockEncodingCtx
+            .compressAndEncrypt(uncompressedBytesWithHeader);
       } else {
-        defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
-            uncompressedBytesWithHeader, blockType);
-        onDiskBytesWithHeader =
-          defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
+        onDiskBytesWithHeader = defaultBlockEncodingCtx
+            .compressAndEncrypt(uncompressedBytesWithHeader);
       }
 
       // put the header for on disk bytes
@@ -528,26 +548,6 @@ public class TestHFileBlockCompatibility {
     }
 
     /**
-     * Encodes this block if it is a data block and encoding is turned on in
-     * {@link #dataBlockEncoder}.
-     */
-    private void encodeDataBlockForDisk() throws IOException {
-      // do data block encoding, if data block encoder is set
-      ByteBuffer rawKeyValues =
-          ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
-              uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
-
-      //do the encoding
-      dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
-
-      uncompressedBytesWithHeader =
-          dataBlockEncodingCtx.getUncompressedBytesWithHeader();
-      onDiskBytesWithHeader =
-          dataBlockEncodingCtx.getOnDiskBytesWithHeader();
-      blockType = dataBlockEncodingCtx.getBlockType();
-    }
-
-    /**
      * Put the header into the given byte array at the given offset.
      * @param onDiskSize size of the block on disk
      * @param uncompressedSize size of the block after decompression (but
@@ -676,7 +676,7 @@ public class TestHFileBlockCompatibility {
     public int blockSizeWritten() {
       if (state != State.WRITING)
         return 0;
-      return userDataStream.size();
+      return this.unencodedDataSizeWritten;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
index b005d41..0288e92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.io.hfile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,12 +28,14 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.test.RedundantKVGenerator;
 import org.junit.Test;
@@ -43,7 +47,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 @Category(SmallTests.class)
 public class TestHFileDataBlockEncoder {
-  private HFileDataBlockEncoderImpl blockEncoder;
+  private HFileDataBlockEncoder blockEncoder;
   private RedundantKVGenerator generator = new RedundantKVGenerator();
   private boolean includesMemstoreTS;
 
@@ -51,7 +55,7 @@ public class TestHFileDataBlockEncoder {
    * Create test for given data block encoding configuration.
    * @param blockEncoder What kind of encoding policy will be used.
    */
-  public TestHFileDataBlockEncoder(HFileDataBlockEncoderImpl blockEncoder,
+  public TestHFileDataBlockEncoder(HFileDataBlockEncoder blockEncoder,
       boolean includesMemstoreTS) {
     this.blockEncoder = blockEncoder;
     this.includesMemstoreTS = includesMemstoreTS;
@@ -70,8 +74,9 @@ public class TestHFileDataBlockEncoder {
   }
 
   private void testEncodingWithCacheInternals(boolean useTag) throws IOException {
-    HFileBlock block = getSampleHFileBlock(useTag);
-    HFileBlock cacheBlock = createBlockOnDisk(block, useTag);
+    List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
+    HFileBlock block = getSampleHFileBlock(kvs, useTag);
+    HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTag);
 
     LruBlockCache blockCache =
         new LruBlockCache(8 * 1024 * 1024, 32 * 1024);
@@ -105,8 +110,8 @@ public class TestHFileDataBlockEncoder {
   private void testHeaderSizeInCacheWithoutChecksumInternals(boolean useTags) throws IOException {
     int headerSize = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
     // Create some KVs and create the block with old-style header.
-    ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(
-        generator.generateTestKeyValues(60, useTags), includesMemstoreTS);
+    List<KeyValue> kvs = generator.generateTestKeyValues(60, useTags);
+    ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(kvs, includesMemstoreTS);
     int size = keyValues.limit();
     ByteBuffer buf = ByteBuffer.allocate(size + headerSize);
     buf.position(headerSize);
@@ -121,24 +126,10 @@ public class TestHFileDataBlockEncoder {
     HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
         HFileBlock.FILL_HEADER, 0,
         0, hfileContext);
-    HFileBlock cacheBlock = createBlockOnDisk(block, useTags);
+    HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
     assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
   }
 
-  private HFileBlock createBlockOnDisk(HFileBlock block, boolean useTags) throws IOException {
-    int size;
-    HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(
-        blockEncoder.getDataBlockEncoding(),
-        HConstants.HFILEBLOCK_DUMMY_HEADER, block.getHFileContext());
-    context.setDummyHeader(block.getDummyHeaderForVersion());
-    blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), context, block.getBlockType());
-    byte[] encodedBytes = context.getUncompressedBytesWithHeader();
-    size = encodedBytes.length - block.getDummyHeaderForVersion().length;
-    return new HFileBlock(context.getBlockType(), size, size, -1,
-            ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0,
-            block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
-  }
-
   /**
    * Test encoding.
    * @throws IOException
@@ -151,8 +142,9 @@ public class TestHFileDataBlockEncoder {
 
   private void testEncodingInternals(boolean useTag) throws IOException {
     // usually we have just a block without headers, but let's not complicate that
-    HFileBlock block = getSampleHFileBlock(useTag);
-    HFileBlock blockOnDisk = createBlockOnDisk(block, useTag);
+    List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
+    HFileBlock block = getSampleHFileBlock(kvs, useTag);
+    HFileBlock blockOnDisk = createBlockOnDisk(kvs, block, useTag);
 
     if (blockEncoder.getDataBlockEncoding() !=
         DataBlockEncoding.NONE) {
@@ -164,9 +156,8 @@ public class TestHFileDataBlockEncoder {
     }
   }
 
-  private HFileBlock getSampleHFileBlock(boolean useTag) {
-    ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(
-        generator.generateTestKeyValues(60, useTag), includesMemstoreTS);
+  private HFileBlock getSampleHFileBlock(List<KeyValue> kvs, boolean useTag) {
+    ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(kvs, includesMemstoreTS);
     int size = keyValues.limit();
     ByteBuffer buf = ByteBuffer.allocate(size + HConstants.HFILEBLOCK_HEADER_SIZE);
     buf.position(HConstants.HFILEBLOCK_HEADER_SIZE);
@@ -186,6 +177,29 @@ public class TestHFileDataBlockEncoder {
     return b;
   }
 
+  private HFileBlock createBlockOnDisk(List<KeyValue> kvs, HFileBlock block, boolean useTags)
+      throws IOException {
+    int size;
+    HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(
+        blockEncoder.getDataBlockEncoding(), HConstants.HFILEBLOCK_DUMMY_HEADER,
+        block.getHFileContext());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    baos.write(block.getDummyHeaderForVersion());
+    DataOutputStream dos = new DataOutputStream(baos);
+    blockEncoder.startBlockEncoding(context, dos);
+    for (KeyValue kv : kvs) {
+      blockEncoder.encode(kv, context, dos);
+    }
+    BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
+    baos.writeTo(stream);
+    blockEncoder.endBlockEncoding(context, dos, stream.getBuffer(), BlockType.DATA);
+    byte[] encodedBytes = baos.toByteArray();
+    size = encodedBytes.length - block.getDummyHeaderForVersion().length;
+    return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
+        HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
+  }
+
   /**
    * @return All possible data block encoding configurations
    */
@@ -195,10 +209,10 @@ public class TestHFileDataBlockEncoder {
         new ArrayList<Object[]>();
 
     for (DataBlockEncoding diskAlgo : DataBlockEncoding.values()) {
-      for (boolean includesMemstoreTS : new boolean[] {false, true}) {
-        configurations.add(new Object[] {
-            new HFileDataBlockEncoderImpl(diskAlgo),
-            new Boolean(includesMemstoreTS)});
+      for (boolean includesMemstoreTS : new boolean[] { false, true }) {
+        HFileDataBlockEncoder dbe = (diskAlgo == DataBlockEncoding.NONE)
+            ? NoOpDataBlockEncoder.INSTANCE : new HFileDataBlockEncoderImpl(diskAlgo);
+        configurations.add(new Object[] { dbe, Boolean.valueOf(includesMemstoreTS) });
       }
     }
 


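The rewritten createBlockOnDisk(...) above is the clearest view of the new encoder contract: rather than handing the encoder one flat ByteBuffer of key-values, the caller opens the block with startBlockEncoding(...), feeds cells one at a time through encode(...), and closes it with endBlockEncoding(...). Condensed into a standalone helper, the sequence looks like this (a sketch built only from calls that appear in this diff; header fix-up and checksums are omitted):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
    import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;

    public class StreamingEncodeSketch {
      static byte[] encodeBlock(HFileDataBlockEncoder encoder, HFileBlockEncodingContext ctx,
          byte[] dummyHeader, List<KeyValue> kvs) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(dummyHeader);              // reserve space for the block header
        DataOutputStream out = new DataOutputStream(baos);
        encoder.startBlockEncoding(ctx, out); // per-block encoder setup
        for (KeyValue kv : kvs) {
          encoder.encode(kv, ctx, out);       // cells are encoded one at a time
        }
        // endBlockEncoding receives the bytes written so far; the test grabs
        // baos's internal buffer via BufferGrabbingByteArrayOutputStream.
        BufferGrabbingByteArrayOutputStream grabber = new BufferGrabbingByteArrayOutputStream();
        baos.writeTo(grabber);
        encoder.endBlockEncoding(ctx, out, grabber.getBuffer(), BlockType.DATA);
        return baos.toByteArray();            // dummy header + encoded cells
      }
    }

Passing the accumulated bytes into endBlockEncoding appears to be what lets encoder implementations that defer their work to end-of-block, rather than encoding strictly cell-by-cell, still produce the final block contents.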