hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1302602 [2/2] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache/hadoop/hbase/io/encoding/ test/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache...
Date Mon, 19 Mar 2012 19:12:20 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Mon Mar 19 19:12:19 2012
@@ -16,12 +16,17 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Does not perform any kind of encoding/decoding.
@@ -45,9 +50,19 @@ public class NoOpDataBlockEncoder implem
   }
 
   @Override
-  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
-      ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader) {
-    return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+  public void beforeWriteToDisk(ByteBuffer in,
+      boolean includesMemstoreTS,
+      HFileBlockEncodingContext encodeCtx, BlockType blockType)
+      throws IOException {
+    if (!(encodeCtx.getClass().getName().equals(
+        HFileBlockDefaultEncodingContext.class.getName()))) {
+      throw new IOException (this.getClass().getName() + " only accepts " +
+          HFileBlockDefaultEncodingContext.class.getName() + ".");
+    }
+
+    HFileBlockDefaultEncodingContext defaultContext =
+        (HFileBlockDefaultEncodingContext) encodeCtx;
+    defaultContext.compressAfterEncoding(in.array(), blockType);
   }
 
   @Override
@@ -79,4 +94,17 @@ public class NoOpDataBlockEncoder implem
     return getClass().getSimpleName();
   }
 
+  @Override
+  public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext(
+      Algorithm compressionAlgorithm, byte[] dummyHeader) {
+    return new HFileBlockDefaultEncodingContext(compressionAlgorithm,
+        null, dummyHeader);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext(
+      Algorithm compressionAlgorithm) {
+    return new HFileBlockDefaultDecodingContext(compressionAlgorithm);
+  }
+
 }
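
For context, a minimal sketch of how a block writer is expected to drive this context-based API (the method names come from the diff above; the helper itself, its arguments, and the buffer handling are illustrative only):

  static byte[] encodeForDisk(ByteBuffer keyValues, boolean includesMemstoreTS)
      throws IOException {
    HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
    // One reusable context per writer; it carries the compression state and dummy header.
    HFileBlockEncodingContext encodeCtx =
        encoder.newOnDiskDataBlockEncodingContext(
            Compression.Algorithm.NONE, HFileBlock.DUMMY_HEADER);
    encoder.beforeWriteToDisk(keyValues, includesMemstoreTS, encodeCtx,
        BlockType.DATA);
    // The context now holds both views of the block.
    byte[] onDiskBytesWithHeader = encodeCtx.getOnDiskBytesWithHeader();  // written to the file
    // encodeCtx.getUncompressedBytesWithHeader() would be kept for cache-on-write.
    return onDiskBytesWithHeader;
  }

The read side is symmetric: newOnDiskDataBlockDecodingContext(compressionAlgorithm) returns a reusable HFileBlockDecodingContext for the corresponding read path.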

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java Mon Mar 19 19:12:19 2012
@@ -20,9 +20,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,6 +32,8 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,6 +51,9 @@ public class TestDataBlockEncoders {
   static int NUMBER_OF_KV = 10000;
   static int NUM_RANDOM_SEEKS = 10000;
 
+  private static int ENCODED_DATA_OFFSET =
+      HFileBlock.HEADER_SIZE + DataBlockEncoding.ID_SIZE;
+
   private RedundantKVGenerator generator = new RedundantKVGenerator();
   private Random randomizer = new Random(42l);
 
@@ -65,17 +68,44 @@ public class TestDataBlockEncoders {
     this.includesMemstoreTS = includesMemstoreTS;
   }
 
-  private void testAlgorithm(ByteBuffer dataset, DataBlockEncoder encoder)
+  private HFileBlockEncodingContext getEncodingContext(
+      Compression.Algorithm algo, DataBlockEncoding encoding) {
+    DataBlockEncoder encoder = encoding.getEncoder();
+    if (encoder != null) {
+      return encoder.newDataBlockEncodingContext(algo, encoding,
+          HFileBlock.DUMMY_HEADER);
+    } else {
+      return new HFileBlockDefaultEncodingContext(algo, encoding);
+    }
+  }
+
+  private byte[] encodeBytes(DataBlockEncoding encoding,
+      ByteBuffer dataset) throws IOException {
+    DataBlockEncoder encoder = encoding.getEncoder();
+    HFileBlockEncodingContext encodingCtx =
+        getEncodingContext(Compression.Algorithm.NONE, encoding);
+
+    encoder.compressKeyValues(dataset, includesMemstoreTS,
+        encodingCtx);
+
+    byte[] encodedBytesWithHeader =
+        encodingCtx.getUncompressedBytesWithHeader();
+    byte[] encodedData =
+        new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET];
+    System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData,
+        0, encodedData.length);
+    return encodedData;
+  }
+
+  private void testAlgorithm(ByteBuffer dataset, DataBlockEncoding encoding)
       throws IOException {
     // encode
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(baos);
-    encoder.compressKeyValues(dataOut, dataset, includesMemstoreTS);
-
-    // decode
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    byte[] encodedBytes = encodeBytes(encoding, dataset);
+    //decode
+    ByteArrayInputStream bais = new ByteArrayInputStream(encodedBytes);
     DataInputStream dis = new DataInputStream(bais);
     ByteBuffer actualDataset;
+    DataBlockEncoder encoder = encoding.getEncoder();
     actualDataset = encoder.uncompressKeyValues(dis, includesMemstoreTS);
 
     dataset.rewind();
@@ -142,17 +172,17 @@ public class TestDataBlockEncoders {
     ByteBuffer originalBuffer =
         RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
             includesMemstoreTS);
-    List<DataBlockEncoder> dataBlockEncoders =
-        DataBlockEncoding.getAllEncoders();
 
     // create all seekers
     List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
         new ArrayList<DataBlockEncoder.EncodedSeeker>();
-    for (DataBlockEncoder encoder : dataBlockEncoders) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      DataOutputStream dataOut = new DataOutputStream(baos);
-      encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
-      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
+    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      if (encoding.getEncoder() == null) {
+        continue;
+      }
+      ByteBuffer encodedBuffer =
+          ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
+      DataBlockEncoder encoder = encoding.getEncoder();
       DataBlockEncoder.EncodedSeeker seeker =
           encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
       seeker.setCurrentBuffer(encodedBuffer);
@@ -195,20 +225,19 @@ public class TestDataBlockEncoders {
     ByteBuffer originalBuffer =
         RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
             includesMemstoreTS);
-    List<DataBlockEncoder> dataBlockEncoders =
-        DataBlockEncoding.getAllEncoders();
 
-    for (DataBlockEncoder encoder : dataBlockEncoders) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      DataOutputStream dataOut = new DataOutputStream(baos);
+    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      if (encoding.getEncoder() == null) {
+        continue;
+      }
+      DataBlockEncoder encoder = encoding.getEncoder();
+      ByteBuffer encodedBuffer = null;
       try {
-        encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
+        encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
       } catch (IOException e) {
         throw new RuntimeException(String.format(
             "Bug while encoding using '%s'", encoder.toString()), e);
       }
-
-      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
       DataBlockEncoder.EncodedSeeker seeker =
           encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
       seeker.setCurrentBuffer(encodedBuffer);
@@ -255,20 +284,19 @@ public class TestDataBlockEncoders {
     ByteBuffer originalBuffer =
         RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
             includesMemstoreTS);
-    List<DataBlockEncoder> dataBlockEncoders =
-        DataBlockEncoding.getAllEncoders();
 
-    for (DataBlockEncoder encoder : dataBlockEncoders) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      DataOutputStream dataOut = new DataOutputStream(baos);
+    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      if (encoding.getEncoder() == null) {
+        continue;
+      }
+      DataBlockEncoder encoder = encoding.getEncoder();
+      ByteBuffer encodedBuffer = null;
       try {
-        encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS);
+        encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
       } catch (IOException e) {
         throw new RuntimeException(String.format(
             "Bug while encoding using '%s'", encoder.toString()), e);
       }
-
-      ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
       ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
       KeyValue firstKv = sampleKv.get(0);
       if (0 != Bytes.compareTo(
@@ -327,16 +355,17 @@ public class TestDataBlockEncoders {
 
   private void testEncodersOnDataset(ByteBuffer onDataset)
       throws IOException{
-    List<DataBlockEncoder> dataBlockEncoders =
-        DataBlockEncoding.getAllEncoders();
     ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity());
     onDataset.rewind();
     dataset.put(onDataset);
     onDataset.rewind();
     dataset.flip();
 
-    for (DataBlockEncoder encoder : dataBlockEncoders) {
-      testAlgorithm(dataset, encoder);
+    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      if (encoding.getEncoder() == null) {
+        continue;
+      }
+      testAlgorithm(dataset, encoding);
 
       // ensure that dataset is unchanged
       dataset.rewind();
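
For reference, the ENCODED_DATA_OFFSET used by encodeBytes() above reflects the block layout sketched below (a sketch based on the constants in this commit; the concrete sizes are whatever HFileBlock.HEADER_SIZE and DataBlockEncoding.ID_SIZE are in this version):

  // [ block header (HFileBlock.HEADER_SIZE) | encoding ID (DataBlockEncoding.ID_SIZE) | encoded KVs ]
  int offset = HFileBlock.HEADER_SIZE + DataBlockEncoding.ID_SIZE;   // == ENCODED_DATA_OFFSET
  byte[] encodedData = java.util.Arrays.copyOfRange(
      encodedBytesWithHeader, offset, encodedBytesWithHeader.length);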

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Mon Mar 19 19:12:19 2012
@@ -50,12 +50,15 @@ import org.apache.hadoop.hbase.MediumTes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 
 import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
@@ -203,7 +206,7 @@ public class TestHFileBlock {
     writeTestBlockContents(dos);
     byte[] headerAndData = hbw.getHeaderAndDataForTest();
     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
-    hbw.releaseCompressor();
+    hbw.release();
     return hbw;
   }
 
@@ -371,9 +374,8 @@ public class TestHFileBlock {
           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
             DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-            writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks,
-                blockId, includesMemstoreTS);
-
+            writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
+                blockId, includesMemstoreTS, HFileBlock.DUMMY_HEADER);
             hbw.writeHeaderAndData(os);
             totalSize += hbw.getOnDiskSizeWithHeader();
           }
@@ -392,7 +394,6 @@ public class TestHFileBlock {
             assertEquals(0, HFile.getChecksumFailuresCount());
             b.sanityCheck();
             pos += b.getOnDiskSizeWithHeader();
-
             assertEquals((int) encodedSizes.get(blockId),
                 b.getUncompressedSizeWithoutHeader());
             ByteBuffer actualBuffer = b.getBufferWithoutHeader();
@@ -417,35 +418,52 @@ public class TestHFileBlock {
     }
   }
 
-  static void writeEncodedBlock(DataBlockEncoding encoding,
-      DataOutputStream dos, final List<Integer> encodedSizes,
+  static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
+       DataOutputStream dos, final List<Integer> encodedSizes,
       final List<ByteBuffer> encodedBlocks, int blockId, 
-      boolean includesMemstoreTS) throws IOException {
+      boolean includesMemstoreTS, byte[] dummyHeader) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DoubleOutputStream doubleOutputStream =
         new DoubleOutputStream(dos, baos);
-
-    final int rawBlockSize = writeTestKeyValues(doubleOutputStream,
-        blockId, includesMemstoreTS);
-
+    writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS);
     ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
     rawBuf.rewind();
 
-    final int encodedSize;
-    final ByteBuffer encodedBuf;
-    if (encoding == DataBlockEncoding.NONE) {
-      encodedSize = rawBlockSize;
-      encodedBuf = rawBuf;
+    DataBlockEncoder encoder = encoding.getEncoder();
+    int headerLen = dummyHeader.length;
+    byte[] encodedResultWithHeader = null;
+    if (encoder != null) {
+      HFileBlockEncodingContext encodingCtx =
+          encoder.newDataBlockEncodingContext(algo, encoding, dummyHeader);
+      encoder.compressKeyValues(rawBuf, includesMemstoreTS,
+          encodingCtx);
+      encodedResultWithHeader =
+          encodingCtx.getUncompressedBytesWithHeader();
     } else {
-      ByteArrayOutputStream encodedOut = new ByteArrayOutputStream();
-      encoding.getEncoder().compressKeyValues(
-          new DataOutputStream(encodedOut),
-          rawBuf.duplicate(), includesMemstoreTS);
+      HFileBlockDefaultEncodingContext defaultEncodingCtx =
+        new HFileBlockDefaultEncodingContext(algo, encoding, dummyHeader);
+      byte[] rawBufWithHeader =
+          new byte[rawBuf.array().length + headerLen];
+      System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
+          headerLen, rawBuf.array().length);
+      defaultEncodingCtx.compressAfterEncoding(rawBufWithHeader,
+          BlockType.DATA);
+      encodedResultWithHeader =
+        defaultEncodingCtx.getUncompressedBytesWithHeader();
+    }
+    final int encodedSize =
+        encodedResultWithHeader.length - headerLen;
+    if (encoder != null) {
       // We need to account for the two-byte encoding algorithm ID that
       // comes after the 24-byte block header but before encoded KVs.
-      encodedSize = encodedOut.size() + DataBlockEncoding.ID_SIZE;
-      encodedBuf = ByteBuffer.wrap(encodedOut.toByteArray());
+      headerLen += DataBlockEncoding.ID_SIZE;
     }
+    byte[] encodedDataSection =
+        new byte[encodedResultWithHeader.length - headerLen];
+    System.arraycopy(encodedResultWithHeader, headerLen,
+        encodedDataSection, 0, encodedDataSection.length);
+    final ByteBuffer encodedBuf =
+        ByteBuffer.wrap(encodedDataSection);
     encodedSizes.add(encodedSize);
     encodedBlocks.add(encodedBuf);
   }
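
The headerLen arithmetic above implies the following layouts (a sketch inferred from this commit; sizes come from the dummy header passed in and DataBlockEncoding.ID_SIZE):

  // With an encoder:    encodedResultWithHeader = [ dummy header | encoding ID | encoded KVs ]
  // Without an encoder: encodedResultWithHeader = [ dummy header | raw KVs ]
  // encodedSize excludes only the dummy header; the buffer recorded for the
  // readers additionally skips the encoding ID when an encoder is present.
  int dataStart = dummyHeader.length
      + (encoder != null ? DataBlockEncoding.ID_SIZE : 0);
  byte[] encodedDataSection = java.util.Arrays.copyOfRange(
      encodedResultWithHeader, dataStart, encodedResultWithHeader.length);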

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java Mon Mar 19 19:12:19 2012
@@ -19,7 +19,11 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.junit.Assert.*;
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.GZ;
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -28,40 +32,23 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.DoubleOutputStream;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
-import org.apache.hadoop.hbase.util.Pair;
-import com.google.common.base.Preconditions;
-
-import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
+import org.apache.hadoop.io.compress.Compressor;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -69,6 +56,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class has unit tests to prove that older versions of
  * HFiles (without checksums) are compatible with current readers.
@@ -129,7 +118,8 @@ public class TestHFileBlockCompatibility
         includesMemstoreTS);
     DataOutputStream dos = hbw.startWriting(blockType);
     TestHFileBlock.writeTestBlockContents(dos);
-    byte[] headerAndData = hbw.getHeaderAndData();
+    // make sure the block is ready by calling hbw.getHeaderAndData()
+    hbw.getHeaderAndData();
     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
     hbw.releaseCompressor();
     return hbw;
@@ -145,7 +135,7 @@ public class TestHFileBlockCompatibility
       // variations across operating systems.
       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
       testV2Block[osOffset] = 3;
-    }   
+    }
     return Bytes.toStringBinary(testV2Block);
   }
 
@@ -173,8 +163,9 @@ public class TestHFileBlockCompatibility
             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
             + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
     final int correctGzipBlockLength = 82;
-    assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
-        correctGzipBlockLength));
+
+    String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
+    assertEquals(correctTestBlockStr, returnedStr);
   }
 
   @Test
@@ -288,16 +279,19 @@ public class TestHFileBlockCompatibility
               + algo + "_" + encoding.toString());
           FSDataOutputStream os = fs.create(path);
           HFileDataBlockEncoder dataBlockEncoder =
-              new HFileDataBlockEncoderImpl(encoding);
-          Writer hbw = new Writer(algo, dataBlockEncoder,
-              includesMemstoreTS);
+              new HFileDataBlockEncoderImpl(encoding, encoding,
+                  TestHFileBlockCompatibility.Writer.DUMMY_HEADER);
+          TestHFileBlockCompatibility.Writer hbw =
+              new TestHFileBlockCompatibility.Writer(algo,
+                  dataBlockEncoder, includesMemstoreTS);
           long totalSize = 0;
           final List<Integer> encodedSizes = new ArrayList<Integer>();
           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
             DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-            TestHFileBlock.writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks,
-                blockId, includesMemstoreTS);
+            TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
+                encodedBlocks, blockId, includesMemstoreTS,
+                TestHFileBlockCompatibility.Writer.DUMMY_HEADER);
 
             hbw.writeHeaderAndData(os);
             totalSize += hbw.getOnDiskSizeWithHeader();
@@ -332,8 +326,8 @@ public class TestHFileBlockCompatibility
             expectedBuffer.rewind();
 
             // test if content matches, produce nice message
-            TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
-                pread);
+            TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
+              algo, encoding, pread);
           }
           is.close();
         }
@@ -378,6 +372,10 @@ public class TestHFileBlockCompatibility
     /** Data block encoder used for data blocks */
     private final HFileDataBlockEncoder dataBlockEncoder;
 
+    private HFileBlockEncodingContext dataBlockEncodingCtx;
+    /** block encoding context for non-data blocks */
+    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
+
     /**
      * The stream we use to accumulate data in uncompressed format for each
      * block. We reset this stream at the end of each block and reuse it. The
@@ -389,12 +387,6 @@ public class TestHFileBlockCompatibility
     /** Compressor, which is also reused between consecutive blocks. */
     private Compressor compressor;
 
-    /** Compression output stream */
-    private CompressionOutputStream compressionStream;
-
-    /** Underlying stream to write compressed bytes to */
-    private ByteArrayOutputStream compressedByteStream;
-
     /**
      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@@ -449,19 +441,14 @@ public class TestHFileBlockCompatibility
       this.dataBlockEncoder = dataBlockEncoder != null
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
 
+      defaultBlockEncodingCtx =
+          new HFileBlockDefaultEncodingContext(compressionAlgorithm,
+              null, DUMMY_HEADER);
+      dataBlockEncodingCtx =
+        this.dataBlockEncoder.newOnDiskDataBlockEncodingContext(
+            compressionAlgorithm, DUMMY_HEADER);
+
       baosInMemory = new ByteArrayOutputStream();
-      if (compressAlgo != NONE) {
-        compressor = compressionAlgorithm.getCompressor();
-        compressedByteStream = new ByteArrayOutputStream();
-        try {
-          compressionStream =
-              compressionAlgorithm.createPlainCompressionStream(
-                  compressedByteStream, compressor);
-        } catch (IOException e) {
-          throw new RuntimeException("Could not create compression stream " +
-              "for algorithm " + compressionAlgorithm, e);
-        }
-      }
 
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
@@ -532,48 +519,31 @@ public class TestHFileBlockCompatibility
      */
     private void finishBlock() throws IOException {
       userDataStream.flush();
-
       // This does an array copy, so it is safe to cache this byte array.
       uncompressedBytesWithHeader = baosInMemory.toByteArray();
-      LOG.warn("Writer.finishBlock user data size with header before compression " +
-               uncompressedBytesWithHeader.length);
       prevOffset = prevOffsetByType[blockType.getId()];
 
       // We need to set state before we can package the block up for
       // cache-on-write. In a way, the block is ready, but not yet encoded or
       // compressed.
       state = State.BLOCK_READY;
-      encodeDataBlockForDisk();
-
-      doCompression();
-      putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
-          uncompressedBytesWithHeader.length);
-    }
-
-    /**
-     * Do compression if it is enabled, or re-use the uncompressed buffer if
-     * it is not. Fills in the compressed block's header if doing compression.
-     */
-    private void doCompression() throws IOException {
-      // do the compression
-      if (compressAlgo != NONE) {
-        compressedByteStream.reset();
-        compressedByteStream.write(DUMMY_HEADER);
-
-        compressionStream.resetState();
-
-        compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
-            uncompressedBytesWithHeader.length - HEADER_SIZE);
-
-        compressionStream.flush();
-        compressionStream.finish();
-
-        onDiskBytesWithHeader = compressedByteStream.toByteArray();
-        putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
-            uncompressedBytesWithHeader.length);
+      if (blockType == BlockType.DATA) {
+        encodeDataBlockForDisk();
       } else {
-        onDiskBytesWithHeader = uncompressedBytesWithHeader;
+        defaultBlockEncodingCtx.compressAfterEncoding(
+            uncompressedBytesWithHeader, blockType);
+        onDiskBytesWithHeader =
+          defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
       }
+
+      // put the header for on disk bytes
+      putHeader(onDiskBytesWithHeader, 0,
+          onDiskBytesWithHeader.length,
+          uncompressedBytesWithHeader.length);
+      //set the header for the uncompressed bytes (for cache-on-write)
+      putHeader(uncompressedBytesWithHeader, 0,
+          onDiskBytesWithHeader.length,
+        uncompressedBytesWithHeader.length);
     }
 
     /**
@@ -581,35 +551,20 @@ public class TestHFileBlockCompatibility
      * {@link #dataBlockEncoder}.
      */
     private void encodeDataBlockForDisk() throws IOException {
-      if (blockType != BlockType.DATA) {
-        return; // skip any non-data block
-      }
-
       // do data block encoding, if data block encoder is set
-      ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
-          HEADER_SIZE, uncompressedBytesWithHeader.length -
-          HEADER_SIZE).slice();
-      Pair<ByteBuffer, BlockType> encodingResult =
-          dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
-              includesMemstoreTS, DUMMY_HEADER);
-
-      BlockType encodedBlockType = encodingResult.getSecond();
-      if (encodedBlockType == BlockType.ENCODED_DATA) {
-        uncompressedBytesWithHeader = encodingResult.getFirst().array();
-        blockType = BlockType.ENCODED_DATA;
-      } else {
-        // There is no encoding configured. Do some extra sanity-checking.
-        if (encodedBlockType != BlockType.DATA) {
-          throw new IOException("Unexpected block type coming out of data " +
-              "block encoder: " + encodedBlockType);
-        }
-        if (userDataStream.size() !=
-            uncompressedBytesWithHeader.length - HEADER_SIZE) {
-          throw new IOException("Uncompressed size mismatch: "
-              + userDataStream.size() + " vs. "
-              + (uncompressedBytesWithHeader.length - HEADER_SIZE));
-        }
-      }
+      ByteBuffer rawKeyValues =
+          ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
+              uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
+
+      //do the encoding
+      dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
+              includesMemstoreTS, dataBlockEncodingCtx, blockType);
+
+      uncompressedBytesWithHeader =
+          dataBlockEncodingCtx.getUncompressedBytesWithHeader();
+      onDiskBytesWithHeader =
+          dataBlockEncodingCtx.getOnDiskBytesWithHeader();
+      blockType = dataBlockEncodingCtx.getBlockType();
     }
 
     /**
@@ -802,5 +757,6 @@ public class TestHFileBlockCompatibility
           getOnDiskSizeWithoutHeader());
     }
   }
+
 }
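
Taken together, the writer changes above reduce finishBlock() to a small dispatch; a condensed sketch of the resulting flow (member and method names as in the test writer, abbreviated for readability):

  userDataStream.flush();
  uncompressedBytesWithHeader = baosInMemory.toByteArray();
  state = State.BLOCK_READY;
  if (blockType == BlockType.DATA) {
    // encodeDataBlockForDisk() delegates to dataBlockEncoder.beforeWriteToDisk(...)
    // and pulls both byte arrays (and the possibly changed block type) from the context.
    encodeDataBlockForDisk();
  } else {
    defaultBlockEncodingCtx.compressAfterEncoding(uncompressedBytesWithHeader, blockType);
    onDiskBytesWithHeader = defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
  }
  // The same header is stamped onto the on-disk bytes and the cache-on-write copy.
  putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
      uncompressedBytesWithHeader.length);
  putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
      uncompressedBytesWithHeader.length);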
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java Mon Mar 19 19:12:19 2012
@@ -30,11 +30,12 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.ChecksumType;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -117,18 +118,23 @@ public class TestHFileDataBlockEncoder {
 
   /**
    * Test writing to disk.
+   * @throws IOException
    */
   @Test
-  public void testEncodingWritePath() {
+  public void testEncodingWritePath() throws IOException {
     // usually we have just block without headers, but don't complicate that
     HFileBlock block = getSampleHFileBlock();
-    Pair<ByteBuffer, BlockType> result =
-        blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(),
-            includesMemstoreTS, HFileBlock.DUMMY_HEADER);
-
-    int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE;
-    HFileBlock blockOnDisk = new HFileBlock(result.getSecond(),
-        size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0,
+    HFileBlockEncodingContext context =
+        new HFileBlockDefaultEncodingContext(
+            Compression.Algorithm.NONE, blockEncoder.getEncodingOnDisk());
+    blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(),
+            includesMemstoreTS, context, block.getBlockType());
+
+    byte[] encodedBytes = context.getUncompressedBytesWithHeader();
+    int size = encodedBytes.length - HFileBlock.HEADER_SIZE;
+    HFileBlock blockOnDisk =
+        new HFileBlock(context.getBlockType(), size, size, -1,
+            ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0,
         includesMemstoreTS, block.getMinorVersion(),
         block.getBytesPerChecksum(), block.getChecksumType(),
         block.getOnDiskDataSizeWithHeader());

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java?rev=1302602&r1=1302601&r2=1302602&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java Mon Mar 19 19:12:19 2012
@@ -115,11 +115,10 @@ public class DataBlockEncodingTool {
     byte[] previousKey = null;
     byte[] currentKey;
 
-    List<DataBlockEncoder> dataBlockEncoders =
-        DataBlockEncoding.getAllEncoders();
-
-    for (DataBlockEncoder d : dataBlockEncoders) {
-      codecs.add(new EncodedDataBlock(d, includesMemstoreTS));
+    DataBlockEncoding[] encodings = DataBlockEncoding.values();
+    for(DataBlockEncoding encoding : encodings) {
+      DataBlockEncoder d = encoding.getEncoder();
+      codecs.add(new EncodedDataBlock(d, includesMemstoreTS, encoding));
     }
 
     int j = 0;
@@ -280,7 +279,7 @@ public class DataBlockEncodingTool {
     List<Long> compressDurations = new ArrayList<Long>();
     for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
       final long startTime = System.nanoTime();
-      codec.doCompressData();
+      codec.encodeData();
       final long finishTime = System.nanoTime();
       if (itTime >= BENCHMARK_N_OMIT) {
         compressDurations.add(finishTime - startTime);


