hbase-commits mailing list archives

From: syuanji...@apache.org
Subject: [13/50] [abbrv] hbase git commit: Revert "HBASE-15477 Purge 'next block header' from cached blocks"
Date: Wed, 30 Mar 2016 16:03:15 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 69c42c3..66aced0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -1317,22 +1317,25 @@ public class BucketCache implements BlockCache, HeapSize {
         final AtomicLong realCacheSize) throws CacheFullException, IOException,
         BucketAllocatorException {
       int len = data.getSerializedLength();
-      // This cacheable thing can't be serialized
+      // This cacheable thing can't be serialized...
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
       BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
-          // If an instance of HFileBlock, save on some allocations.
-          HFileBlock block = (HFileBlock)data;
-          ByteBuff sliceBuf = block.getBufferReadOnly();
-          ByteBuffer metadata = block.getMetaData();
+          HFileBlock block = (HFileBlock) data;
+          ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader();
+          sliceBuf.rewind();
+          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
+            len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
+          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          block.serializeExtraInfo(extraInfoBuffer);
           if (LOG.isTraceEnabled()) {
             LOG.trace("Write offset=" + offset + ", len=" + len);
           }
           ioEngine.write(sliceBuf, offset);
-          ioEngine.write(metadata, offset + len - metadata.limit());
+          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
         } else {
           ByteBuffer bb = ByteBuffer.allocate(len);
           data.serialize(bb);
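
The hunk above restores the pre-HBASE-15477 write path: when the cached item is an HFileBlock, the block buffer (header included) is written at the start of the allocated bucket region, and the block's extra serialization info (which, per the reverted HBASE-15477 title, includes the "next block header") is appended in the final EXTRA_SERIALIZATION_SPACE bytes. A minimal sketch of that layout, using only calls visible in the hunk; the helper class and method names are hypothetical:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.hfile.HFileBlock;
    import org.apache.hadoop.hbase.io.hfile.bucket.IOEngine;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    final class BucketWriteSketch {
      // Illustrative only; simplified from the BucketCache hunk above.
      static void writeBlockToBucket(IOEngine ioEngine, HFileBlock block,
          long offset, int len) throws IOException {
        // Block bytes (header + data) go at the start of the allocated region.
        ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader();
        sliceBuf.rewind();
        ioEngine.write(sliceBuf, offset);

        // The extra serialization info occupies the trailing bytes of the region.
        ByteBuffer extraInfo = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
        block.serializeExtraInfo(extraInfo);
        ioEngine.write(extraInfo, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
      }
    }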

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index ed86a83..e26022e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -33,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
 // TODO: Change name from KeyValueScanner to CellScanner only we already have a simple CellScanner
 // so this should be something else altogether, a decoration on our base CellScanner. TODO.
 // This class shows in CPs so do it all in one swell swoop. HBase-2.0.0.
-public interface KeyValueScanner extends Shipper, Closeable {
+public interface KeyValueScanner extends Shipper {
   /**
    * The byte array represents for NO_NEXT_INDEXED_KEY;
    * The actual value is irrelevant because this is always compared by reference.
@@ -75,7 +74,6 @@ public interface KeyValueScanner extends Shipper, Closeable {
    * The default implementation for this would be to return 0. A file having
    * lower sequence id will be considered to be the older one.
    */
-  // TODO: Implement SequenceId Interface instead.
   long getSequenceID();
 
   /**
@@ -139,11 +137,11 @@ public interface KeyValueScanner extends Shipper, Closeable {
    * peek KeyValue of scanner has the same row with specified Cell,
    * otherwise seek the scanner at the first Cell of the row which is the
    * previous row of specified KeyValue
-   *
+   * 
    * @param key seek KeyValue
    * @return true if the scanner is at the valid KeyValue, false if such
    *         KeyValue does not exist
-   *
+   * 
    */
   public boolean backwardSeek(Cell key) throws IOException;
 
@@ -158,7 +156,7 @@ public interface KeyValueScanner extends Shipper, Closeable {
 
   /**
    * Seek the scanner at the first KeyValue of last row
-   *
+   * 
    * @return true if scanner has values left, false if the underlying data is
    *         empty
    * @throws IOException
@@ -171,4 +169,4 @@ public interface KeyValueScanner extends Shipper, Closeable {
    * see HFileWriterImpl#getMidpoint, or null if not known.
    */
   public Cell getNextIndexedKey();
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index b6164b2..61eb9b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -1271,7 +1271,7 @@ public class StoreFile {
     }
 
     /**
-     * @deprecated Do not write further code which depends on this call. Instead
+     * Warning: Do not write further code which depends on this call. Instead
      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
      * which is the preferred way to scan a store with higher level concepts.
      *
@@ -1285,7 +1285,7 @@ public class StoreFile {
     }
 
     /**
-     * @deprecated Do not write further code which depends on this call. Instead
+     * Warning: Do not write further code which depends on this call. Instead
      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
      * which is the preferred way to scan a store with higher level concepts.
      *

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 040685d..69671e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
 public class CacheTestUtils {
@@ -65,7 +66,6 @@ public class CacheTestUtils {
     /*Post eviction, heapsize should be the same */
     assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
   }
-
   public static void testCacheMultiThreaded(final BlockCache toBeTested,
       final int blockSize, final int numThreads, final int numQueries,
       final double passingScore) throws Exception {
@@ -339,16 +339,25 @@ public class CacheTestUtils {
   }
 
 
-  private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
+  private static HFileBlockPair[] generateHFileBlocks(int blockSize,
+      int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
     Random rand = new Random();
     HashSet<String> usedStrings = new HashSet<String>();
     for (int i = 0; i < numBlocks; i++) {
-      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
+
+      // The buffer serialized size needs to match the size of BlockSize. So we
+      // declare our data size to be smaller than it by the serialization space
+      // required.
+
+      SingleByteBuff cachedBuffer = new SingleByteBuff(ByteBuffer.allocate(blockSize
+          - HFileBlock.EXTRA_SERIALIZATION_SPACE));
       rand.nextBytes(cachedBuffer.array());
       cachedBuffer.rewind();
-      int onDiskSizeWithoutHeader = blockSize;
-      int uncompressedSizeWithoutHeader = blockSize;
+      int onDiskSizeWithoutHeader = blockSize
+          - HFileBlock.EXTRA_SERIALIZATION_SPACE;
+      int uncompressedSizeWithoutHeader = blockSize
+          - HFileBlock.EXTRA_SERIALIZATION_SPACE;
       long prevBlockOffset = rand.nextLong();
       BlockType.DATA.write(cachedBuffer);
       cachedBuffer.putInt(onDiskSizeWithoutHeader);
@@ -367,7 +376,7 @@ public class CacheTestUtils {
           onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
           prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
           blockSize,
-          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta);
+          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);
 
       String strKey;
       /* No conflicting keys */
@@ -386,4 +395,4 @@ public class CacheTestUtils {
     BlockCacheKey blockName;
     HFileBlock block;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index a9d8258..5158e35 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -259,6 +259,7 @@ public class TestCacheOnWrite {
     assertTrue(testDescription, scanner.seekTo());
 
     long offset = 0;
+    HFileBlock prevBlock = null;
     EnumMap<BlockType, Integer> blockCountByType =
         new EnumMap<BlockType, Integer>(BlockType.class);
 
@@ -266,10 +267,14 @@ public class TestCacheOnWrite {
     List<Long> cachedBlocksOffset = new ArrayList<Long>();
     Map<Long, HFileBlock> cachedBlocks = new HashMap<Long, HFileBlock>();
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      long onDiskSize = -1;
+      if (prevBlock != null) {
+         onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
+      }
       // Flags: don't cache the block, use pread, this is not a compaction.
       // Also, pass null for expected block type to avoid checking it.
-      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null,
-          encodingInCache);
+      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
+        false, true, null, encodingInCache);
       BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
           offset);
       HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
@@ -302,6 +307,7 @@ public class TestCacheOnWrite {
         assertEquals(
           block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
       }
+      prevBlock = block;
       offset += block.getOnDiskSizeWithHeader();
       BlockType bt = block.getBlockType();
       Integer count = blockCountByType.get(bt);
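
The prevBlock bookkeeping added here (and again in TestPrefetch and TestCacheOnWriteInSchema below) exercises the restored behaviour that a cached block remembers the on-disk size of the block that follows it, so a sequential scan can pass that size to the next readBlock call instead of -1. A minimal sketch of the loop, assuming reader is an already-opened HFile.Reader as in the test; the flag arguments mirror the hunk above:

    long offset = 0;
    HFileBlock prevBlock = null;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      // Use the previous block's cached next-block size as a read hint; -1 if unknown.
      long onDiskSize = prevBlock != null ? prevBlock.getNextBlockOnDiskSizeWithHeader() : -1;
      // Flags: don't cache the block, use pread, this is not a compaction;
      // pass null for the expected block type and encoding.
      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null, null);
      prevBlock = block;
      offset += block.getOnDiskSizeWithHeader();
    }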

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index d91a811..91ab8c0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -94,7 +94,7 @@ public class TestChecksum {
     meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
     HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
         is, totalSize, (HFileSystem) fs, path, meta);
-    HFileBlock b = hbr.readBlockData(0, -1, false);
+    HFileBlock b = hbr.readBlockData(0, -1, -1, false);
     assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
   }
 
@@ -108,14 +108,12 @@ public class TestChecksum {
       ChecksumType cktype = itr.next();
       Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
       FSDataOutputStream os = fs.create(path);
-      HFileContext meta = new HFileContextBuilder().
-          withChecksumType(cktype).
-          build();
+      HFileContext meta = new HFileContextBuilder()
+          .withChecksumType(cktype).build();
       HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
       DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-      for (int i = 0; i < 1000; ++i) {
+      for (int i = 0; i < 1000; ++i)
         dos.writeInt(i);
-      }
       hbw.writeHeaderAndData(os);
       int totalSize = hbw.getOnDiskSizeWithHeader();
       os.close();
@@ -127,7 +125,7 @@ public class TestChecksum {
       meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
           is, totalSize, (HFileSystem) fs, path, meta);
-      HFileBlock b = hbr.readBlockData(0, -1, false);
+      HFileBlock b = hbr.readBlockData(0, -1, -1, false);
       ByteBuff data = b.getBufferWithoutHeader();
       for (int i = 0; i < 1000; i++) {
         assertEquals(i, data.getInt());
@@ -190,7 +188,7 @@ public class TestChecksum {
               .withHBaseCheckSum(true)
               .build();
         HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
-        HFileBlock b = hbr.readBlockData(0, -1, pread);
+        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
         b.sanityCheck();
         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
         assertEquals(algo == GZ ? 2173 : 4936, 
@@ -211,17 +209,17 @@ public class TestChecksum {
         // requests. Verify that this is correct.
         for (int i = 0; i < 
              HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
-          b = hbr.readBlockData(0, -1, pread);
+          b = hbr.readBlockData(0, -1, -1, pread);
           assertEquals(0, HFile.getChecksumFailuresCount());
         }
         // The next read should have hbase checksum verification reanabled,
         // we verify this by assertng that there was a hbase-checksum failure.
-        b = hbr.readBlockData(0, -1, pread);
+        b = hbr.readBlockData(0, -1, -1, pread);
         assertEquals(1, HFile.getChecksumFailuresCount());
 
         // Since the above encountered a checksum failure, we switch
         // back to not checking hbase checksums.
-        b = hbr.readBlockData(0, -1, pread);
+        b = hbr.readBlockData(0, -1, -1, pread);
         assertEquals(0, HFile.getChecksumFailuresCount());
         is.close();
 
@@ -232,7 +230,7 @@ public class TestChecksum {
         assertEquals(false, newfs.useHBaseChecksum());
         is = new FSDataInputStreamWrapper(newfs, path);
         hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
-        b = hbr.readBlockData(0, -1, pread);
+        b = hbr.readBlockData(0, -1, -1, pread);
         is.close();
         b.sanityCheck();
         b = b.unpack(meta, hbr);
@@ -316,7 +314,7 @@ public class TestChecksum {
                .build();
         HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
             is, nochecksum), totalSize, hfs, path, meta);
-        HFileBlock b = hbr.readBlockData(0, -1, pread);
+        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
         is.close();
         b.sanityCheck();
         assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
@@ -356,4 +354,5 @@ public class TestChecksum {
       return false;  // checksum validation failure
     }
   }
-}
\ No newline at end of file
+}
+
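
The extra -1 threaded through these calls comes from the restored FSReader.readBlockData signature, which again takes the expected on-disk size with header, an uncompressed-size hint (my reading of the reinstated third argument), and the pread flag; -1 means the size is unknown. The two call shapes used by the tests, with variable names mirroring the tests themselves:

    // Neither size known up front: let the reader discover them from the block header.
    HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
    // On-disk size known (for example from a previous read), uncompressed size still unknown.
    HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), -1, pread);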

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index eb87a0c..6748efc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -320,7 +320,7 @@ public class TestHFileBlock {
         .withIncludesTags(includesTag)
         .withCompression(algo).build();
         HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
-        HFileBlock b = hbr.readBlockData(0, -1, pread);
+        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
         is.close();
         assertEquals(0, HFile.getChecksumFailuresCount());
 
@@ -334,15 +334,17 @@ public class TestHFileBlock {
           is = fs.open(path);
           hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
-                                b.totalChecksumBytes(), pread);
+                                b.totalChecksumBytes(), -1, pread);
           assertEquals(expected, b);
           int wrongCompressedSize = 2172;
           try {
             b = hbr.readBlockData(0, wrongCompressedSize
-                + HConstants.HFILEBLOCK_HEADER_SIZE, pread);
+                + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
             fail("Exception expected");
           } catch (IOException ex) {
-            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
+            String expectedPrefix = "On-disk size without header provided is "
+                + wrongCompressedSize + ", but block header contains "
+                + b.getOnDiskSizeWithoutHeader() + ".";
             assertTrue("Invalid exception message: '" + ex.getMessage()
                 + "'.\nMessage is expected to start with: '" + expectedPrefix
                 + "'", ex.getMessage().startsWith(expectedPrefix));
@@ -422,7 +424,7 @@ public class TestHFileBlock {
           HFileBlock blockFromHFile, blockUnpacked;
           int pos = 0;
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
-            blockFromHFile = hbr.readBlockData(pos, -1, pread);
+            blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
             assertEquals(0, HFile.getChecksumFailuresCount());
             blockFromHFile.sanityCheck();
             pos += blockFromHFile.getOnDiskSizeWithHeader();
@@ -558,7 +560,7 @@ public class TestHFileBlock {
             if (detailedLogging) {
               LOG.info("Reading block #" + i + " at offset " + curOffset);
             }
-            HFileBlock b = hbr.readBlockData(curOffset, -1, pread);
+            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
             if (detailedLogging) {
               LOG.info("Block #" + i + ": " + b);
             }
@@ -572,7 +574,8 @@ public class TestHFileBlock {
 
             // Now re-load this block knowing the on-disk size. This tests a
             // different branch in the loader.
-            HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread);
+            HFileBlock b2 = hbr.readBlockData(curOffset,
+                b.getOnDiskSizeWithHeader(), -1, pread);
             b2.sanityCheck();
 
             assertEquals(b.getBlockType(), b2.getBlockType());
@@ -598,7 +601,7 @@ public class TestHFileBlock {
               b = b.unpack(meta, hbr);
               // b's buffer has header + data + checksum while
               // expectedContents have header + data only
-              ByteBuff bufRead = b.getBufferReadOnly();
+              ByteBuff bufRead = b.getBufferWithHeader();
               ByteBuffer bufExpected = expectedContents.get(i);
               boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                   bufRead.arrayOffset(),
@@ -681,7 +684,7 @@ public class TestHFileBlock {
         HFileBlock b;
         try {
           long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
-          b = hbr.readBlockData(offset, onDiskSizeArg, pread);
+          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
         } catch (IOException ex) {
           LOG.error("Error in client " + clientId + " trying to read block at "
               + offset + ", pread=" + pread + ", withOnDiskSize=" +
@@ -716,7 +719,8 @@ public class TestHFileBlock {
   protected void testConcurrentReadingInternals() throws IOException,
       InterruptedException, ExecutionException {
     for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
-      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
+      Path path =
+          new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
       Random rand = defaultRandom();
       List<Long> offsets = new ArrayList<Long>();
       List<BlockType> types = new ArrayList<BlockType>();
@@ -839,7 +843,8 @@ public class TestHFileBlock {
                           .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                           .withChecksumType(ChecksumType.NULL).build();
       HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-          HFileBlock.FILL_HEADER, -1, 0, -1, meta);
+          HFileBlock.FILL_HEADER, -1,
+          0, meta);
       long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
           new MultiByteBuff(buf).getClass(), true)
           + HConstants.HFILEBLOCK_HEADER_SIZE + size);
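
The updated expectedPrefix above matches the validation the block reader performs when a caller-supplied on-disk size disagrees with the size recorded in the block header. A self-contained sketch of that check; illustrative only, the helper and its name are not part of the patch:

    import java.io.IOException;

    final class OnDiskSizeCheckSketch {
      // Throws if a non-negative caller-provided size contradicts the header value.
      static void checkOnDiskSizeWithoutHeader(int provided, int fromHeader)
          throws IOException {
        if (provided >= 0 && provided != fromHeader) {
          throw new IOException("On-disk size without header provided is " + provided
              + ", but block header contains " + fromHeader + ".");
        }
      }
    }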

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
new file mode 100644
index 0000000..16607b9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
@@ -0,0 +1,750 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
+import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.Compressor;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class has unit tests to prove that older versions of
+ * HFiles (without checksums) are compatible with current readers.
+ */
+@Category({IOTests.class, SmallTests.class})
+@RunWith(Parameterized.class)
+public class TestHFileBlockCompatibility {
+
+  private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
+  private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
+      NONE, GZ };
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private HFileSystem fs;
+
+  private final boolean includesMemstoreTS;
+  private final boolean includesTag;
+
+  public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
+    this.includesMemstoreTS = includesMemstoreTS;
+    this.includesTag = includesTag;
+  }
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
+  }
+
+  public byte[] createTestV1Block(Compression.Algorithm algo)
+      throws IOException {
+    Compressor compressor = algo.getCompressor();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
+    DataOutputStream dos = new DataOutputStream(os);
+    BlockType.META.write(dos); // Let's make this a meta block.
+    TestHFileBlock.writeTestBlockContents(dos);
+    dos.flush();
+    algo.returnCompressor(compressor);
+    return baos.toByteArray();
+  }
+
+  private Writer createTestV2Block(Compression.Algorithm algo)
+      throws IOException {
+    final BlockType blockType = BlockType.DATA;
+    Writer hbw = new Writer(algo, null,
+        includesMemstoreTS, includesTag);
+    DataOutputStream dos = hbw.startWriting(blockType);
+    TestHFileBlock.writeTestBlockContents(dos);
+    // make sure the block is ready by calling hbw.getHeaderAndData()
+    hbw.getHeaderAndData();
+    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
+    hbw.releaseCompressor();
+    return hbw;
+  }
+
+ private String createTestBlockStr(Compression.Algorithm algo,
+      int correctLength) throws IOException {
+    Writer hbw = createTestV2Block(algo);
+    byte[] testV2Block = hbw.getHeaderAndData();
+    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
+    if (testV2Block.length == correctLength) {
+      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
+      // variations across operating systems.
+      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
+      testV2Block[osOffset] = 3;
+    }
+    return Bytes.toStringBinary(testV2Block);
+  }
+
+  @Test
+  public void testNoCompression() throws IOException {
+    assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
+        getUncompressedSizeWithoutHeader());
+  }
+
+  @Test
+  public void testGzipCompression() throws IOException {
+    final String correctTestBlockStr =
+        "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
+            + "\\xFF\\xFF\\xFF\\xFF"
+            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
+            + "\\x1F\\x8B"  // gzip magic signature
+            + "\\x08"  // Compression method: 8 = "deflate"
+            + "\\x00"  // Flags
+            + "\\x00\\x00\\x00\\x00"  // mtime
+            + "\\x00"  // XFL (extra flags)
+            // OS (0 = FAT filesystems, 3 = Unix). However, this field
+            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
+            + "\\x03"
+            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
+            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
+    final int correctGzipBlockLength = 82;
+
+    String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
+    assertEquals(correctTestBlockStr, returnedStr);
+  }
+
+  @Test
+  public void testReaderV2() throws IOException {
+    if(includesTag) {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    }
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : new boolean[] { false, true }) {
+          LOG.info("testReaderV2: Compression algorithm: " + algo +
+                   ", pread=" + pread);
+        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+            + algo);
+        FSDataOutputStream os = fs.create(path);
+        Writer hbw = new Writer(algo, null,
+            includesMemstoreTS, includesTag);
+        long totalSize = 0;
+        for (int blockId = 0; blockId < 2; ++blockId) {
+          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+          for (int i = 0; i < 1234; ++i)
+            dos.writeInt(i);
+          hbw.writeHeaderAndData(os);
+          totalSize += hbw.getOnDiskSizeWithHeader();
+        }
+        os.close();
+
+        FSDataInputStream is = fs.open(path);
+        HFileContext meta = new HFileContextBuilder()
+                           .withHBaseCheckSum(false)
+                           .withIncludesMvcc(includesMemstoreTS)
+                           .withIncludesTags(includesTag)
+                           .withCompression(algo)
+                           .build();
+        HFileBlock.FSReader hbr =
+          new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta);
+        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
+        is.close();
+
+        b.sanityCheck();
+        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
+        assertEquals(algo == GZ ? 2173 : 4936,
+                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
+        HFileBlock expected = b;
+
+        if (algo == GZ) {
+          is = fs.open(path);
+          hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path,
+              meta);
+          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
+                                b.totalChecksumBytes(), -1, pread);
+          assertEquals(expected, b);
+          int wrongCompressedSize = 2172;
+          try {
+            b = hbr.readBlockData(0, wrongCompressedSize
+                + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
+            fail("Exception expected");
+          } catch (IOException ex) {
+            String expectedPrefix = "On-disk size without header provided is "
+                + wrongCompressedSize + ", but block header contains "
+                + b.getOnDiskSizeWithoutHeader() + ".";
+            assertTrue("Invalid exception message: '" + ex.getMessage()
+                + "'.\nMessage is expected to start with: '" + expectedPrefix
+                + "'", ex.getMessage().startsWith(expectedPrefix));
+          }
+          is.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Test encoding/decoding data blocks.
+   * @throws IOException a bug or a problem with temporary files.
+   */
+  @Test
+  public void testDataBlockEncoding() throws IOException {
+    if(includesTag) {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    }
+    final int numBlocks = 5;
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : new boolean[] { false, true }) {
+        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+          LOG.info("testDataBlockEncoding algo " + algo +
+                   " pread = " + pread +
+                   " encoding " + encoding);
+          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+              + algo + "_" + encoding.toString());
+          FSDataOutputStream os = fs.create(path);
+          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
+              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
+          TestHFileBlockCompatibility.Writer hbw =
+              new TestHFileBlockCompatibility.Writer(algo,
+                  dataBlockEncoder, includesMemstoreTS, includesTag);
+          long totalSize = 0;
+          final List<Integer> encodedSizes = new ArrayList<Integer>();
+          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
+          for (int blockId = 0; blockId < numBlocks; ++blockId) {
+            hbw.startWriting(BlockType.DATA);
+            TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag);
+            hbw.writeHeaderAndData(os);
+            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
+            byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
+            final int encodedSize = encodedResultWithHeader.length - headerLen;
+            if (encoding != DataBlockEncoding.NONE) {
+              // We need to account for the two-byte encoding algorithm ID that
+              // comes after the 24-byte block header but before encoded KVs.
+              headerLen += DataBlockEncoding.ID_SIZE;
+            }
+            byte[] encodedDataSection =
+                new byte[encodedResultWithHeader.length - headerLen];
+            System.arraycopy(encodedResultWithHeader, headerLen,
+                encodedDataSection, 0, encodedDataSection.length);
+            final ByteBuffer encodedBuf =
+                ByteBuffer.wrap(encodedDataSection);
+            encodedSizes.add(encodedSize);
+            encodedBlocks.add(encodedBuf);
+            totalSize += hbw.getOnDiskSizeWithHeader();
+          }
+          os.close();
+
+          FSDataInputStream is = fs.open(path);
+          HFileContext meta = new HFileContextBuilder()
+                              .withHBaseCheckSum(false)
+                              .withIncludesMvcc(includesMemstoreTS)
+                              .withIncludesTags(includesTag)
+                              .withCompression(algo)
+                              .build();
+          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is),
+              totalSize, fs, path, meta);
+          hbr.setDataBlockEncoder(dataBlockEncoder);
+          hbr.setIncludesMemstoreTS(includesMemstoreTS);
+
+          HFileBlock b;
+          int pos = 0;
+          for (int blockId = 0; blockId < numBlocks; ++blockId) {
+            b = hbr.readBlockData(pos, -1, -1, pread);
+            b.sanityCheck();
+            if (meta.isCompressedOrEncrypted()) {
+              assertFalse(b.isUnpacked());
+              b = b.unpack(meta, hbr);
+            }
+            pos += b.getOnDiskSizeWithHeader();
+
+            assertEquals((int) encodedSizes.get(blockId),
+                b.getUncompressedSizeWithoutHeader());
+            ByteBuff actualBuffer = b.getBufferWithoutHeader();
+            if (encoding != DataBlockEncoding.NONE) {
+              // We expect a two-byte big-endian encoding id.
+              assertEquals(0, actualBuffer.get(0));
+              assertEquals(encoding.getId(), actualBuffer.get(1));
+              actualBuffer.position(2);
+              actualBuffer = actualBuffer.slice();
+            }
+
+            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
+            expectedBuffer.rewind();
+
+            // test if content matches, produce nice message
+            TestHFileBlock.assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer,
+              algo, encoding, pread);
+          }
+          is.close();
+        }
+      }
+    }
+  }
+  /**
+   * This is the version of the HFileBlock.Writer that is used to
+   * create V2 blocks with minor version 0. These blocks do not
+   * have hbase-level checksums. The code is here to test
+   * backward compatibility. The reason we do not inherit from
+   * HFileBlock.Writer is because we never ever want to change the code
+   * in this class but the code in HFileBlock.Writer will continually
+   * evolve.
+   */
+  public static final class Writer extends HFileBlock.Writer {
+
+    // These constants are as they were in minorVersion 0.
+    private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
+    private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
+    private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
+
+    private enum State {
+      INIT,
+      WRITING,
+      BLOCK_READY
+    };
+
+    /** Writer state. Used to ensure the correct usage protocol. */
+    private State state = State.INIT;
+
+    /** Compression algorithm for all blocks this instance writes. */
+    private final Compression.Algorithm compressAlgo;
+
+    /** Data block encoder used for data blocks */
+    private final HFileDataBlockEncoder dataBlockEncoder;
+
+    private HFileBlockEncodingContext dataBlockEncodingCtx;
+    /** block encoding context for non-data blocks */
+    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
+
+    /**
+     * The stream we use to accumulate data in uncompressed format for each
+     * block. We reset this stream at the end of each block and reuse it. The
+     * header is written as the first {@link #HEADER_SIZE} bytes into this
+     * stream.
+     */
+    private ByteArrayOutputStream baosInMemory;
+
+    /** Compressor, which is also reused between consecutive blocks. */
+    private Compressor compressor;
+
+    /**
+     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
+     * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
+     * to {@link BlockType#ENCODED_DATA}.
+     */
+    private BlockType blockType;
+
+    /**
+     * A stream that we write uncompressed bytes to, which compresses them and
+     * writes them to {@link #baosInMemory}.
+     */
+    private DataOutputStream userDataStream;
+
+    /**
+     * Bytes to be written to the file system, including the header. Compressed
+     * if compression is turned on.
+     */
+    private byte[] onDiskBytesWithHeader;
+
+    /**
+     * Valid in the READY state. Contains the header and the uncompressed (but
+     * potentially encoded, if this is a data block) bytes, so the length is
+     * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
+     */
+    private byte[] uncompressedBytesWithHeader;
+
+    /**
+     * Current block's start offset in the {@link HFile}. Set in
+     * {@link #writeHeaderAndData(FSDataOutputStream)}.
+     */
+    private long startOffset;
+
+    /**
+     * Offset of previous block by block type. Updated when the next block is
+     * started.
+     */
+    private long[] prevOffsetByType;
+
+    /** The offset of the previous block of the same type */
+    private long prevOffset;
+
+    private int unencodedDataSizeWritten;
+
+    public Writer(Compression.Algorithm compressionAlgorithm,
+        HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
+      this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
+          .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
+          .withCompression(compressionAlgorithm).build());
+    }
+
+    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
+      super(dataBlockEncoder, meta);
+      compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
+      this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
+          : NoOpDataBlockEncoder.INSTANCE;
+      defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
+      dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
+      baosInMemory = new ByteArrayOutputStream();
+
+      prevOffsetByType = new long[BlockType.values().length];
+      for (int i = 0; i < prevOffsetByType.length; ++i)
+        prevOffsetByType[i] = -1;
+    }
+
+    /**
+     * Starts writing into the block. The previous block's data is discarded.
+     *
+     * @return the stream the user can write their data into
+     * @throws IOException
+     */
+    public DataOutputStream startWriting(BlockType newBlockType)
+        throws IOException {
+      if (state == State.BLOCK_READY && startOffset != -1) {
+        // We had a previous block that was written to a stream at a specific
+        // offset. Save that offset as the last offset of a block of that type.
+        prevOffsetByType[blockType.getId()] = startOffset;
+      }
+
+      startOffset = -1;
+      blockType = newBlockType;
+
+      baosInMemory.reset();
+      baosInMemory.write(DUMMY_HEADER);
+
+      state = State.WRITING;
+
+      // We will compress it later in finishBlock()
+      userDataStream = new DataOutputStream(baosInMemory);
+      if (newBlockType == BlockType.DATA) {
+        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
+      }
+      this.unencodedDataSizeWritten = 0;
+      return userDataStream;
+    }
+
+    @Override
+    public void write(Cell c) throws IOException {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+      expectState(State.WRITING);
+      this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
+      this.unencodedDataSizeWritten += kv.getLength();
+      if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
+        this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getSequenceId());
+      }
+    }
+
+    /**
+     * Returns the stream for the user to write to. The block writer takes care
+     * of handling compression and buffering for caching on write. Can only be
+     * called in the "writing" state.
+     *
+     * @return the data output stream for the user to write to
+     */
+    DataOutputStream getUserDataStream() {
+      expectState(State.WRITING);
+      return userDataStream;
+    }
+
+    /**
+     * Transitions the block writer from the "writing" state to the "block
+     * ready" state.  Does nothing if a block is already finished.
+     */
+    void ensureBlockReady() throws IOException {
+      Preconditions.checkState(state != State.INIT,
+          "Unexpected state: " + state);
+
+      if (state == State.BLOCK_READY)
+        return;
+
+      // This will set state to BLOCK_READY.
+      finishBlock();
+    }
+
+    /**
+     * An internal method that flushes the compressing stream (if using
+     * compression), serializes the header, and takes care of the separate
+     * uncompressed stream for caching on write, if applicable. Sets block
+     * write state to "block ready".
+     */
+    void finishBlock() throws IOException {
+      if (blockType == BlockType.DATA) {
+        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
+            baosInMemory.toByteArray(), blockType);
+        blockType = dataBlockEncodingCtx.getBlockType();
+      }
+      userDataStream.flush();
+      // This does an array copy, so it is safe to cache this byte array.
+      uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      prevOffset = prevOffsetByType[blockType.getId()];
+
+      // We need to set state before we can package the block up for
+      // cache-on-write. In a way, the block is ready, but not yet encoded or
+      // compressed.
+      state = State.BLOCK_READY;
+      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
+        onDiskBytesWithHeader = dataBlockEncodingCtx
+            .compressAndEncrypt(uncompressedBytesWithHeader);
+      } else {
+        onDiskBytesWithHeader = defaultBlockEncodingCtx
+            .compressAndEncrypt(uncompressedBytesWithHeader);
+      }
+
+      // put the header for on disk bytes
+      putHeader(onDiskBytesWithHeader, 0,
+          onDiskBytesWithHeader.length,
+          uncompressedBytesWithHeader.length);
+      //set the header for the uncompressed bytes (for cache-on-write)
+      putHeader(uncompressedBytesWithHeader, 0,
+          onDiskBytesWithHeader.length,
+        uncompressedBytesWithHeader.length);
+    }
+
+    /**
+     * Put the header into the given byte array at the given offset.
+     * @param onDiskSize size of the block on disk
+     * @param uncompressedSize size of the block after decompression (but
+     *          before optional data block decoding)
+     */
+    private void putHeader(byte[] dest, int offset, int onDiskSize,
+        int uncompressedSize) {
+      offset = blockType.put(dest, offset);
+      offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
+      offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
+      Bytes.putLong(dest, offset, prevOffset);
+    }
+
+    /**
+     * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
+     * the offset of this block so that it can be referenced in the next block
+     * of the same type.
+     *
+     * @param out
+     * @throws IOException
+     */
+    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
+      long offset = out.getPos();
+      if (startOffset != -1 && offset != startOffset) {
+        throw new IOException("A " + blockType + " block written to a "
+            + "stream twice, first at offset " + startOffset + ", then at "
+            + offset);
+      }
+      startOffset = offset;
+
+      writeHeaderAndData((DataOutputStream) out);
+    }
+
+    /**
+     * Writes the header and the compressed data of this block (or uncompressed
+     * data when not using compression) into the given stream. Can be called in
+     * the "writing" state or in the "block ready" state. If called in the
+     * "writing" state, transitions the writer to the "block ready" state.
+     *
+     * @param out the output stream to write the
+     * @throws IOException
+     */
+    private void writeHeaderAndData(DataOutputStream out) throws IOException {
+      ensureBlockReady();
+      out.write(onDiskBytesWithHeader);
+    }
+
+    /**
+     * Returns the header or the compressed data (or uncompressed data when not
+     * using compression) as a byte array. Can be called in the "writing" state
+     * or in the "block ready" state. If called in the "writing" state,
+     * transitions the writer to the "block ready" state.
+     *
+     * @return header and data as they would be stored on disk in a byte array
+     * @throws IOException
+     */
+    public byte[] getHeaderAndData() throws IOException {
+      ensureBlockReady();
+      return onDiskBytesWithHeader;
+    }
+
+    /**
+     * Releases the compressor this writer uses to compress blocks into the
+     * compressor pool. Needs to be called before the writer is discarded.
+     */
+    public void releaseCompressor() {
+      if (compressor != null) {
+        compressAlgo.returnCompressor(compressor);
+        compressor = null;
+      }
+    }
+
+    /**
+     * Returns the on-disk size of the data portion of the block. This is the
+     * compressed size if compression is enabled. Can only be called in the
+     * "block ready" state. Header is not compressed, and its size is not
+     * included in the return value.
+     *
+     * @return the on-disk size of the block, not including the header.
+     */
+    public int getOnDiskSizeWithoutHeader() {
+      expectState(State.BLOCK_READY);
+      return onDiskBytesWithHeader.length - HEADER_SIZE;
+    }
+
+    /**
+     * Returns the on-disk size of the block. Can only be called in the
+     * "block ready" state.
+     *
+     * @return the on-disk size of the block ready to be written, including the
+     *         header size
+     */
+    public int getOnDiskSizeWithHeader() {
+      expectState(State.BLOCK_READY);
+      return onDiskBytesWithHeader.length;
+    }
+
+    /**
+     * The uncompressed size of the block data. Does not include header size.
+     */
+    public int getUncompressedSizeWithoutHeader() {
+      expectState(State.BLOCK_READY);
+      return uncompressedBytesWithHeader.length - HEADER_SIZE;
+    }
+
+    /**
+     * The uncompressed size of the block data, including header size.
+     */
+    public int getUncompressedSizeWithHeader() {
+      expectState(State.BLOCK_READY);
+      return uncompressedBytesWithHeader.length;
+    }
+
+    /** @return true if a block is being written  */
+    public boolean isWriting() {
+      return state == State.WRITING;
+    }
+
+    /**
+     * Returns the number of bytes written into the current block so far, or
+     * zero if not writing the block at the moment. Note that this will return
+     * zero in the "block ready" state as well.
+     *
+     * @return the number of bytes written
+     */
+    public int blockSizeWritten() {
+      if (state != State.WRITING)
+        return 0;
+      return this.unencodedDataSizeWritten;
+    }
+
+    /**
+     * Returns the header followed by the uncompressed data, even if using
+     * compression. This is needed for storing uncompressed blocks in the block
+     * cache. Can be called in the "writing" state or the "block ready" state.
+     *
+     * @return uncompressed block bytes for caching on write
+     */
+    private byte[] getUncompressedDataWithHeader() {
+      expectState(State.BLOCK_READY);
+
+      return uncompressedBytesWithHeader;
+    }
+
+    private void expectState(State expectedState) {
+      if (state != expectedState) {
+        throw new IllegalStateException("Expected state: " + expectedState +
+            ", actual state: " + state);
+      }
+    }
+
+    /**
+     * Similar to {@link #getUncompressedBufferWithHeader()} but returns a byte
+     * buffer.
+     *
+     * @return uncompressed block for caching on write in the form of a buffer
+     */
+    public ByteBuffer getUncompressedBufferWithHeader() {
+      byte[] b = getUncompressedDataWithHeader();
+      return ByteBuffer.wrap(b, 0, b.length);
+    }
+
+    /**
+     * Takes the given {@link BlockWritable} instance, creates a new block of
+     * its appropriate type, writes the writable into this block, and flushes
+     * the block into the output stream. The writer is instructed not to buffer
+     * uncompressed bytes for cache-on-write.
+     *
+     * @param bw the block-writable object to write as a block
+     * @param out the file system output stream
+     * @throws IOException
+     */
+    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
+        throws IOException {
+      bw.writeToBlock(startWriting(bw.getBlockType()));
+      writeHeaderAndData(out);
+    }
+
+    /**
+     * Creates a new HFileBlock.
+     */
+    public HFileBlock getBlockForCaching() {
+      HFileContext meta = new HFileContextBuilder()
+             .withHBaseCheckSum(false)
+             .withChecksumType(ChecksumType.NULL)
+             .withBytesPerCheckSum(0)
+             .build();
+      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
+          getUncompressedSizeWithoutHeader(), prevOffset,
+          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
+          getOnDiskSizeWithoutHeader(), meta);
+    }
+  }
+
+}
+
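
The compatibility Writer re-added above reproduces the 24-byte minor-version-0 block header that its putHeader() writes: an 8-byte block type magic (for example DATABLK* for data blocks, as seen in testGzipCompression), a 4-byte on-disk size without header, a 4-byte uncompressed size without header, and an 8-byte previous-block offset. A self-contained sketch of that layout; the helper is illustrative and not part of the patch:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    final class LegacyHeaderSketch {
      // Builds a 24-byte minor-version-0 header: 8-byte magic + two ints + one long.
      static byte[] legacyHeader(String magic, int onDiskSizeWithoutHeader,
          int uncompressedSizeWithoutHeader, long prevBlockOffset) {
        ByteBuffer header = ByteBuffer.allocate(24); // HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM
        header.put(magic.getBytes(StandardCharsets.US_ASCII)); // e.g. "DATABLK*"
        header.putInt(onDiskSizeWithoutHeader);
        header.putInt(uncompressedSizeWithoutHeader);
        header.putLong(prevBlockOffset);
        return header.array();
      }
    }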

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 470d483..687d3cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -185,7 +185,8 @@ public class TestHFileBlockIndex {
       }
 
       missCount += 1;
-      prevBlock = realReader.readBlockData(offset, onDiskSize, pread);
+      prevBlock = realReader.readBlockData(offset, onDiskSize,
+          -1, pread);
       prevOffset = offset;
       prevOnDiskSize = onDiskSize;
       prevPread = pread;

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
index 387514e..6f434bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
@@ -92,7 +92,8 @@ public class TestHFileDataBlockEncoder {
 
     if (blockEncoder.getDataBlockEncoding() ==
         DataBlockEncoding.NONE) {
-      assertEquals(block.getBufferReadOnly(), returnedBlock.getBufferReadOnly());
+      assertEquals(block.getBufferWithHeader(),
+          returnedBlock.getBufferWithHeader());
     } else {
       if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
         System.out.println(blockEncoder);
@@ -126,7 +127,7 @@ public class TestHFileDataBlockEncoder {
                         .build();
     HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
         HFileBlock.FILL_HEADER, 0,
-        0, -1, hfileContext);
+        0, hfileContext);
     HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
     assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
   }
@@ -197,7 +198,7 @@ public class TestHFileDataBlockEncoder {
                         .build();
     HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
         HFileBlock.FILL_HEADER, 0, 
-         0, -1, meta);
+         0, meta);
     return b;
   }
 
@@ -219,8 +220,7 @@ public class TestHFileDataBlockEncoder {
     byte[] encodedBytes = baos.toByteArray();
     size = encodedBytes.length - block.getDummyHeaderForVersion().length;
     return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
-        HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
-        block.getHFileContext());
+        HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
   }
 
   private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 3264558..ba3a344 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -99,7 +99,7 @@ public class TestHFileEncryption {
 
   private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
       throws IOException {
-    HFileBlock b = hbr.readBlockData(pos, -1, false);
+    HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
     assertEquals(0, HFile.getChecksumFailuresCount());
     b.sanityCheck();
     assertFalse(b.isUnpacked());

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index 983ec2f..c7eb11b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -218,7 +218,7 @@ public class TestHFileWriterV3 {
     fsdis.seek(0);
     long curBlockPos = 0;
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
         .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuff buf = block.getBufferWithoutHeader();
@@ -279,14 +279,13 @@ public class TestHFileWriterV3 {
     while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
         .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       ByteBuff buf = block.getBufferWithoutHeader();
       if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
-        throw new IOException("Failed to deserialize block " + this +
-            " into a " + t.getClass().getSimpleName());
+        throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
       }
       Text expectedText =
           (metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text(

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index d20ba2b..69a77bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -78,8 +78,14 @@ public class TestPrefetch {
     // Check that all of the data blocks were preloaded
     BlockCache blockCache = cacheConf.getBlockCache();
     long offset = 0;
+    HFileBlock prevBlock = null;
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
-      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
+      long onDiskSize = -1;
+      if (prevBlock != null) {
+         onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
+      }
+      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null,
+        null);
       BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
       boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
       if (block.getBlockType() == BlockType.DATA ||
@@ -87,6 +93,7 @@ public class TestPrefetch {
           block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
         assertTrue(isCached);
       }
+      prevBlock = block;
       offset += block.getOnDiskSizeWithHeader();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/54a543de/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 2357bef..0916fe6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -227,10 +227,15 @@ public class TestCacheOnWriteInSchema {
       assertTrue(testDescription, scanner.seekTo());
       // Cribbed from io.hfile.TestCacheOnWrite
       long offset = 0;
+      HFileBlock prevBlock = null;
       while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+        long onDiskSize = -1;
+        if (prevBlock != null) {
+          onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
+        }
         // Flags: don't cache the block, use pread, this is not a compaction.
         // Also, pass null for expected block type to avoid checking it.
-        HFileBlock block = reader.readBlock(offset, -1, false, true,
+        HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
           false, true, null, DataBlockEncoding.NONE);
         BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
           offset);
@@ -244,6 +249,7 @@ public class TestCacheOnWriteInSchema {
             "block: " + block + "\n" +
             "blockCacheKey: " + blockCacheKey);
         }
+        prevBlock = block;
         offset += block.getOnDiskSizeWithHeader();
       }
     } finally {

