incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ph...@apache.org
Subject git commit: BLUR-5 Write through caching for the BlockCache
Date Fri, 26 Oct 2012 05:31:16 GMT
Updated Branches:
  refs/heads/lucene-4.0.0 d456e4dc2 -> 8b52ee3ee


BLUR-5 Write through caching for the BlockCache

Basic implementation of a write through cache.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/8b52ee3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/8b52ee3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/8b52ee3e

Branch: refs/heads/lucene-4.0.0
Commit: 8b52ee3eec54f43c760e2a63343f8d0f8f037b02
Parents: d456e4d
Author: Patrick Hunt <phunt@apache.org>
Authored: Mon Oct 22 17:12:08 2012 -0700
Committer: Patrick Hunt <phunt@apache.org>
Committed: Thu Oct 25 22:04:06 2012 -0700

----------------------------------------------------------------------
 .../apache/blur/store/blockcache/BlockCache.java   |   13 +-
 .../blur/store/blockcache/BlockDirectory.java      |   23 ++-
 .../blur/store/blockcache/BlockDirectoryCache.java |   10 +-
 .../org/apache/blur/store/blockcache/Cache.java    |   31 +++-
 .../blur/store/blockcache/CachedIndexOutput.java   |   88 +++++++
 .../store/buffer/ReusedBufferedIndexOutput.java    |  189 +++++++++++++++
 .../blur/store/blockcache/BlockCacheTest.java      |   28 ++-
 .../blur/store/blockcache/BlockDirectoryTest.java  |  107 +++++----
 8 files changed, 431 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
index 0358fc8..70b2860 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
@@ -77,8 +77,11 @@ public class BlockCache {
     _lockCounters[slabId].decrementAndGet();
   }
 
-  public boolean store(BlockCacheKey blockCacheKey, byte[] data) {
-    checkLength(data);
+  public boolean store(BlockCacheKey blockCacheKey, int blockOffset, byte[] data, int offset, int length) {
+    if (length + blockOffset > _blockSize) {
+      throw new RuntimeException("Buffer size exceeded, expecting max ["
+          + _blockSize + "] got length [" + length + "] with blockOffset [" + blockOffset + "]" );
+    }
     BlockCacheLocation location = _cache.get(blockCacheKey);
     boolean newLocation = false;
     if (location == null) {
@@ -92,10 +95,10 @@ public class BlockCache {
       return false;
     }
     int slabId = location.getSlabId();
-    int offset = location.getBlock() * _blockSize;
+    int slabOffset = location.getBlock() * _blockSize;
     ByteBuffer slab = getSlab(slabId);
-    slab.position(offset);
-    slab.put(data, 0, _blockSize);
+    slab.position(slabOffset + blockOffset);
+    slab.put(data, offset, length);
     if (newLocation) {
       releaseLocation(_cache.put(blockCacheKey.clone(), location));
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
index d9efecf..7d7d93f 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
@@ -54,8 +54,7 @@ public class BlockDirectory extends Directory {
   public static Cache NO_CACHE = new Cache() {
 
     @Override
-    public void update(String name, long blockId, byte[] buffer) {
-
+    public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
     }
 
     @Override
@@ -72,6 +71,10 @@ public class BlockDirectory extends Directory {
     public long size() {
       return 0;
     }
+
+    @Override
+    public void renameCacheFile(String source, String dest) {
+    }
   };
 
   private Directory _directory;
@@ -184,7 +187,7 @@ public class BlockDirectory extends Directory {
       byte[] buf = BufferStore.takeBuffer(_blockSize);
       _source.readBytes(buf, 0, length);
       System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
-      _cache.update(_cacheName, blockId, buf);
+      _cache.update(_cacheName, blockId, 0, buf, 0, _blockSize);
       BufferStore.putBuffer(buf);
     }
 
@@ -207,8 +210,12 @@ public class BlockDirectory extends Directory {
     _directory.close();
   }
 
-  private String getFileCacheName(String name) throws IOException {
-    return _dirName + "/" + name + ":" + getFileModified(name);
+  String getFileCacheLocation(String name) {
+    return _dirName + "/" + name;
+  }
+
+  String getFileCacheName(String name) throws IOException {
+    return getFileCacheLocation(name) + ":" + getFileModified(name);
   }
 
   private long getFileModified(String name) throws IOException {
@@ -266,7 +273,11 @@ public class BlockDirectory extends Directory {
 
   @Override
   public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    return _directory.createOutput(name, context);
+    IndexOutput dest = _directory.createOutput(name, context);
+    if (_blockCacheFileTypes == null || isCachableFile(name)) {
+      return new CachedIndexOutput(this, dest, _blockSize, name, _cache, _blockSize);
+    }
+    return dest;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
index a7117e3..46493f8 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
@@ -36,7 +36,7 @@ public class BlockDirectoryCache implements Cache {
   }
 
   @Override
-  public void update(String name, long blockId, byte[] buffer) {
+  public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
     Integer file = _names.get(name);
     if (file == null) {
       file = _counter.incrementAndGet();
@@ -45,7 +45,7 @@ public class BlockDirectoryCache implements Cache {
     BlockCacheKey blockCacheKey = new BlockCacheKey();
     blockCacheKey.setBlock(blockId);
     blockCacheKey.setFile(file);
-    _blockCache.store(blockCacheKey, buffer);
+    _blockCache.store(blockCacheKey, blockOffset, buffer, offset, length);  
   }
 
   @Override
@@ -65,4 +65,10 @@ public class BlockDirectoryCache implements Cache {
   public long size() {
     return _blockCache.getSize();
   }
+
+  @Override
+  public void renameCacheFile(String source, String dest) {
+    Integer file = _names.remove(source);
+    _names.put(dest, file);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
index 98a66c3..38e70cb 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
@@ -18,12 +18,39 @@ package org.apache.blur.store.blockcache;
  */
 public interface Cache {
 
+  /**
+   * Remove a file from the cache.
+   * 
+   * @param name cache file name
+   */
   void delete(String name);
 
-  void update(String name, long blockId, byte[] buffer);
-
+  /**
+   * Update the content of the specified cache file. Creates cache entry
+   * if necessary.
+   * 
+   */
+  void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length);
+  
+  /**
+   * Fetch the specified cache file content.
+   *
+   * @return true if cached content found, otherwise return false
+   */
   boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock);
 
+  /**
+   * Number of entries in the cache.
+   */
   long size();
 
+  /**
+   * Expert: Rename the specified file in the cache. Allows a file to be moved
+   * without invalidating the cache.
+   * 
+   * @param source original name
+   * @param dest final name
+   */
+  void renameCacheFile(String source, String dest);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
new file mode 100644
index 0000000..8a5170e
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
@@ -0,0 +1,88 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.ReusedBufferedIndexOutput;
+import org.apache.lucene.store.IndexOutput;
+
+/*
+ * Cache the blocks as they are written. The cache file name is the name of
+ * the file until the file is closed, at which point the cache is updated
+ * to include the last modified date (which is unknown until that point).
+ */
+public class CachedIndexOutput extends ReusedBufferedIndexOutput {
+  private final BlockDirectory _directory;
+  private final IndexOutput _dest;
+  private final int _blockSize;
+  private final String _name;
+  private final String _location;
+  private final Cache _cache;
+
+  public CachedIndexOutput(BlockDirectory directory, IndexOutput dest, int blockSize, String name, Cache cache, int bufferSize) {
+    super(bufferSize);
+    _directory = directory;
+    _dest = dest;
+    _blockSize = blockSize;
+    _name = name;
+    _location = _directory.getFileCacheLocation(name);
+    _cache = cache;
+  }
+
+  @Override
+  public void flushInternal() throws IOException {
+    _dest.flush();
+  }
+
+  @Override
+  public void closeInternal() throws IOException {
+    _dest.close();
+    _cache.renameCacheFile(_location, _directory.getFileCacheName(_name));
+  }
+
+  @Override
+  public void seekInternal(long pos) throws IOException {
+    throw new IOException("Seek not supported");
+  }
+
+  private int writeBlock(long position, byte[] b, int offset, int length) throws IOException {
+    // read whole block into cache and then provide needed data
+    long blockId = BlockDirectory.getBlock(position);
+    int blockOffset = (int) BlockDirectory.getPosition(position);
+    int lengthToWriteInBlock = Math.min(length, _blockSize - blockOffset);
+
+    // write the file and copy into the cache
+    _dest.writeBytes(b, offset, lengthToWriteInBlock);
+    _cache.update(_location, blockId, blockOffset, b, offset, lengthToWriteInBlock);
+
+    return lengthToWriteInBlock;
+  }
+
+  @Override
+  public void writeInternal(byte[] b, int offset, int length) throws IOException {
+    long position = getBufferStart();
+    while (length > 0) {
+      int len = writeBlock(position, b, offset, length);
+      position += len;
+      length -= len;
+      offset += len;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java b/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
new file mode 100644
index 0000000..c4576d5
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
@@ -0,0 +1,189 @@
+package org.apache.blur.store.buffer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexOutput;
+
+public abstract class ReusedBufferedIndexOutput extends IndexOutput {
+
+  public static final int BUFFER_SIZE = 1024;
+
+  private int bufferSize = BUFFER_SIZE;
+  
+  protected byte[] buffer;
+
+  /** position in the file of buffer */
+  private long bufferStart = 0;
+  /** end of valid bytes */
+  private int bufferLength = 0;
+  /** next byte to write */
+  private int bufferPosition = 0;
+  /** total length of the file */
+  private long _fileLength = 0;
+  
+  public ReusedBufferedIndexOutput() {
+    this(BUFFER_SIZE);
+  }
+
+  public ReusedBufferedIndexOutput(int bufferSize) {
+    checkBufferSize(bufferSize);
+    this.bufferSize = bufferSize;
+    buffer = BufferStore.takeBuffer(this.bufferSize);
+  }
+
+  protected long getBufferStart() {
+    return bufferStart;
+  }
+
+  private void checkBufferSize(int bufferSize) {
+    if (bufferSize <= 0)
+      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
+  }
+
+  /** Write the buffered bytes to cache */
+  private void flushBufferToCache() throws IOException {
+    writeInternal(buffer, 0, bufferLength);
+
+    bufferStart += bufferLength;
+    bufferLength = 0;
+    bufferPosition = 0;
+  }
+
+  protected abstract void flushInternal() throws IOException;
+
+  @Override
+  public void flush() throws IOException {
+    flushBufferToCache();
+    flushInternal();
+  }
+
+  protected abstract void closeInternal() throws IOException;
+
+  @Override
+  public void close() throws IOException {
+    flushBufferToCache();
+    closeInternal();
+    BufferStore.putBuffer(buffer);
+    buffer = null;
+  }
+
+  @Override
+  public long getFilePointer() {
+    return bufferStart + bufferPosition;
+  }
+
+  protected abstract void seekInternal(long pos) throws IOException;
+
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos > _fileLength) {
+      _fileLength = pos;
+    }
+
+    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
+      bufferPosition = (int)(pos - bufferStart);  // seek within buffer
+    else {
+      flushBufferToCache();
+      bufferStart = pos;
+      bufferPosition = 0;
+      bufferLength = 0;
+      seekInternal(pos);
+    }
+  }
+
+  @Override
+  public long length() throws IOException {
+    return _fileLength;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    if (bufferPosition >= bufferSize) {
+      flushBufferToCache();
+    }
+    if (getFilePointer() >= _fileLength) {
+      _fileLength++;
+    }
+    buffer[bufferPosition++] = b;
+    if (bufferPosition > bufferLength) {
+      bufferLength = bufferPosition;
+    }
+  }
+
+  /** Expert: implements buffer flushing to cache. Writes bytes to the current
+   * position in the output.
+   * @param b the array of bytes to write
+   * @param offset the offset in the array of bytes to write
+   * @param length the number of bytes to write
+   */
+  protected abstract void writeInternal(byte[] b, int offset, int length) throws IOException;
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    if (getFilePointer() + length > _fileLength) {
+      _fileLength = getFilePointer() + length;
+    }
+    if(length <= bufferSize - bufferPosition){
+      // the buffer contains enough space to satisfy this request
+      if(length > 0) { // to allow b to be null if len is 0...
+        System.arraycopy(b, offset, buffer, bufferPosition, length);
+      }
+      bufferPosition += length;
+      if (bufferPosition > bufferLength) {
+        bufferLength = bufferPosition;
+      }
+    } else {
+      // the buffer does not have enough space. First buffer all we've got.
+      int available = bufferSize - bufferPosition;
+      if(available > 0){
+        System.arraycopy(b, offset, buffer, bufferPosition, available);
+        offset += available;
+        length -= available;
+        bufferPosition = bufferSize;
+        bufferLength = bufferSize;
+      }
+
+      flushBufferToCache();
+
+      // and now, write the remaining 'length' bytes:
+      if (length < bufferSize){
+        // If the amount left to write is small enough do it in the usual
+        // buffered way:
+        System.arraycopy(b, offset, buffer, 0, length);
+        bufferPosition = length;
+        bufferLength = length;
+      } else {
+        // The amount left to write is larger than the buffer
+        // there's no performance reason not to write it all
+        // at once.
+        writeInternal(b, offset, length);
+        bufferStart += length;
+        bufferPosition = 0;
+        bufferLength = 0;
+      }
+
+    }
+  }
+
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    throw new CloneNotSupportedException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockCacheTest.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockCacheTest.java b/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockCacheTest.java
index 7bfb9ec..98b5c74 100644
--- a/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockCacheTest.java
+++ b/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockCacheTest.java
@@ -59,7 +59,7 @@ public class BlockCacheTest {
 
       byte[] testData = testData(random, blockSize, newData);
       long t1 = System.nanoTime();
-      blockCache.store(blockCacheKey, testData);
+      blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
       storeTime += (System.nanoTime() - t1);
 
       long t3 = System.nanoTime();
@@ -75,6 +75,32 @@ public class BlockCacheTest {
     System.out.println("# of Elements = " + blockCache.getSize());
   }
 
+  /**
+   * Verify checking of buffer size limits against the cached block size.
+   */
+  @Test
+  public void testLongBuffer() {
+    Random random = new Random();
+    int blockSize = BlockCache._8K;
+    int slabSize = blockSize * 1024;
+    long totalMemory = 2 * slabSize;
+
+    BlockCache blockCache = new BlockCache(true, totalMemory, slabSize);
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(0);
+    blockCacheKey.setFile(0);
+    byte[] newData = new byte[blockSize*3];
+    byte[] testData = testData(random, blockSize, newData);
+
+    assertTrue(blockCache.store(blockCacheKey, 0, testData, 0, blockSize));
+    assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize, blockSize));
+    assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize*2, blockSize));
+
+    assertTrue(blockCache.store(blockCacheKey, 1, testData, 0, blockSize - 1));
+    assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize, blockSize - 1));
+    assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize*2, blockSize - 1));
+  }
+
   private static byte[] testData(Random random, int size, byte[] buf) {
     random.nextBytes(buf);
     return buf;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8b52ee3e/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockDirectoryTest.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockDirectoryTest.java b/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockDirectoryTest.java
index 03a6458..4dced3b 100644
--- a/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockDirectoryTest.java
+++ b/src/blur-store/src/test/java/org/apache/blur/store/blockcache/BlockDirectoryTest.java
@@ -39,16 +39,67 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 public class BlockDirectoryTest {
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
 
+  private class MapperCache implements Cache {
+    public Map<String, byte[]> map = new ConcurrentLinkedHashMap.Builder<String, byte[]>().maximumWeightedCapacity(8).build();
+
+    @Override
+    public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
+      byte[] cached = map.get(name + blockId);
+      if (cached != null) {
+        int newlen = Math.max(cached.length, blockOffset + length);
+        byte[] b = new byte[newlen];
+        System.arraycopy(cached, 0, b, 0, cached.length);
+        System.arraycopy(buffer, offset, b, blockOffset, length);
+        cached = b;
+      } else {
+        cached = copy(blockOffset, buffer, offset, length);
+      }
+      map.put(name + blockId, cached);
+    }
+
+    private byte[] copy(int blockOffset, byte[] buffer, int offset, int length) {
+      byte[] b = new byte[length + blockOffset];
+      System.arraycopy(buffer, offset, b, blockOffset, length);
+      return b;
+    }
+
+    @Override
+    public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+      // return false;
+      byte[] data = map.get(name + blockId);
+      if (data == null) {
+        return false;
+      }
+      System.arraycopy(data, blockOffset, b, off, lengthToReadInBlock);
+      return true;
+    }
+
+    @Override
+    public void delete(String name) {
+
+    }
+
+    @Override
+    public long size() {
+      return map.size();
+    }
+
+    @Override
+    public void renameCacheFile(String source, String dest) {
+    }
+  }
+
   private static final int MAX_NUMBER_OF_WRITES = 10000;
   private static final int MIN_FILE_SIZE = 100;
   private static final int MAX_FILE_SIZE = 100000;
   private static final int MIN_BUFFER_SIZE = 1;
-  private static final int MAX_BUFFER_SIZE = 5000;
+  private static final int MAX_BUFFER_SIZE = 12000;
   private static final int MAX_NUMBER_OF_READS = 20000;
   private Directory directory;
   private File file;
   private long seed;
   private Random random;
+  private MapperCache mapperCache;
 
   @Before
   public void setUp() throws IOException {
@@ -56,49 +107,13 @@ public class BlockDirectoryTest {
     rm(file);
     file.mkdirs();
     FSDirectory dir = FSDirectory.open(new File(file, "base"));
-    directory = new BlockDirectory("test", dir, getBasicCache());
+    mapperCache = new MapperCache();
+    directory = new BlockDirectory("test", dir, mapperCache);
     seed = new Random().nextLong();
+    System.out.println("Seed is " + seed);
     random = new Random(seed);
   }
 
-  private Cache getBasicCache() {
-    return new Cache() {
-      private Map<String, byte[]> map = new ConcurrentLinkedHashMap.Builder<String, byte[]>().maximumWeightedCapacity(8).build();
-
-      @Override
-      public void update(String name, long blockId, byte[] buffer) {
-        map.put(name + blockId, copy(buffer));
-      }
-
-      private byte[] copy(byte[] buffer) {
-        byte[] b = new byte[buffer.length];
-        System.arraycopy(buffer, 0, b, 0, buffer.length);
-        return b;
-      }
-
-      @Override
-      public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
-        // return false;
-        byte[] data = map.get(name + blockId);
-        if (data == null) {
-          return false;
-        }
-        System.arraycopy(data, blockOffset, b, off, lengthToReadInBlock);
-        return true;
-      }
-
-      @Override
-      public void delete(String name) {
-
-      }
-
-      @Override
-      public long size() {
-        return map.size();
-      }
-    };
-  }
-
   @Test
   public void testEOF() throws IOException {
     Directory fsDir = FSDirectory.open(new File(file, "normal"));
@@ -123,6 +138,8 @@ public class BlockDirectoryTest {
 
   @Test
   public void testRandomAccessWrites() throws IOException {
+    long t1 = System.nanoTime();
+
     int i = 0;
     try {
       for (; i < 10; i++) {
@@ -135,6 +152,14 @@ public class BlockDirectoryTest {
       e.printStackTrace();
       fail("Test failed with seed [" + seed + "] on pass [" + i + "]");
     }
+    long t2 = System.nanoTime();
+    System.out.println("Total time is " + ((t2 - t1)/1000000) + "ms");
+  }
+
+  @Test
+  public void testRandomAccessWritesLargeCache() throws IOException {
+    mapperCache.map = new ConcurrentLinkedHashMap.Builder<String, byte[]>().maximumWeightedCapacity(10000).build();
+    testRandomAccessWrites();
   }
 
   private void assertInputsEquals(String name, Directory fsDir, Directory hdfs) throws IOException {
@@ -167,9 +192,7 @@ public class BlockDirectoryTest {
     int writes = random.nextInt(MAX_NUMBER_OF_WRITES);
     int fileLength = random.nextInt(MAX_FILE_SIZE - MIN_FILE_SIZE) + MIN_FILE_SIZE;
     IndexOutput fsOutput = fsDir.createOutput(name, IOContext.DEFAULT);
-    fsOutput.setLength(fileLength);
     IndexOutput hdfsOutput = hdfs.createOutput(name, IOContext.DEFAULT);
-    hdfsOutput.setLength(fileLength);
     for (int i = 0; i < writes; i++) {
       byte[] buf = new byte[random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength)) + MIN_BUFFER_SIZE];
       random.nextBytes(buf);


Mime
View raw message