hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject git commit: HBASE-12202 Support DirectByteBuffer usage in HFileBlock.
Date Tue, 21 Oct 2014 12:28:23 GMT
Repository: hbase
Updated Branches:
  refs/heads/master c7f51db13 -> c01d9981d


HBASE-12202 Support DirectByteBuffer usage in HFileBlock.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c01d9981
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c01d9981
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c01d9981

Branch: refs/heads/master
Commit: c01d9981d89ba3633377952d132c88536c86ebc5
Parents: c7f51db
Author: anoopsjohn <anoopsamjohn@gmail.com>
Authored: Tue Oct 21 17:57:55 2014 +0530
Committer: anoopsjohn <anoopsamjohn@gmail.com>
Committed: Tue Oct 21 17:57:55 2014 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/io/ByteBufferInputStream.java  | 101 ++++++++++++++
 .../io/encoding/HFileBlockDecodingContext.java  |   6 +-
 .../HFileBlockDefaultDecodingContext.java       |   8 +-
 .../apache/hadoop/hbase/io/hfile/BlockType.java |   8 +-
 .../hadoop/hbase/util/ByteBufferUtils.java      |  37 ++++++
 .../hbase/io/TestByteBufferInputStream.java     |  82 ++++++++++++
 .../hadoop/hbase/io/hfile/HFileBlock.java       | 132 +++++++++++--------
 7 files changed, 308 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
new file mode 100644
index 0000000..1530ccd
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Not thread safe!
+ * <p>
+ * Please note that the reads will cause position movement on wrapped ByteBuffer.
+ */
+@InterfaceAudience.Private
+public class ByteBufferInputStream extends InputStream {
+
+  private ByteBuffer buf;
+
+  public ByteBufferInputStream(ByteBuffer buf) {
+      this.buf = buf;
+  }
+
+  /**
+   * Reads the next byte of data from this input stream. The value byte is returned as an
+   * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
+   * because the end of the stream has been reached, the value <code>-1</code> is returned.
+   * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
+   */
+  public int read() {
+    if (this.buf.hasRemaining()) {
+      return (this.buf.get() & 0xff);
+    }
+    return -1;
+  }
+
+  /**
+   * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
+   * given offset).
+   * @param b the array into which the data is read.
+   * @param off the start offset in the destination array <code>b</code>
+   * @param len the maximum number of bytes to read.
+   * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
+   *         1 byte can be read because the end of the stream has been reached.
+   */
+  public int read(byte b[], int off, int len) {
+    int avail = available();
+    if (avail <= 0) {
+      return -1;
+    }
+
+    if (len > avail) {
+      len = avail;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+
+    this.buf.get(b, off, len);
+    return len;
+  }
+
+  /**
+   * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
+   * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
+   * equal to the smaller of <code>n</code> and remaining bytes in the stream.
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   */
+  public long skip(long n) {
+    long k = Math.min(n, available());
+    if (k < 0) {
+      k = 0;
+    }
+    this.buf.position((int) (this.buf.position() + k));
+    return k;
+  }
+
+  /**
+   * @return  the number of remaining bytes that can be read (or skipped
+   *          over) from this input stream.
+   */
+  public int available() {
+    return this.buf.remaining();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
index 5889c7b..37001cc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
@@ -40,16 +40,14 @@ public interface HFileBlockDecodingContext {
    * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after
    *          decompressing (not decoding)
    * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data
-   * @param onDiskBlock on disk bytes to be decoded
-   * @param offset data start offset in onDiskBlock
+   * @param onDiskBlock on disk data to be decoded
    * @throws IOException
    */
   void prepareDecoding(
     int onDiskSizeWithoutHeader,
     int uncompressedSizeWithoutHeader,
     ByteBuffer blockBufferWithoutHeader,
-    byte[] onDiskBlock,
-    int offset
+    ByteBuffer onDiskBlock
   ) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index b27a998..18407d1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hbase.io.encoding;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -24,13 +23,13 @@ import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -52,9 +51,8 @@ public class HFileBlockDefaultDecodingContext implements
 
   @Override
   public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
-      ByteBuffer blockBufferWithoutHeader, byte[] onDiskBlock, int offset) throws IOException {
-    InputStream in = new DataInputStream(new ByteArrayInputStream(onDiskBlock, offset,
-      onDiskSizeWithoutHeader));
+      ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException {
+    InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock));
 
     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
index 2583647..0db584e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
@@ -162,12 +162,10 @@ public enum BlockType {
   }
 
   public static BlockType read(ByteBuffer buf) throws IOException {
-    BlockType blockType = parse(buf.array(),
-        buf.arrayOffset() + buf.position(),
-        Math.min(buf.limit() - buf.position(), MAGIC_LENGTH));
-
+    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)];
+    buf.get(magicBuf);
+    BlockType blockType = parse(magicBuf, 0, magicBuf.length);
     // If we got here, we have read exactly MAGIC_LENGTH bytes.
-    buf.position(buf.position() + MAGIC_LENGTH);
     return blockType;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index b4c6690..1cbee83 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -352,6 +352,27 @@ public final class ByteBufferUtils {
   }
 
   /**
+   * Copy from one buffer to another from given offset. This will be absolute positional copying and
+   * won't affect the position of any of the buffers.
+   * @param out
+   * @param in
+   * @param sourceOffset
+   * @param destinationOffset
+   * @param length
+   */
+  public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset,
+      int destinationOffset, int length) {
+    if (in.hasArray() && out.hasArray()) {
+      System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset()
+          + destinationOffset, length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        out.put((destinationOffset + i), in.get(sourceOffset + i));
+      }
+    }
+  }
+
+  /**
    * Find length of common prefix of two parts in the buffer
    * @param buffer Where parts are located.
    * @param offsetLeft Offset of the first part.
@@ -454,4 +475,20 @@ public final class ByteBufferUtils {
     return output;
   }
 
+  public static int compareTo(ByteBuffer buf1, int o1, int len1, ByteBuffer buf2, int o2, int len2) {
+    if (buf1.hasArray() && buf2.hasArray()) {
+      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
+          buf2.arrayOffset() + o2, len2);
+    }
+    int end1 = o1 + len1;
+    int end2 = o2 + len2;
+    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+      byte a = buf1.get(i);
+      byte b = buf2.get(j);
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return len1 - len2;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java
new file mode 100644
index 0000000..30fb71e
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferInputStream {
+
+  @Test
+  public void testReads() throws Exception {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(100);
+    DataOutputStream dos = new DataOutputStream(bos);
+    String s = "test";
+    int i = 128;
+    dos.write(1);
+    dos.writeInt(i);
+    dos.writeBytes(s);
+    dos.writeLong(12345L);
+    dos.writeShort(2);
+    dos.flush();
+    ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
+
+    // bbis contains 19 bytes
+    // 1 byte, 4 bytes int, 4 bytes string, 8 bytes long and 2 bytes short
+    ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
+    assertEquals(15 + s.length(), bbis.available());
+    assertEquals(1, bbis.read());
+    byte[] ib = new byte[4];
+    bbis.read(ib);
+    assertEquals(i, Bytes.toInt(ib));
+    byte[] sb = new byte[s.length()];
+    bbis.read(sb);
+    assertEquals(s, Bytes.toString(sb));
+    byte[] lb = new byte[8];
+    bbis.read(lb);
+    assertEquals(12345, Bytes.toLong(lb));
+    assertEquals(2, bbis.available());
+    ib = new byte[4];
+    int read = bbis.read(ib, 0, ib.length);
+    // We dont have 4 bytes remainig but only 2. So onlt those should be returned back
+    assertEquals(2, read);
+    assertEquals(2, Bytes.toShort(ib));
+    assertEquals(0, bbis.available());
+    // At end. The read() should return -1
+    assertEquals(-1, bbis.read());
+    bbis.close();
+
+    bb = ByteBuffer.wrap(bos.toByteArray());
+    bbis = new ByteBufferInputStream(bb);
+    DataInputStream dis = new DataInputStream(bbis);
+    dis.read();
+    assertEquals(i, dis.readInt());
+    dis.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c01d9981/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 6341f2d..7f3d80f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -331,8 +332,10 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuffer getBufferWithoutHeader() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
-        buf.limit() - headerSize() - totalChecksumBytes()).slice();
+    ByteBuffer dup = this.buf.duplicate();
+    dup.position(headerSize());
+    dup.limit(buf.limit() - totalChecksumBytes());
+    return dup.slice();
   }
 
   /**
@@ -345,8 +348,9 @@ public class HFileBlock implements Cacheable {
    * @return the buffer of this block for read-only operations
    */
   public ByteBuffer getBufferReadOnly() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
-        buf.limit() - totalChecksumBytes()).slice();
+    ByteBuffer dup = this.buf.duplicate();
+    dup.limit(buf.limit() - totalChecksumBytes());
+    return dup.slice();
   }
 
   /**
@@ -357,7 +361,8 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header and checksum included for read-only operations
    */
   public ByteBuffer getBufferReadOnlyWithHeader() {
-    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+    ByteBuffer dup = this.buf.duplicate();
+    return dup.slice();
   }
 
   /**
@@ -450,17 +455,22 @@ public class HFileBlock implements Cacheable {
         .append("(").append(onDiskSizeWithoutHeader)
         .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
     }
+    String dataBegin = null;
+    if (buf.hasArray()) {
+      dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
+          Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
+    } else {
+      ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
+      byte[] dataBeginBytes = new byte[Math.min(32,
+          bufWithoutHeader.limit() - bufWithoutHeader.position())];
+      bufWithoutHeader.get(dataBeginBytes);
+      dataBegin = Bytes.toStringBinary(dataBeginBytes);
+    }
     sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
       .append(" totalChecksumBytes()=").append(totalChecksumBytes())
       .append(" isUnpacked()=").append(isUnpacked())
-      .append(" buf=[ ")
-        .append(buf)
-        .append(", array().length=").append(buf.array().length)
-        .append(", arrayOffset()=").append(buf.arrayOffset())
-      .append(" ]")
-      .append(" dataBeginsWith=")
-      .append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
-        Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())))
+      .append(" buf=[ ").append(buf).append(" ]")
+      .append(" dataBeginsWith=").append(dataBegin)
       .append(" fileContext=").append(fileContext)
       .append(" ]");
     return sb.toString();
@@ -472,10 +482,17 @@ public class HFileBlock implements Cacheable {
   private void validateOnDiskSizeWithoutHeader(
       int expectedOnDiskSizeWithoutHeader) throws IOException {
     if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
+      String dataBegin = null;
+      if (buf.hasArray()) {
+        dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
+      } else {
+        ByteBuffer bufDup = getBufferReadOnly();
+        byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
+        bufDup.get(dataBeginBytes);
+        dataBegin = Bytes.toStringBinary(dataBeginBytes);
+      }
       String blockInfoMsg =
-        "Block offset: " + offset + ", data starts with: "
-          + Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
-              buf.arrayOffset() + Math.min(32, buf.limit()));
+        "Block offset: " + offset + ", data starts with: " + dataBegin;
       throw new IOException("On-disk size without header provided is "
           + expectedOnDiskSizeWithoutHeader + ", but block "
           + "header contains " + onDiskSizeWithoutHeader + ". " +
@@ -500,16 +517,30 @@ public class HFileBlock implements Cacheable {
 
     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+
+    ByteBuffer dup = this.buf.duplicate();
+    dup.position(this.headerSize());
+    dup = dup.slice();
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
       unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
-      this.getBufferReadOnlyWithHeader().array(), this.headerSize());
+      dup);
 
     // Preserve the next block's header bytes in the new block if we have them.
     if (unpacked.hasNextBlockHeader()) {
-      System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader,
-        unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() +
-          unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(),
-        unpacked.headerSize());
+      // Both the buffers are limited till checksum bytes and avoid the next block's header.
+      // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when
+      // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create
+      // new BB objects
+      ByteBuffer inDup = this.buf.duplicate();
+      inDup.limit(inDup.limit() + headerSize());
+      ByteBuffer outDup = unpacked.buf.duplicate();
+      outDup.limit(outDup.limit() + unpacked.headerSize());
+      ByteBufferUtils.copyFromBufferToBuffer(
+          outDup,
+          inDup,
+          this.onDiskDataSizeWithHeader,
+          unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
+              + unpacked.totalChecksumBytes(), unpacked.headerSize());
     }
     return unpacked;
   }
@@ -532,11 +563,14 @@ public class HFileBlock implements Cacheable {
     int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
         cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
 
+    // TODO we need consider allocating offheap here?
     ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
 
-    // Copy header bytes.
-    System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
-        newBuf.arrayOffset(), headerSize);
+    // Copy header bytes into newBuf.
+    // newBuf is HBB so no issue in calling array()
+    ByteBuffer dup = buf.duplicate();
+    dup.position(0);
+    dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
 
     buf = newBuf;
     // set limit to exclude next block's header
@@ -590,8 +624,9 @@ public class HFileBlock implements Cacheable {
    * @return a byte stream reading the data + checksum of this block
    */
   public DataInputStream getByteStream() {
-    return new DataInputStream(new ByteArrayInputStream(buf.array(),
-        buf.arrayOffset() + headerSize(), buf.limit() - headerSize()));
+    ByteBuffer dup = this.buf.duplicate();
+    dup.position(this.headerSize());
+    return new DataInputStream(new ByteBufferInputStream(dup));
   }
 
   @Override
@@ -1360,7 +1395,7 @@ public class HFileBlock implements Cacheable {
   private static class PrefetchedHeader {
     long offset = -1;
     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
-    ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
   }
 
   /** Reads version 2 blocks from the filesystem. */
@@ -1547,6 +1582,7 @@ public class HFileBlock implements Cacheable {
         if (headerBuf != null) {
           // the header has been read when reading the previous block, copy
           // to this block's header
+          // headerBuf is HBB
           System.arraycopy(headerBuf.array(),
               headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
         } else {
@@ -1591,11 +1627,13 @@ public class HFileBlock implements Cacheable {
           // in a series of reads or a random read, and we don't have access
           // to the block index. This is costly and should happen very rarely.
           headerBuf = ByteBuffer.allocate(hdrSize);
+          // headerBuf is HBB
           readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
               hdrSize, false, offset, pread);
         }
         b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
         onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
+        // headerBuf is HBB
         System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
         nextBlockOnDiskSize =
           readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
@@ -1685,9 +1723,8 @@ public class HFileBlock implements Cacheable {
 
   @Override
   public void serialize(ByteBuffer destination) {
-    // assumes HeapByteBuffer
-    destination.put(this.buf.array(), this.buf.arrayOffset(),
-      getSerializedLength() - EXTRA_SERIALIZATION_SPACE);
+    ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
+        - EXTRA_SERIALIZATION_SPACE);
     serializeExtraInfo(destination);
   }
 
@@ -1735,9 +1772,8 @@ public class HFileBlock implements Cacheable {
     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
       return false;
     }
-    if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(),
-      castedComparison.buf.array(), castedComparison.buf.arrayOffset(),
-      castedComparison.buf.limit()) != 0) {
+    if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
+        castedComparison.buf.limit()) != 0) {
       return false;
     }
     return true;
@@ -1826,24 +1862,16 @@ public class HFileBlock implements Cacheable {
    * has minor version > 0.
    */
   static String toStringHeader(ByteBuffer buf) throws IOException {
-    int offset = buf.arrayOffset();
-    byte[] b = buf.array();
-    long magic = Bytes.toLong(b, offset);
-    BlockType bt = BlockType.read(buf);
-    offset += Bytes.SIZEOF_LONG;
-    int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
-    offset += Bytes.SIZEOF_INT;
-    int uncompressedBlockSizeNoHeader = Bytes.toInt(b, offset);
-    offset += Bytes.SIZEOF_INT;
-    long prevBlockOffset = Bytes.toLong(b, offset); 
-    offset += Bytes.SIZEOF_LONG;
-    byte cksumtype = b[offset];
-    offset += Bytes.SIZEOF_BYTE;
-    long bytesPerChecksum = Bytes.toInt(b, offset); 
-    offset += Bytes.SIZEOF_INT;
-    long onDiskDataSizeWithHeader = Bytes.toInt(b, offset); 
-    offset += Bytes.SIZEOF_INT;
-    return " Header dump: magic: " + magic +
+    byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
+    buf.get(magicBuf);
+    BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
+    int compressedBlockSizeNoHeader = buf.getInt();;
+    int uncompressedBlockSizeNoHeader = buf.getInt();;
+    long prevBlockOffset = buf.getLong();
+    byte cksumtype = buf.get();
+    long bytesPerChecksum = buf.getInt();
+    long onDiskDataSizeWithHeader = buf.getInt();
+    return " Header dump: magic: " + Bytes.toString(magicBuf) +
                    " blockType " + bt +
                    " compressedBlockSizeNoHeader " + 
                    compressedBlockSizeNoHeader +


Mime
View raw message