hbase-commits mailing list archives

From: li...@apache.org
Subject: svn commit: r1492797 [3/3] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/io/hfile/bucket/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/m...
Date: Thu, 13 Jun 2013 18:18:21 GMT
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java Thu Jun 13 18:18:20 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCacheFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -995,7 +996,8 @@ public class SchemaMetrics {
 
   /** Validates metrics that keep track of the number of cached blocks for each category */ 
   private static void checkNumBlocksInCache() {
-    final LruBlockCache cache = (LruBlockCache) CacheConfig.getGlobalBlockCache();
+    final LruBlockCache cache =
+        LruBlockCacheFactory.getInstance().getCurrentBlockCacheInstance();
     if (cache == null) {
       // There is no global block cache instantiated. Most likely there is no mini-cluster running.
       return;

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java?rev=1492797&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java Thu Jun 13 18:18:20 2013
@@ -0,0 +1,182 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Manages an array of ByteBuffers, each with a default size of 4MB. The
+ * buffers are laid out sequentially and can be treated as one large buffer;
+ * data can be read from or written to it at a given position and offset.
+ */
+public class ByteBufferArray {
+
+  private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
+
+  private final ByteBuffer buffers[];
+  private final Lock locks[];
+  private final int bufferSize;
+  private final int bufferCount;
+
+  /**
+   * Allocates enough byte buffers to cover the requested capacity. To avoid
+   * running past the array bounds for the last byte (see
+   * {@link ByteBufferArray#multiple}), one additional buffer of capacity 0 is
+   * allocated at the end.
+   * @param capacity total size of the byte buffer array
+   * @param bufferSize size of each individual buffer
+   * @param directByteBuffer true if direct (off-heap) buffers should be allocated
+   */
+  public ByteBufferArray(long capacity, int bufferSize, boolean directByteBuffer) {
+    if (bufferSize > (capacity / 16)) {
+      bufferSize = (int) roundUp(capacity / 16, 32768);
+    }
+    this.bufferSize = bufferSize;
+    this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
+    LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+        + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+        + bufferCount);
+    buffers = new ByteBuffer[bufferCount + 1];
+    locks = new Lock[bufferCount + 1];
+    for (int i = 0; i <= bufferCount; i++) {
+      locks[i] = new ReentrantLock();
+      if (i < bufferCount) {
+        buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize) :
+            ByteBuffer.allocate(bufferSize);
+      } else {
+        buffers[i] = ByteBuffer.allocate(0);
+      }
+
+    }
+  }
+
+  private static long roundUp(long n, long to) {
+    return ((n + to - 1) / to) * to;
+  }
+
+  /**
+   * Transfers bytes from this buffer array into the given destination array
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be written to the given array
+   * @param dstArray The array into which bytes are to be written
+   * @param dstOffset The offset within the given array of the first byte to be
+   *          written
+   */
+  public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
+    multiple(start, len, dstArray, dstOffset, new Visitor() {
+      public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+        bb.get(array, arrayIdx, len);
+      }
+    });
+  }
+
+  /**
+   * Transfers bytes from the given source array into this buffer array
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be read from the given array
+   * @param srcArray The array from which bytes are to be read
+   * @param srcOffset The offset within the given array of the first byte to be
+   *          read
+   */
+  public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
+    multiple(start, len, srcArray, srcOffset, new Visitor() {
+      public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+        bb.put(array, arrayIdx, len);
+      }
+    });
+  }
+
+  private interface Visitor {
+    /**
+     * Visit the given byte buffer. For a read action, bytes are transferred
+     * from the buffer into the destination array; for a write action, bytes
+     * are transferred from the source array into the buffer.
+     * @param bb byte buffer
+     * @param array a source or destination byte array
+     * @param arrayOffset offset into the byte array
+     * @param len read/write length
+     */
+    void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
+  }
+
+  /**
+   * Access (read or write) this buffer array at the given position and for the
+   * given length using the supplied array. Only one buffer is locked at a time,
+   * even if the access spans several buffers; consistency is guaranteed by the caller.
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be accessed
+   * @param array The array from/to which bytes are to be read/written
+   * @param arrayOffset The offset within the given array of the first byte to
+   *          be read or written
+   * @param visitor implementation of how to visit the byte buffer
+   */
+  void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
+    Preconditions.checkArgument(len >= 0);
+    Preconditions.checkArgument(len + arrayOffset <= array.length);
+    long end = start + len;
+    int startBuffer = Preconditions.checkPositionIndex(
+        (int) (start / bufferSize), bufferCount);
+    int endBuffer = (int) (end / bufferSize);
+    int endOffset = (int) (end % bufferSize);
+    Preconditions.checkArgument(endBuffer >= 0 && endBuffer < bufferCount
+        || (endBuffer == bufferCount && endOffset == 0));
+
+    int startOffset = (int) (start % bufferSize);
+    if (startBuffer >= locks.length || startBuffer < 0) {
+      String msg = "Failed multiple, start=" + start + ",startBuffer="
+          + startBuffer + ",bufferSize=" + bufferSize;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+    int srcIndex = 0, cnt = -1;
+    for (int i = startBuffer; i <= endBuffer; ++i) {
+      Lock lock = locks[i];
+      lock.lock();
+      try {
+        ByteBuffer bb = buffers[i];
+        if (i == startBuffer) {
+          cnt = bufferSize - startOffset;
+          if (cnt > len) {
+            cnt = len;
+          }
+          bb.limit(startOffset + cnt).position(startOffset);
+        } else if (i == endBuffer) {
+          cnt = endOffset;
+          bb.limit(cnt).position(0);
+        } else {
+          cnt = bufferSize;
+          bb.limit(cnt).position(0);
+        }
+        visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
+        srcIndex += cnt;
+      } finally {
+        lock.unlock();
+      }
+    }
+    if (srcIndex != len) {
+      throw new IllegalStateException("srcIndex(" + srcIndex + ") != len(" +
+          len + ")!");
+    }
+  }
+}
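
The ByteBufferArray class added above exposes a simple put/get API over a set of
fixed-size buffers. A minimal usage sketch follows, assuming only the constructor and
the getMultiple/putMultiple signatures shown in the diff; the class and variable names
in the sketch are hypothetical and not part of the commit.

    import java.util.Arrays;
    import org.apache.hadoop.hbase.util.ByteBufferArray;

    public class ByteBufferArraySketch {
      public static void main(String[] args) {
        long capacity = 64L * 1024 * 1024;       // 64 MB logical buffer
        int bufferSize = 4 * 1024 * 1024;        // backed by 4 MB segments
        // true => allocate direct (off-heap) ByteBuffers
        ByteBufferArray bba = new ByteBufferArray(capacity, bufferSize, true);

        byte[] src = "cached block bytes".getBytes();
        long offset = 10L * 1024 * 1024;         // any offset within the capacity
        bba.putMultiple(offset, src.length, src, 0);   // write, possibly across segments

        byte[] dst = new byte[src.length];
        bba.getMultiple(offset, dst.length, dst, 0);   // read the same range back
        System.out.println(Arrays.equals(src, dst));   // prints true
      }
    }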

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java?rev=1492797&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ConditionUtil.java Thu Jun 13 18:18:20 2013
@@ -0,0 +1,62 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Extends the patterns in {@link Preconditions}
+ */
+public class ConditionUtil {
+
+  /**
+   * Checks that the specified offset is >= 0
+   * @param offset The offset to check
+   * @return The specified offset if it is >= 0
+   * @throws IndexOutOfBoundsException If the specified offset is negative
+   */
+  public static long checkPositiveOffset(long offset) {
+    return checkOffset(offset, -1);
+  }
+
+  /**
+   * Checks that an offset is >= 0 and below a maximum limit (if one is
+   * specified).
+   * @see Preconditions#checkPositionIndex(int, int)
+   * @param offset The offset to check
+   * @param limit The maximum limit, or -1 if none
+   * @return The specified offset if it is non-negative and, when a limit is
+   *         specified, lower than that limit.
+   * @throws IndexOutOfBoundsException If the offset is negative, or if a limit
+   *         is specified and the offset is greater than or equal to the limit.
+   */
+  public static long checkOffset(long offset, long limit) {
+    if (offset < 0) {
+      throw new IndexOutOfBoundsException("Negative offset: " + offset);
+    }
+    if (limit != -1 && offset >= limit) {
+      throw new IndexOutOfBoundsException("Offset (" + offset +
+          ") is greater than or equal to limit (" + limit + ")");
+    }
+    return offset;
+  }
+}
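
The ConditionUtil helper added above extends the Preconditions pattern to long offsets.
A minimal usage sketch, assuming only the checkOffset/checkPositiveOffset signatures
shown in the diff; the surrounding class and values are hypothetical:

    import org.apache.hadoop.hbase.util.ConditionUtil;

    public class ConditionUtilSketch {
      public static void main(String[] args) {
        long fileSize = 1024;
        // Valid: 0 <= 512 < 1024, so the offset is returned unchanged.
        long ok = ConditionUtil.checkOffset(512, fileSize);
        // Valid: only requires the offset to be non-negative.
        long nonNegative = ConditionUtil.checkPositiveOffset(ok);
        try {
          // Rejected: 2048 >= limit, so an IndexOutOfBoundsException is thrown.
          ConditionUtil.checkOffset(2048, fileSize);
        } catch (IndexOutOfBoundsException e) {
          System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("validated offsets: " + ok + ", " + nonNegative);
      }
    }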

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java?rev=1492797&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java Thu Jun 13 18:18:20 2013
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Utilities for dealing with direct (off-heap) allocated memory
+ */
+public class DirectMemoryUtils {
+
+  /**
+   * Get the maximum amount of available off-heap memory
+   * @return The maximum amount of off-heap memory available in bytes
+   *
+   * TODO (avf): do not use proprietary sun.misc.VM API, parse command line
+   *             args instead to avoid the warning
+   */
+  public static long getDirectMemorySize() {
+    return sun.misc.VM.maxDirectMemory();
+  }
+}
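
DirectMemoryUtils above simply reports the JVM's direct-memory ceiling (the value
normally controlled by -XX:MaxDirectMemorySize). A sketch of how a caller might use it
to sanity-check an off-heap cache size; the class name and sizes below are hypothetical:

    import org.apache.hadoop.hbase.util.DirectMemoryUtils;

    public class DirectMemorySketch {
      public static void main(String[] args) {
        // Maximum direct (off-heap) memory the JVM will allow, in bytes.
        long maxDirect = DirectMemoryUtils.getDirectMemorySize();
        long proposedCacheSize = 32L * 1024 * 1024;    // e.g. an off-heap bucket cache
        System.out.println("max direct = " + maxDirect
            + ", cache fits = " + (proposedCacheSize <= maxDirect));
      }
    }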

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java Thu Jun 13 18:18:20 2013
@@ -68,8 +68,11 @@ public class RandomSeek {
     long start = System.currentTimeMillis();
     SimpleBlockCache cache = new SimpleBlockCache();
     //LruBlockCache cache = new LruBlockCache();
-    CacheConfig cacheConf = new CacheConfig(cache, true, false, false, false,
-        false, false, false, false, 1);
+    CacheConfig cacheConf = new CacheConfig.CacheConfigBuilder()
+        .withBlockCache(cache)
+        .withCacheDataOnRead(true)
+        .withCacheOnCompactionThreshold(1)
+        .build();
     Reader reader = HFile.createReader(lfs, path, cacheConf);
     reader.loadFileInfo();
     System.out.println(reader.getTrailer());
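
The RandomSeek change above is typical of how this commit migrates call sites from the
old positional CacheConfig constructor to a builder. A sketch of the new style, using
only the builder methods that appear in these diffs (other setters and the exact import
locations are assumptions, not confirmed by this patch):

    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.SimpleBlockCache;

    public class CacheConfigBuilderSketch {
      public static CacheConfig buildExample() {
        SimpleBlockCache cache = new SimpleBlockCache();
        return new CacheConfig.CacheConfigBuilder()
            .withBlockCache(cache)                 // the block cache instance to use
            .withCacheDataOnRead(true)             // cache data blocks as they are read
            .withCacheOnCompactionThreshold(1)     // threshold carried over from the old call
            .build();
      }
    }

    // Test code elsewhere in this commit also builds from a Configuration and an L2 cache:
    //   new CacheConfig.CacheConfigBuilder(conf).withL2Cache(l2Cache).build();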

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java Thu Jun 13 18:18:20 2013
@@ -23,6 +23,8 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 
+import java.nio.ByteBuffer;
+
 public class TestCachedBlockQueue extends TestCase {
 
   public void testQueue() throws Exception {

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java Thu Jun 13 18:18:20 2013
@@ -268,7 +268,7 @@ public class TestHFileBlock {
       int numBlocksRead = 0;
       long pos = 0;
       while (pos < totalSize) {
-        b = hbr.readBlockData(pos, block.length, uncompressedSizeV1);
+        b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, false);
         b.sanityCheck();
         pos += block.length;
         numBlocksRead++;
@@ -301,8 +301,8 @@ public class TestHFileBlock {
 
       FSDataInputStream is = fs.open(path);
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
-          totalSize);
-      HFileBlock b = hbr.readBlockData(0, -1, -1);
+          totalSize, null, null);
+      HFileBlock b = hbr.readBlockData(0, -1, -1, false);
       is.close();
 
       b.sanityCheck();
@@ -312,13 +312,13 @@ public class TestHFileBlock {
 
       if (algo == GZ) {
         is = fs.open(path);
-        hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
-        b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1);
+        hbr = new HFileBlock.FSReaderV2(is, algo, totalSize, null, null);
+        b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1, false);
         assertEquals(blockStr, b.toString());
         int wrongCompressedSize = 2172;
         try {
           b = hbr.readBlockData(0, wrongCompressedSize
-              + HFileBlock.HEADER_SIZE, -1);
+              + HFileBlock.HEADER_SIZE, -1, false);
           fail("Exception expected");
         } catch (IOException ex) {
           String expectedPrefix = "On-disk size without header provided is "
@@ -369,7 +369,7 @@ public class TestHFileBlock {
 
         FSDataInputStream is = fs.open(path);
         HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, algo,
-            totalSize);
+            totalSize, null, null);
         hbr.setDataBlockEncoder(dataBlockEncoder);
         hbr.setIncludesMemstoreTS(includesMemstoreTS);
 
@@ -377,7 +377,7 @@ public class TestHFileBlock {
         int pos = 0;
         LOG.info("\n\nStarting to read blocks\n");
         for (int blockId = 0; blockId < numBlocks; ++blockId) {
-          b = hbr.readBlockData(pos, -1, -1);
+          b = hbr.readBlockData(pos, -1, -1, false);
           b.sanityCheck();
           pos += b.getOnDiskSizeWithHeader();
 
@@ -462,12 +462,12 @@ public class TestHFileBlock {
 
         FSDataInputStream is = fs.open(path);
         HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
-            totalSize);
+            totalSize, null, null);
         long curOffset = 0;
         for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
           assertEquals(expectedOffsets.get(i).longValue(), curOffset);
           LOG.info("Reading block #" + i + " at offset " + curOffset);
-          HFileBlock b = hbr.readBlockData(curOffset, -1, -1);
+          HFileBlock b = hbr.readBlockData(curOffset, -1, -1, false);
           LOG.info("Block #" + i + ": " + b);
           assertEquals("Invalid block #" + i + "'s type:",
               expectedTypes.get(i), b.getBlockType());
@@ -480,7 +480,7 @@ public class TestHFileBlock {
           // Now re-load this block knowing the on-disk size. This tests a
           // different branch in the loader.
           HFileBlock b2 = hbr.readBlockData(curOffset,
-              b.getOnDiskSizeWithHeader(), -1);
+              b.getOnDiskSizeWithHeader(), -1, false);
           b2.sanityCheck();
 
           assertEquals(b.getBlockType(), b2.getBlockType());
@@ -568,7 +568,7 @@ public class TestHFileBlock {
         HFileBlock b;
         try {
           long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
-          b = hbr.readBlockData(offset, onDiskSizeArg, -1);
+          b = hbr.readBlockData(offset, onDiskSizeArg, -1, false);
         } catch (IOException ex) {
           LOG.error("Error in client " + clientId + " trying to read block at "
               + offset + ", pread=" + ", withOnDiskSize=" +
@@ -605,7 +605,7 @@ public class TestHFileBlock {
       FSDataInputStream is = fs.open(path);
       long fileSize = fs.getFileStatus(path).getLen();
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, compressAlgo,
-          fileSize);
+          fileSize, null, null);
 
       Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
       ExecutorCompletionService<Boolean> ecs =

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java Thu Jun 13 18:18:20 2013
@@ -149,7 +149,7 @@ public class TestHFileBlockIndex {
       }
 
       missCount += 1;
-      prevBlock = realReader.readBlockData(offset, onDiskSize, -1);
+      prevBlock = realReader.readBlockData(offset, onDiskSize, -1, false);
       prevOffset = offset;
       prevOnDiskSize = onDiskSize;
       if (kvContext != null) {
@@ -165,7 +165,7 @@ public class TestHFileBlockIndex {
 
     FSDataInputStream istream = fs.open(path);
     HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream,
-        compr, fs.getFileStatus(path).getLen());
+        compr, fs.getFileStatus(path).getLen(), null, null);
 
     BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
     HFileBlockIndex.BlockIndexReader indexReader =
@@ -216,7 +216,7 @@ public class TestHFileBlockIndex {
         includesMemstoreTS);
     FSDataOutputStream outputStream = fs.create(path);
     HFileBlockIndex.BlockIndexWriter biw =
-        new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
+        new HFileBlockIndex.BlockIndexWriter(hbw, null, null, null);
 
     for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
       hbw.startWriting(BlockType.DATA);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java Thu Jun 13 18:18:20 2013
@@ -124,7 +124,7 @@ public class TestHFileWriterV2 {
     assertEquals(ENTRY_COUNT, trailer.getEntryCount());
 
     HFileBlock.FSReader blockReader =
-        new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
+        new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize, null, null);
     // Comparator class name is stored in the trailer in version 2.
     RawComparator<byte []> comparator = trailer.createComparator();
     HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
@@ -164,7 +164,7 @@ public class TestHFileWriterV2 {
     fsdis.seek(0);
     long curBlockPos = 0;
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuffer buf = block.getBufferWithoutHeader();
       while (buf.hasRemaining()) {
@@ -207,7 +207,7 @@ public class TestHFileWriterV2 {
     while (curBlockPos < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       block.readInto(t);

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java?rev=1492797&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java Thu Jun 13 18:18:20 2013
@@ -0,0 +1,269 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests L2 bucket cache for correctness
+ */
+@RunWith(Parameterized.class)
+public class TestL2BucketCache {
+
+  private static final Log LOG = LogFactory.getLog(TestL2BucketCache.class);
+
+  private static final int DATA_BLOCK_SIZE = 2048;
+  private static final int NUM_KV = 25000;
+  private static final int INDEX_BLOCK_SIZE = 512;
+  private static final int BLOOM_BLOCK_SIZE = 4096;
+  private static final StoreFile.BloomType BLOOM_TYPE =
+      StoreFile.BloomType.ROWCOL;
+
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private static final HFileDataBlockEncoderImpl ENCODER =
+      new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX);
+
+  private BucketCache underlyingCache;
+  private MockedL2Cache mockedL2Cache;
+
+  private Configuration conf;
+  private CacheConfig cacheConf;
+  private FileSystem fs;
+  private Path storeFilePath;
+
+  private final Random rand = new Random(12983177L);
+  private final String ioEngineName;
+
+  public TestL2BucketCache(String ioEngineName) {
+    this.ioEngineName = ioEngineName;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getConfiguration() {
+    Object[][] data = new Object[][] { {"heap"}, {"offheap"}};
+    return Arrays.asList(data);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
+    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
+        BLOOM_BLOCK_SIZE);
+    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_FLUSH_KEY, true);
+    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);
+    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, true);
+    conf.setBoolean(CacheConfig.L2_CACHE_BLOCKS_ON_FLUSH_KEY, true);
+    underlyingCache = new BucketCache(ioEngineName,
+        32 * DATA_BLOCK_SIZE * 1024,
+        BucketCache.DEFAULT_WRITER_QUEUE_ITEMS,
+        BucketCache.DEFAULT_WRITER_QUEUE_ITEMS,
+        BucketCache.DEFAULT_ERROR_TOLERATION_DURATION,
+        CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES,
+        conf);
+    mockedL2Cache = new MockedL2Cache(underlyingCache);
+
+
+    fs = FileSystem.get(conf);
+    cacheConf = new CacheConfig.CacheConfigBuilder(conf)
+        .withL2Cache(mockedL2Cache)
+        .build();
+  }
+
+  @After
+  public void tearDown() {
+    underlyingCache.shutdown();
+  }
+
+  // Tests cache on write: when writing to an HFile, the data being written
+  // should also be placed in the L2 cache.
+  @Test
+  public void testCacheOnWrite() throws Exception {
+    writeStoreFile();
+    DataBlockEncoding encodingInCache = ENCODER.getEncodingInCache();
+    HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs,
+        storeFilePath, cacheConf, encodingInCache);
+    HFileScanner scanner = reader.getScanner(false, false);
+    assertTrue(scanner.seekTo());
+    long offset = 0;
+    long cachedCount = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      mockedL2Cache.enableReads.set(false);
+      HFileBlock blockFromDisk;
+      try {
+        blockFromDisk = reader.readBlock(offset, -1, false, false, null, null);
+      } finally {
+        mockedL2Cache.enableReads.set(true);
+      }
+      boolean isInL1Cache = cacheConf.getBlockCache().getBlock(
+          new BlockCacheKey(reader.getName(), offset, encodingInCache,
+              blockFromDisk.getBlockType()), true) != null;
+      if (isInL1Cache) {
+        cachedCount++;
+        byte[] blockFromCacheRaw =
+            mockedL2Cache.getRawBlock(reader.getName(), offset);
+        assertNotNull("All blocks in l1 cache, should also be in l2 cache: "
+            + blockFromDisk.toString(), blockFromCacheRaw);
+        HFileBlock blockFromL2Cache = HFileBlock.fromBytes(blockFromCacheRaw,
+            Compression.Algorithm.GZ, true, offset);
+        assertEquals("Data in block from disk (" + blockFromDisk +
+            ") should match data in block from cache (" + blockFromL2Cache +
+            ").", blockFromL2Cache.getBufferWithHeader(),
+            blockFromDisk.getBufferWithHeader());
+        assertEquals(blockFromDisk, blockFromL2Cache);
+      }
+      offset += blockFromDisk.getOnDiskSizeWithHeader();
+    }
+    assertTrue("> 0 blocks must be cached in L2Cache", cachedCount > 0);
+  }
+
+  // Tests cache on read: when blocks are read from an HFile they should
+  // be cached in the L2 cache.
+  @Test
+  public void testCacheOnRead() throws Exception {
+    writeStoreFile();
+    DataBlockEncoding encodingInCache = ENCODER.getEncodingInCache();
+    HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs,
+        storeFilePath, cacheConf, encodingInCache);
+    long offset = 0;
+    cacheConf.getBlockCache().clearCache();
+    underlyingCache.clearCache();
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      HFileBlock blockFromDisk = reader.readBlock(offset, -1, true, false, null, null);
+      assertNotNull(mockedL2Cache.getRawBlock(reader.getName(), offset));
+      cacheConf.getBlockCache().evictBlock(new BlockCacheKey(reader.getName(),
+          offset, encodingInCache, blockFromDisk.getBlockType()));
+      HFileBlock blockFromL2Cache = reader.readBlock(offset, -1, true, false,
+          null, null);
+      assertEquals("Data in block from disk (" + blockFromDisk +
+          ") should match data in block from cache (" + blockFromL2Cache +
+          ").", blockFromL2Cache.getBufferWithHeader(),
+          blockFromDisk.getBufferWithHeader());
+      assertEquals(blockFromDisk, blockFromL2Cache);
+      offset += blockFromDisk.getOnDiskSizeWithHeader();
+    }
+    assertTrue("This test must have read > 0 blocks", offset > 0);
+  }
+
+  private void writeStoreFile() throws IOException {
+    Path storeFileParentDir = new Path(TEST_UTIL.getTestDir(),
+        "test_cache_on_write");
+    StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs,
+        DATA_BLOCK_SIZE)
+        .withOutputDir(storeFileParentDir)
+        .withCompression(Compression.Algorithm.GZ)
+        .withDataBlockEncoder(ENCODER)
+        .withComparator(KeyValue.COMPARATOR)
+        .withBloomType(BLOOM_TYPE)
+        .withMaxKeyCount(NUM_KV)
+        .build();
+
+    final int rowLen = 32;
+    for (int i = 0; i < NUM_KV; ++i) {
+      byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
+      byte[] v = TestHFileWriterV2.randomValue(rand);
+      int cfLen = rand.nextInt(k.length - rowLen + 1);
+      KeyValue kv = new KeyValue(
+          k, 0, rowLen,
+          k, rowLen, cfLen,
+          k, rowLen + cfLen, k.length - rowLen - cfLen,
+          rand.nextLong(),
+          CreateRandomStoreFile.generateKeyType(rand),
+          v, 0, v.length);
+      sfw.append(kv);
+    }
+    sfw.close();
+    storeFilePath = sfw.getPath();
+  }
+
+  // Mocked implementation which allows reads to be enabled and disabled
+  // at run time during the tests. Adds additional trace logging that can
+  // be enabled during unit tests for further debugging.
+  private static class MockedL2Cache implements L2Cache {
+
+    final L2BucketCache underlying;
+    final AtomicBoolean enableReads = new AtomicBoolean(true);
+
+    MockedL2Cache(BucketCache underlying) throws IOException {
+      this.underlying = new L2BucketCache(underlying);
+    }
+
+    @Override
+    public byte[] getRawBlock(String hfileName, long dataBlockOffset) {
+      byte[] ret = null;
+      if (enableReads.get()) {
+        ret = underlying.getRawBlock(hfileName, dataBlockOffset);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Cache " + (ret == null ?"miss":"hit")  +
+              " for hfileName=" + hfileName + ", offset=" + dataBlockOffset);
+        }
+      }
+      return ret;
+    }
+
+    @Override
+    public void cacheRawBlock(String hfileName, long dataBlockOffset,
+        byte[] rawBlock) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Caching " + rawBlock.length + " bytes, hfileName=" +
+            hfileName + ", offset=" + dataBlockOffset);
+      }
+      underlying.cacheRawBlock(hfileName, dataBlockOffset, rawBlock);
+    }
+
+    @Override
+    public int evictBlocksByHfileName(String hfileName) {
+      return underlying.evictBlocksByHfileName(hfileName);
+    }
+
+    @Override
+    public void shutdown() {
+      underlying.shutdown();
+    }
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java Thu Jun 13 18:18:20 2013
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Random;

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java?rev=1492797&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java Thu Jun 13 18:18:20 2013
@@ -0,0 +1,273 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestBucketCache {
+
+  private static final Log LOG = LogFactory.getLog(TestBucketCache.class);
+
+  private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
+
+  private static final int CACHE_SIZE = 1000000;
+  private static final int NUM_BLOCKS = 100;
+  private static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
+  private static final int NUM_THREADS = 1000;
+  private static final int NUM_QUERIES = 10000;
+
+  private final String ioEngineName;
+
+  private BucketCache cache;
+
+
+
+  public TestBucketCache(String ioEngineName) {
+    this.ioEngineName = ioEngineName;
+    LOG.info("Running with ioEngineName = " + ioEngineName);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getConfigurations() {
+    Object[][] data = new Object[][] { {"heap"}, {"offheap"} };
+    return Arrays.asList(data);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    cache = new MockedBucketCache(ioEngineName, CAPACITY_SIZE,
+        BucketCache.DEFAULT_WRITER_THREADS,
+        BucketCache.DEFAULT_WRITER_QUEUE_ITEMS);
+  }
+
+  @After
+  public void tearDown() {
+    cache.shutdown();
+  }
+
+  @Test
+  public void testBucketAllocator() throws BucketAllocatorException {
+    BucketAllocator mAllocator = cache.getAllocator();
+    /*
+     * Test the allocator first
+     */
+    int[] blockSizes = new int[2];
+    blockSizes[0] = 4 * 1024;
+    blockSizes[1] = 8 * 1024;
+    boolean full = false;
+    int i = 0;
+    ArrayList<Long> allocations = new ArrayList<Long>();
+    // Fill the allocated extents
+    while (!full) {
+      try {
+        allocations.add(mAllocator.allocateBlock(blockSizes[i
+            % blockSizes.length]));
+        ++i;
+      } catch (CacheFullException cfe) {
+        full = true;
+      }
+    }
+
+    for (i = 0; i < blockSizes.length; i++) {
+      BucketAllocator.BucketSizeInfo bucketSizeInfo = mAllocator
+          .roundUpToBucketSizeInfo(blockSizes[i]);
+      BucketAllocator.IndexStatistics indexStatistics = bucketSizeInfo.statistics();
+      assertTrue(indexStatistics.freeCount() == 0);
+    }
+
+    for (long offset : allocations) {
+      assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator
+          .freeBlock(offset));
+    }
+    assertTrue(mAllocator.getUsedSize() == 0);
+  }
+
+  @Test
+  public void testCacheSimple() throws Exception {
+    BlockOnDisk[] blocks = generateDiskBlocks(NUM_QUERIES,
+        BLOCK_SIZE);
+    // Confirm empty
+    for (BlockOnDisk block : blocks) {
+      assertNull(cache.getBlock(block.blockName, true));
+    }
+
+    // Add blocks
+    for (BlockOnDisk block : blocks) {
+      cache.cacheBlock(block.blockName, block.block);
+    }
+
+    // Check if all blocks are properly cached and contain the right
+    // information, or the blocks are null.
+    // MapMaker makes no guarantees when it will evict, so neither can we.
+
+    for (BlockOnDisk block : blocks) {
+      byte[] buf = cache.getBlock(block.blockName, true);
+      if (buf != null) {
+        assertArrayEquals(block.block, buf);
+      }
+
+    }
+
+    // Re-add some duplicate blocks. Hope nothing breaks.
+
+    for (BlockOnDisk block : blocks) {
+      try {
+        if (cache.getBlock(block.blockName, true) != null) {
+          cache.cacheBlock(block.blockName, block.block);
+        }
+      } catch (RuntimeException re) {
+        // expected
+      }
+    }
+  }
+
+  @Test
+  public void testCacheMultiThreadedSingleKey() throws Exception {
+    final BlockCacheKey key = new BlockCacheKey("key", 0);
+    final byte[] buf = new byte[5 * 1024];
+    Arrays.fill(buf, (byte) 5);
+
+    Configuration conf = new Configuration();
+    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
+        conf);
+
+    final AtomicInteger totalQueries = new AtomicInteger();
+    cache.cacheBlock(key, buf);
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      MultithreadedTestUtil.TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+        @Override
+        public void doAnAction() throws Exception {
+          byte[] returned = cache.getBlock(key, false);
+          assertArrayEquals(buf, returned);
+          totalQueries.incrementAndGet();
+        }
+      };
+
+      t.setDaemon(true);
+      ctx.addThread(t);
+    }
+
+    ctx.startThreads();
+    while (totalQueries.get() < NUM_QUERIES && ctx.shouldRun()) {
+      Thread.sleep(10);
+    }
+    ctx.stop();
+  }
+
+  @Test
+  public void testHeapSizeChanges() throws Exception {
+    cache.stopWriterThreads();
+    BlockOnDisk[] blocks = generateDiskBlocks(BLOCK_SIZE, 1);
+    long heapSize = cache.heapSize();
+    cache.cacheBlock(blocks[0].blockName, blocks[0].block);
+
+    /* When we cache something, heapSize should always increase */
+    assertTrue(heapSize < cache.heapSize());
+
+    cache.evictBlock(blocks[0].blockName);
+
+    /* Post eviction, heapSize should be the same */
+    assertEquals(heapSize, cache.heapSize());
+  }
+
+  private static class BlockOnDisk {
+    BlockCacheKey blockName;
+    byte[] block;
+  }
+
+  private static BlockOnDisk[] generateDiskBlocks(int blockSize,
+      int numBlocks) {
+    BlockOnDisk[] retVal = new BlockOnDisk[numBlocks];
+    Random rand = new Random();
+    HashSet<String> usedStrings = new HashSet<String>();
+    for (int i = 0; i < numBlocks; i++) {
+      byte[] generated = new byte[blockSize];
+      rand.nextBytes(generated);
+      String strKey;
+      strKey = Long.toString(rand.nextLong());
+      while (!usedStrings.add(strKey)) {
+        strKey = Long.toString(rand.nextLong());
+      }
+      retVal[i] = new BlockOnDisk();
+      retVal[i].blockName = new BlockCacheKey(strKey, 0);
+      retVal[i].block = generated;
+    }
+    return retVal;
+  }
+
+  private static class MockedBucketCache extends BucketCache {
+
+    public MockedBucketCache(String ioEngineName, long capacity,
+        int writerThreads,
+        int writerQLen) throws IOException {
+      super(ioEngineName,
+          capacity,
+          writerThreads,
+          writerQLen,
+          DEFAULT_ERROR_TOLERATION_DURATION,
+          CacheConfig.DEFAULT_L2_BUCKET_CACHE_BUCKET_SIZES,
+          null);
+      super.wait_when_cache = true;
+    }
+
+    @Override
+    public void cacheBlock(BlockCacheKey cacheKey, byte[] buf,
+        boolean inMemory) {
+      if (super.getBlock(cacheKey, true, false) != null) {
+        throw new RuntimeException("Cached an already cached block");
+      }
+      super.cacheBlock(cacheKey, buf, inMemory);
+    }
+
+    @Override
+    public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) {
+      if (super.getBlock(cacheKey, true, false) != null) {
+        throw new RuntimeException("Cached an already cached block");
+      }
+      super.cacheBlock(cacheKey, buf);
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java?rev=1492797&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java Thu Jun 13 18:18:20 2013
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link ByteBufferIOEngine}
+ */
+@RunWith(Parameterized.class)
+public class TestByteBufferIOEngine {
+
+  private static final Log LOG = LogFactory.getLog(TestByteBufferIOEngine.class);
+
+  private final boolean isDirect;
+
+  public TestByteBufferIOEngine(boolean isDirect) {
+    this.isDirect = isDirect;
+    LOG.info("Running with direct allocation " +
+        (isDirect ? "enabled" : "disabled"));
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getConfigurations() {
+    Object[][] data = new Object[][] { {true}, {false} };
+    return Arrays.asList(data);
+  }
+
+  @Test
+  public void testByteBufferIOEngine() throws Exception {
+    int capacity = 32 * 1024 * 1024; // 32 MB
+    int testNum = 100;
+    int maxBlockSize = 64 * 1024;
+    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity,
+        2 * 1024 * 1024, isDirect);
+    int testOffsetAtStartNum = testNum / 10;
+    int testOffsetAtEndNum = testNum / 10;
+    for (int i = 0; i < testNum; i++) {
+      byte val = (byte) (Math.random() * 255);
+      int blockSize = (int) (Math.random() * maxBlockSize);
+      byte[] byteArray = new byte[blockSize];
+      for (int j = 0; j < byteArray.length; ++j) {
+        byteArray[j] = val;
+      }
+      int offset = 0;
+      if (testOffsetAtStartNum > 0) {
+        testOffsetAtStartNum--;
+        offset = 0;
+      } else if (testOffsetAtEndNum > 0) {
+        testOffsetAtEndNum--;
+        offset = capacity - blockSize;
+      } else {
+        offset = (int) (Math.random() * (capacity - maxBlockSize));
+      }
+      ioEngine.write(byteArray, offset);
+      byte[] dst = new byte[blockSize];
+      ioEngine.read(dst, offset);
+      for (int j = 0; j < byteArray.length; ++j) {
+        assertTrue(byteArray[j] == dst[j]);
+      }
+    }
+    assertTrue(testOffsetAtStartNum == 0);
+    assertTrue(testOffsetAtEndNum == 0);
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java?rev=1492797&r1=1492796&r2=1492797&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java Thu Jun 13 18:18:20 2013
@@ -74,7 +74,8 @@ public class EncodedSeekPerformanceTest 
       allKeyValues.add(current);
     }
 
-    storeFile.closeReader(cacheConf.shouldEvictOnClose());
+    storeFile.closeReader(cacheConf.shouldEvictOnClose(),
+        cacheConf.shouldL2EvictOnClose());
 
     // pick seeks by random
     List<KeyValue> seeks = new ArrayList<KeyValue>();
@@ -135,7 +136,8 @@ public class EncodedSeekPerformanceTest 
     double seeksPerSec = (seeks.size() * NANOSEC_IN_SEC) /
         (finishSeeksTime - startSeeksTime);
 
-    storeFile.closeReader(cacheConf.shouldEvictOnClose());
+    storeFile.closeReader(cacheConf.shouldEvictOnClose(),
+        cacheConf.shouldL2EvictOnClose());
     clearBlockCache();
 
     System.out.println(blockEncoder);


