hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1181554 [2/2] - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache/hadoop/hbase/io/hfile/
Date Tue, 11 Oct 2011 02:19:22 GMT
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java?rev=1181554&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
(added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
Tue Oct 11 02:19:21 2011
@@ -0,0 +1,482 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.Compressor;
+
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHFileBlock {
+
+  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
+
+  private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
+
+  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
+      NONE, GZ };
+
+  // In case we need to temporarily switch some test cases to just test gzip.
+  static final Compression.Algorithm[] GZIP_ONLY  = { GZ };
+
+  private static final int NUM_TEST_BLOCKS = 1000;
+
+  private static final int NUM_READER_THREADS = 26;
+
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private FileSystem fs;
+  private int uncompressedSizeV1;
+
+  @Before
+  public void setUp() throws IOException {
+    fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    TEST_UTIL.initTestDir();
+  }
+
+  public void writeTestBlockContents(DataOutputStream dos) throws IOException {
+    // This compresses really well.
+    for (int i = 0; i < 1000; ++i)
+      dos.writeInt(i / 100);
+  }
+
+  public byte[] createTestV1Block(Compression.Algorithm algo)
+      throws IOException {
+    Compressor compressor = algo.getCompressor();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
+    DataOutputStream dos = new DataOutputStream(os);
+    BlockType.META.write(dos); // Let's make this a meta block.
+    writeTestBlockContents(dos);
+    uncompressedSizeV1 = dos.size();
+    dos.flush();
+    algo.returnCompressor(compressor);
+    return baos.toByteArray();
+  }
+
+  private byte[] createTestV2Block(Compression.Algorithm algo)
+      throws IOException {
+    final BlockType blockType = BlockType.DATA;
+    HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+    DataOutputStream dos = hbw.startWriting(blockType, false);
+    writeTestBlockContents(dos);
+    byte[] headerAndData = hbw.getHeaderAndData();
+    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
+    hbw.releaseCompressor();
+    return headerAndData;
+  }
+
+  public String createTestBlockStr(Compression.Algorithm algo)
+      throws IOException {
+    return Bytes.toStringBinary(createTestV2Block(algo));
+  }
+
+  @Test
+  public void testNoCompression() throws IOException {
+    assertEquals(4000 + HFileBlock.HEADER_SIZE, createTestV2Block(NONE).length);
+  }
+
+  @Test
+  public void testGzipCompression() throws IOException {
+    assertEquals(
+        "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
+            + "\\xFF\\xFF\\xFF\\xFF\\x1F\\x8B\\x08\\x00\\x00\\x00\\x00\\x00"
+            + "\\x00\\x00\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
+            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00",
+        createTestBlockStr(GZ));
+  }
+
+  @Test
+  public void testReaderV1() throws IOException {
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : new boolean[] { false, true }) {
+        byte[] block = createTestV1Block(algo);
+        Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v1_"
+            + algo);
+        LOG.info("Creating temporary file at " + path);
+        FSDataOutputStream os = fs.create(path);
+        int totalSize = 0;
+        int numBlocks = 50;
+        for (int i = 0; i < numBlocks; ++i) {
+          os.write(block);
+          totalSize += block.length;
+        }
+        os.close();
+
+        FSDataInputStream is = fs.open(path);
+        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo,
+            totalSize);
+        HFileBlock b;
+        int numBlocksRead = 0;
+        long pos = 0;
+        while (pos < totalSize) {
+          b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
+          b.sanityCheck();
+          pos += block.length;
+          numBlocksRead++;
+        }
+        assertEquals(numBlocks, numBlocksRead);
+        is.close();
+      }
+    }
+  }
+
+  @Test
+  public void testReaderV2() throws IOException {
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : new boolean[] { false, true }) {
+        Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v2_"
+            + algo);
+        FSDataOutputStream os = fs.create(path);
+        HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
+        long totalSize = 0;
+        for (int blockId = 0; blockId < 2; ++blockId) {
+          DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
+          for (int i = 0; i < 1234; ++i)
+            dos.writeInt(i);
+          hbw.writeHeaderAndData(os);
+          totalSize += hbw.getOnDiskSizeWithHeader();
+        }
+        os.close();
+
+        FSDataInputStream is = fs.open(path);
+        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
+            totalSize);
+        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
+        is.close();
+
+        b.sanityCheck();
+        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
+        assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader());
+        String blockStr = b.toString();
+
+        if (algo == GZ) {
+          is = fs.open(path);
+          hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
+          b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1, pread);
+          assertEquals(blockStr, b.toString());
+          int wrongCompressedSize = 2172;
+          try {
+            b = hbr.readBlockData(0, wrongCompressedSize
+                + HFileBlock.HEADER_SIZE, -1, pread);
+            fail("Exception expected");
+          } catch (IOException ex) {
+            String expectedPrefix = "On-disk size without header provided is "
+                + wrongCompressedSize + ", but block header contains "
+                + b.getOnDiskSizeWithoutHeader() + ".";
+            assertTrue("Invalid exception message: '" + ex.getMessage()
+                + "'.\nMessage is expected to start with: '" + expectedPrefix
+                + "'", ex.getMessage().startsWith(expectedPrefix));
+          }
+          is.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testPreviousOffset() throws IOException {
+    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
+      for (boolean pread : BOOLEAN_VALUES) {
+        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
+          Random rand = defaultRandom();
+          LOG.info("Compression algorithm: " + algo + ", pread=" + pread);
+          Path path = new Path(HBaseTestingUtility.getTestDir(), "prev_offset");
+          List<Long> expectedOffsets = new ArrayList<Long>();
+          List<Long> expectedPrevOffsets = new ArrayList<Long>();
+          List<BlockType> expectedTypes = new ArrayList<BlockType>();
+          List<ByteBuffer> expectedContents = cacheOnWrite
+              ? new ArrayList<ByteBuffer>() : null;
+          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
+              expectedPrevOffsets, expectedTypes, expectedContents, true);
+
+          FSDataInputStream is = fs.open(path);
+          HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
+              totalSize);
+          long curOffset = 0;
+          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
+            if (!pread) {
+              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
+                  HFileBlock.HEADER_SIZE));
+            }
+
+            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
+
+            LOG.info("Reading block #" + i + " at offset " + curOffset);
+            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
+            LOG.info("Block #" + i + ": " + b);
+            assertEquals("Invalid block #" + i + "'s type:",
+                expectedTypes.get(i), b.getBlockType());
+            assertEquals("Invalid previous block offset for block " + i
+                + " of " + "type " + b.getBlockType() + ":",
+                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
+            b.sanityCheck();
+            assertEquals(curOffset, b.getOffset());
+
+            // Now re-load this block knowing the on-disk size. This tests a
+            // different branch in the loader.
+            HFileBlock b2 = hbr.readBlockData(curOffset,
+                b.getOnDiskSizeWithHeader(), -1, pread);
+            b2.sanityCheck();
+
+            assertEquals(b.getBlockType(), b2.getBlockType());
+            assertEquals(b.getOnDiskSizeWithoutHeader(),
+                b2.getOnDiskSizeWithoutHeader());
+            assertEquals(b.getOnDiskSizeWithHeader(),
+                b2.getOnDiskSizeWithHeader());
+            assertEquals(b.getUncompressedSizeWithoutHeader(),
+                b2.getUncompressedSizeWithoutHeader());
+            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
+            assertEquals(curOffset, b2.getOffset());
+
+            curOffset += b.getOnDiskSizeWithHeader();
+
+            if (cacheOnWrite) {
+              // In the cache-on-write mode we store uncompressed bytes so we
+              // can compare them to what was read by the block reader.
+
+              ByteBuffer bufRead = b.getBufferWithHeader();
+              ByteBuffer bufExpected = expectedContents.get(i);
+              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
+                  bufRead.arrayOffset(), bufRead.limit(),
+                  bufExpected.array(), bufExpected.arrayOffset(),
+                  bufExpected.limit()) == 0;
+              String wrongBytesMsg = "";
+
+              if (!bytesAreCorrect) {
+                // Optimization: only construct an error message in case we
+                // will need it.
+                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
+                    + algo + ", pread=" + pread + "):\n";
+                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
+                    bufExpected.arrayOffset(), Math.min(32,
+                        bufExpected.limit()))
+                    + ", actual:\n"
+                    + Bytes.toStringBinary(bufRead.array(),
+                        bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
+              }
+
+              assertTrue(wrongBytesMsg, bytesAreCorrect);
+            }
+          }
+
+          assertEquals(curOffset, fs.getFileStatus(path).getLen());
+          is.close();
+        }
+      }
+    }
+  }
+
+  private Random defaultRandom() {
+    return new Random(189237);
+  }
+
+  private class BlockReaderThread implements Callable<Boolean> {
+    private final String clientId;
+    private final HFileBlock.FSReader hbr;
+    private final List<Long> offsets;
+    private final List<BlockType> types;
+    private final long fileSize;
+
+    public BlockReaderThread(String clientId,
+        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
+        long fileSize) {
+      this.clientId = clientId;
+      this.offsets = offsets;
+      this.hbr = hbr;
+      this.types = types;
+      this.fileSize = fileSize;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      Random rand = new Random(clientId.hashCode());
+      long endTime = System.currentTimeMillis() + 10000;
+      int numBlocksRead = 0;
+      int numPositionalRead = 0;
+      int numWithOnDiskSize = 0;
+      while (System.currentTimeMillis() < endTime) {
+        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
+        long offset = offsets.get(blockId);
+        boolean pread = rand.nextBoolean();
+        boolean withOnDiskSize = rand.nextBoolean();
+        long expectedSize =
+          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
+              : offsets.get(blockId + 1)) - offset;
+
+        HFileBlock b;
+        try {
+          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
+          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
+        } catch (IOException ex) {
+          LOG.error("Error in client " + clientId + " trying to read block at "
+              + offset + ", pread=" + pread + ", withOnDiskSize=" +
+              withOnDiskSize, ex);
+          return false;
+        }
+
+        assertEquals(types.get(blockId), b.getBlockType());
+        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
+        assertEquals(offset, b.getOffset());
+
+        ++numBlocksRead;
+        if (pread)
+          ++numPositionalRead;
+        if (withOnDiskSize)
+          ++numWithOnDiskSize;
+      }
+      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
+          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
+          "specified: " + numWithOnDiskSize + ")");
+      return true;
+    }
+
+  }
+
+  @Test
+  public void testConcurrentReading() throws Exception {
+    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
+      Path path =
+          new Path(HBaseTestingUtility.getTestDir(), "concurrent_reading");
+      Random rand = defaultRandom();
+      List<Long> offsets = new ArrayList<Long>();
+      List<BlockType> types = new ArrayList<BlockType>();
+      writeBlocks(rand, compressAlgo, path, offsets, null, types, null, false);
+      FSDataInputStream is = fs.open(path);
+      long fileSize = fs.getFileStatus(path).getLen();
+      HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, compressAlgo,
+          fileSize);
+
+      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
+      ExecutorCompletionService<Boolean> ecs =
+          new ExecutorCompletionService<Boolean>(exec);
+
+      for (int i = 0; i < NUM_READER_THREADS; ++i) {
+        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
+            offsets, types, fileSize));
+      }
+
+      for (int i = 0; i < NUM_READER_THREADS; ++i) {
+        Future<Boolean> result = ecs.take();
+        assertTrue(result.get());
+        LOG.info(String.valueOf(i + 1)
+            + " reader threads finished successfully (algo=" + compressAlgo
+            + ")");
+      }
+
+      is.close();
+    }
+  }
+
+  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
+      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
+      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents,
+      boolean detailedLogging) throws IOException {
+    boolean cacheOnWrite = expectedContents != null;
+    FSDataOutputStream os = fs.create(path);
+    HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
+    Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
+    long totalSize = 0;
+    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
+      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+      BlockType bt = BlockType.values()[blockTypeOrdinal];
+      DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
+      for (int j = 0; j < rand.nextInt(500); ++j) {
+        // This might compress well.
+        dos.writeShort(i + 1);
+        dos.writeInt(j + 1);
+      }
+
+      if (expectedOffsets != null)
+        expectedOffsets.add(os.getPos());
+
+      if (expectedPrevOffsets != null) {
+        Long prevOffset = prevOffsetByType.get(bt);
+        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
+        prevOffsetByType.put(bt, os.getPos());
+      }
+
+      expectedTypes.add(bt);
+
+      hbw.writeHeaderAndData(os);
+      totalSize += hbw.getOnDiskSizeWithHeader();
+
+      if (cacheOnWrite)
+        expectedContents.add(hbw.getUncompressedBufferWithHeader());
+
+      if (detailedLogging) {
+        LOG.info("Writing block #" + i + " of type " + bt
+            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
+            + " at offset " + os.getPos());
+      }
+    }
+    os.close();
+    LOG.info("Created a temporary file at " + path + ", "
+        + fs.getFileStatus(path).getLen() + " byte, compression=" +
+        compressAlgo);
+    return totalSize;
+  }
+
+  @Test
+  public void testBlockHeapSize() {
+    for (int size : new int[] { 100, 256, 12345 }) {
+      byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
+      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
+      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
+          true, -1);
+      assertEquals(80, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
+      long expected = ClassSize.align(ClassSize.estimateBase(HFileBlock.class,
+          true)
+          + ClassSize.estimateBase(buf.getClass(), true)
+          + HFileBlock.HEADER_SIZE + size);
+      assertEquals(expected, block.heapSize());
+    }
+  }
+
+}

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java?rev=1181554&r1=1181553&r2=1181554&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
(original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
Tue Oct 11 02:19:21 2011
@@ -19,7 +19,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -43,11 +42,11 @@ public class TestLruBlockCache extends T
 
     LruBlockCache cache = new LruBlockCache(maxSize,blockSize);
 
-    Block [] blocks = generateFixedBlocks(10, blockSize, "block");
+    CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
 
     // Add all the blocks
-    for(Block block : blocks) {
-      cache.cacheBlock(block.blockName, block.buf);
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.blockName, block);
     }
 
     // Let the eviction run
@@ -70,35 +69,35 @@ public class TestLruBlockCache extends T
 
     LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
 
-    Block [] blocks = generateRandomBlocks(100, blockSize);
+    CachedItem [] blocks = generateRandomBlocks(100, blockSize);
 
     long expectedCacheSize = cache.heapSize();
 
     // Confirm empty
-    for(Block block : blocks) {
+    for (CachedItem block : blocks) {
       assertTrue(cache.getBlock(block.blockName) == null);
     }
 
     // Add blocks
-    for(Block block : blocks) {
-      cache.cacheBlock(block.blockName, block.buf);
-      expectedCacheSize += block.heapSize();
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.blockName, block);
+      expectedCacheSize += block.cacheBlockHeapSize();
     }
 
     // Verify correctly calculated cache heap size
     assertEquals(expectedCacheSize, cache.heapSize());
 
     // Check if all blocks are properly cached and retrieved
-    for(Block block : blocks) {
-      ByteBuffer buf = cache.getBlock(block.blockName);
+    for (CachedItem block : blocks) {
+      HeapSize buf = cache.getBlock(block.blockName);
       assertTrue(buf != null);
-      assertEquals(buf.capacity(), block.buf.capacity());
+      assertEquals(buf.heapSize(), block.heapSize());
     }
 
     // Re-add same blocks and ensure nothing has changed
-    for(Block block : blocks) {
+    for (CachedItem block : blocks) {
       try {
-        cache.cacheBlock(block.blockName, block.buf);
+        cache.cacheBlock(block.blockName, block);
         assertTrue("Cache should not allow re-caching a block", false);
       } catch(RuntimeException re) {
         // expected
@@ -109,10 +108,10 @@ public class TestLruBlockCache extends T
     assertEquals(expectedCacheSize, cache.heapSize());
 
     // Check if all blocks are properly cached and retrieved
-    for(Block block : blocks) {
-      ByteBuffer buf = cache.getBlock(block.blockName);
+    for (CachedItem block : blocks) {
+      HeapSize buf = cache.getBlock(block.blockName);
       assertTrue(buf != null);
-      assertEquals(buf.capacity(), block.buf.capacity());
+      assertEquals(buf.heapSize(), block.heapSize());
     }
 
     // Expect no evictions
@@ -126,14 +125,14 @@ public class TestLruBlockCache extends T
 
     LruBlockCache cache = new LruBlockCache(maxSize,blockSize,false);
 
-    Block [] blocks = generateFixedBlocks(10, blockSize, "block");
+    CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
 
     long expectedCacheSize = cache.heapSize();
 
     // Add all the blocks
-    for(Block block : blocks) {
-      cache.cacheBlock(block.blockName, block.buf);
-      expectedCacheSize += block.heapSize();
+    for (CachedItem block : blocks) {
+      cache.cacheBlock(block.blockName, block);
+      expectedCacheSize += block.cacheBlockHeapSize();
     }
 
     // A single eviction run should have occurred
@@ -155,7 +154,7 @@ public class TestLruBlockCache extends T
     assertTrue(cache.getBlock(blocks[1].blockName) == null);
     for(int i=2;i<blocks.length;i++) {
       assertEquals(cache.getBlock(blocks[i].blockName),
-          blocks[i].buf);
+          blocks[i]);
     }
   }
 
@@ -166,21 +165,21 @@ public class TestLruBlockCache extends T
 
     LruBlockCache cache = new LruBlockCache(maxSize,blockSize,false);
 
-    Block [] singleBlocks = generateFixedBlocks(5, 10000, "single");
-    Block [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
+    CachedItem [] singleBlocks = generateFixedBlocks(5, 10000, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
 
     long expectedCacheSize = cache.heapSize();
 
     // Add and get the multi blocks
-    for(Block block : multiBlocks) {
-      cache.cacheBlock(block.blockName, block.buf);
-      expectedCacheSize += block.heapSize();
-      assertEquals(cache.getBlock(block.blockName), block.buf);
+    for (CachedItem block : multiBlocks) {
+      cache.cacheBlock(block.blockName, block);
+      expectedCacheSize += block.cacheBlockHeapSize();
+      assertEquals(cache.getBlock(block.blockName), block);
     }
 
     // Add the single blocks (no get)
-    for(Block block : singleBlocks) {
-      cache.cacheBlock(block.blockName, block.buf);
+    for (CachedItem block : singleBlocks) {
+      cache.cacheBlock(block.blockName, block);
       expectedCacheSize += block.heapSize();
     }
 
@@ -211,9 +210,9 @@ public class TestLruBlockCache extends T
     // And all others to be cached
     for(int i=1;i<4;i++) {
       assertEquals(cache.getBlock(singleBlocks[i].blockName),
-          singleBlocks[i].buf);
+          singleBlocks[i]);
       assertEquals(cache.getBlock(multiBlocks[i].blockName),
-          multiBlocks[i].buf);
+          multiBlocks[i]);
     }
   }
 
@@ -233,9 +232,9 @@ public class TestLruBlockCache extends T
         0.34f);// memory
 
 
-    Block [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
-    Block [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
-    Block [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
+    CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+    CachedItem [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
 
     long expectedCacheSize = cache.heapSize();
 
@@ -243,17 +242,17 @@ public class TestLruBlockCache extends T
     for(int i=0;i<3;i++) {
 
       // Just add single blocks
-      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
-      expectedCacheSize += singleBlocks[i].heapSize();
+      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
+      expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
 
       // Add and get multi blocks
-      cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
-      expectedCacheSize += multiBlocks[i].heapSize();
+      cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i]);
+      expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
       cache.getBlock(multiBlocks[i].blockName);
 
       // Add memory blocks as such
-      cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
-      expectedCacheSize += memoryBlocks[i].heapSize();
+      cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i], true);
+      expectedCacheSize += memoryBlocks[i].cacheBlockHeapSize();
 
     }
 
@@ -264,7 +263,7 @@ public class TestLruBlockCache extends T
     assertEquals(expectedCacheSize, cache.heapSize());
 
     // Insert a single block, oldest single should be evicted
-    cache.cacheBlock(singleBlocks[3].blockName, singleBlocks[3].buf);
+    cache.cacheBlock(singleBlocks[3].blockName, singleBlocks[3]);
 
     // Single eviction, one thing evicted
     assertEquals(1, cache.getEvictionCount());
@@ -277,7 +276,7 @@ public class TestLruBlockCache extends T
     cache.getBlock(singleBlocks[1].blockName);
 
     // Insert another single block
-    cache.cacheBlock(singleBlocks[4].blockName, singleBlocks[4].buf);
+    cache.cacheBlock(singleBlocks[4].blockName, singleBlocks[4]);
 
     // Two evictions, two evicted.
     assertEquals(2, cache.getEvictionCount());
@@ -287,7 +286,7 @@ public class TestLruBlockCache extends T
     assertEquals(null, cache.getBlock(multiBlocks[0].blockName));
 
     // Insert another memory block
-    cache.cacheBlock(memoryBlocks[3].blockName, memoryBlocks[3].buf, true);
+    cache.cacheBlock(memoryBlocks[3].blockName, memoryBlocks[3], true);
 
     // Three evictions, three evicted.
     assertEquals(3, cache.getEvictionCount());
@@ -297,8 +296,8 @@ public class TestLruBlockCache extends T
     assertEquals(null, cache.getBlock(memoryBlocks[0].blockName));
 
     // Add a block that is twice as big (should force two evictions)
-    Block [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
-    cache.cacheBlock(bigBlocks[0].blockName, bigBlocks[0].buf);
+    CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
+    cache.cacheBlock(bigBlocks[0].blockName, bigBlocks[0]);
 
     // Four evictions, six evicted (inserted block 3X size, expect +3 evicted)
     assertEquals(4, cache.getEvictionCount());
@@ -313,7 +312,7 @@ public class TestLruBlockCache extends T
     cache.getBlock(bigBlocks[0].blockName);
 
     // Cache another single big block
-    cache.cacheBlock(bigBlocks[1].blockName, bigBlocks[1].buf);
+    cache.cacheBlock(bigBlocks[1].blockName, bigBlocks[1]);
 
     // Five evictions, nine evicted (3 new)
     assertEquals(5, cache.getEvictionCount());
@@ -325,7 +324,7 @@ public class TestLruBlockCache extends T
     assertEquals(null, cache.getBlock(multiBlocks[2].blockName));
 
     // Cache a big memory block
-    cache.cacheBlock(bigBlocks[2].blockName, bigBlocks[2].buf, true);
+    cache.cacheBlock(bigBlocks[2].blockName, bigBlocks[2], true);
 
     // Six evictions, twelve evicted (3 new)
     assertEquals(6, cache.getEvictionCount());
@@ -355,18 +354,18 @@ public class TestLruBlockCache extends T
         0.33f, // multi
         0.34f);// memory
 
-    Block [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
-    Block [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+    CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
 
     // Add 5 multi blocks
-    for(Block block : multiBlocks) {
-      cache.cacheBlock(block.blockName, block.buf);
+    for (CachedItem block : multiBlocks) {
+      cache.cacheBlock(block.blockName, block);
       cache.getBlock(block.blockName);
     }
 
     // Add 5 single blocks
     for(int i=0;i<5;i++) {
-      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
+      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
     }
 
     // An eviction ran
@@ -389,7 +388,7 @@ public class TestLruBlockCache extends T
     // 12 more evicted.
 
     for(int i=5;i<18;i++) {
-      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
+      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
     }
 
     // 4 total evictions, 16 total evicted
@@ -417,22 +416,22 @@ public class TestLruBlockCache extends T
         0.33f, // multi
         0.34f);// memory
 
-    Block [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
-    Block [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
-    Block [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
+    CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
+    CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
+    CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
 
     // Add all blocks from all priorities
     for(int i=0;i<10;i++) {
 
       // Just add single blocks
-      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
+      cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
 
       // Add and get multi blocks
-      cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
+      cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i]);
       cache.getBlock(multiBlocks[i].blockName);
 
       // Add memory blocks as such
-      cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
+      cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i], true);
     }
 
     // Do not expect any evictions yet
@@ -456,29 +455,29 @@ public class TestLruBlockCache extends T
 
     // And the newest 5 blocks should still be accessible
     for(int i=5;i<10;i++) {
-      assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName));
-      assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName));
-      assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName));
+      assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].blockName));
+      assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].blockName));
+      assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].blockName));
     }
   }
 
-  private Block [] generateFixedBlocks(int numBlocks, int size, String pfx) {
-    Block [] blocks = new Block[numBlocks];
+  private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
+    CachedItem [] blocks = new CachedItem[numBlocks];
     for(int i=0;i<numBlocks;i++) {
-      blocks[i] = new Block(pfx + i, size);
+      blocks[i] = new CachedItem(pfx + i, size);
     }
     return blocks;
   }
 
-  private Block [] generateFixedBlocks(int numBlocks, long size, String pfx) {
+  private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
     return generateFixedBlocks(numBlocks, (int)size, pfx);
   }
 
-  private Block [] generateRandomBlocks(int numBlocks, long maxSize) {
-    Block [] blocks = new Block[numBlocks];
+  private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
+    CachedItem [] blocks = new CachedItem[numBlocks];
     Random r = new Random();
     for(int i=0;i<numBlocks;i++) {
-      blocks[i] = new Block("block" + i, r.nextInt((int)maxSize)+1);
+      blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
     }
     return blocks;
   }
@@ -508,19 +507,26 @@ public class TestLruBlockCache extends T
         LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
   }
 
-  private static class Block implements HeapSize {
+  private static class CachedItem implements HeapSize {
     String blockName;
-    ByteBuffer buf;
+    int size;
 
-    Block(String blockName, int size) {
+    CachedItem(String blockName, int size) {
       this.blockName = blockName;
-      this.buf = ByteBuffer.allocate(size);
+      this.size = size;
     }
 
+    /** The size of this item reported to the block cache layer */
+    @Override
     public long heapSize() {
-      return CachedBlock.PER_BLOCK_OVERHEAD +
-      ClassSize.align(blockName.length()) +
-      ClassSize.align(buf.capacity());
+      return ClassSize.align(size);
+    }
+
+    /** Size of the cache block holding this item. Used for verification. */
+    public long cacheBlockHeapSize() {
+      return CachedBlock.PER_BLOCK_OVERHEAD
+          + ClassSize.align(blockName.length())
+          + ClassSize.align(size);
     }
   }
 }



Mime
View raw message