incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding first cut at the block cache version 2. Can be enabling in the blur-site via "blur.shard.experimental.block.cache" property.
Date Tue, 03 Sep 2013 19:34:07 GMT
Updated Branches:
  refs/heads/master bc55d21fb -> 5ea9c30e2


Adding first cut at the block cache version 2.  Can be enabling in the blur-site via "blur.shard.experimental.block.cache" property.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5ea9c30e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5ea9c30e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5ea9c30e

Branch: refs/heads/master
Commit: 5ea9c30e2172921be492bdb10ccd4a802e398a23
Parents: bc55d21
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Sep 3 15:33:07 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Sep 3 15:33:07 2013 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     |  15 +-
 .../blur/thrift/ThriftBlurShardServer.java      | 117 +++++------
 .../blur/store/BlockCacheDirectoryFactory.java  |  28 +++
 .../store/BlockCacheDirectoryFactoryV1.java     |  39 ++++
 .../store/BlockCacheDirectoryFactoryV2.java     |  62 ++++++
 .../blur/store/blockcache_v2/BaseCache.java     | 210 +++++++++++++++++++
 .../apache/blur/store/blockcache_v2/Cache.java  | 138 ++++++++++++
 .../store/blockcache_v2/CacheDirectory.java     | 147 +++++++++++++
 .../store/blockcache_v2/CacheIndexInput.java    | 171 +++++++++++++++
 .../store/blockcache_v2/CacheIndexOutput.java   | 130 ++++++++++++
 .../blur/store/blockcache_v2/CacheKey.java      |  89 ++++++++
 .../blur/store/blockcache_v2/CacheValue.java    |  93 ++++++++
 .../store/blockcache_v2/FileNameBlockSize.java  |  24 +++
 .../store/blockcache_v2/FileNameFilter.java     |  24 +++
 .../cachevalue/BaseCacheValue.java              |  83 ++++++++
 .../cachevalue/ByteArrayCacheValue.java         |  55 +++++
 .../cachevalue/UnsafeCacheValue.java            |  99 +++++++++
 .../blockcache_v2/CacheIndexInputTest.java      | 206 ++++++++++++++++++
 .../blockcache_v2/CacheIndexOutputTest.java     | 101 +++++++++
 .../cachevalue/ByteArrayCacheValueTest.java     |  68 ++++++
 .../cachevalue/UnsafeCacheValueTest.java        |  68 ++++++
 .../org/apache/blur/utils/BlurConstants.java    |   1 +
 .../src/main/resources/blur-default.properties  |   3 +
 docs/cluster-setup.html                         |   3 +
 24 files changed, 1907 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 6b7e942..ff3ce79 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -63,8 +63,7 @@ import org.apache.blur.metrics.AtomicLongGauge;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
-import org.apache.blur.store.blockcache.BlockDirectory;
-import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.store.BlockCacheDirectoryFactory;
 import org.apache.blur.store.hdfs.BlurLockFactory;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.ShardState;
@@ -107,7 +106,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
   private Configuration _configuration;
   private String _nodeName;
   private int _shardOpenerThreadCount;
-  private Cache _cache;
+  private BlockCacheDirectoryFactory _blockCacheDirectoryFactory;
   private ZooKeeper _zookeeper;
   private String _cluster;
 
@@ -448,7 +447,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
     boolean blockCacheEnabled = _clusterStatus.isBlockCacheEnabled(_cluster, table);
     if (blockCacheEnabled) {
       Set<String> blockCacheFileTypes = _clusterStatus.getBlockCacheFileTypes(_cluster, table);
-      dir = new BlockDirectory(table + "_" + shard, directory, _cache, blockCacheFileTypes);
+      dir = _blockCacheDirectoryFactory.newDirectory(table + "_" + shard, directory, blockCacheFileTypes);
     } else {
       dir = directory;
     }
@@ -717,10 +716,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _shardOpenerThreadCount = shardOpenerThreadCount;
   }
 
-  public void setCache(Cache cache) {
-    _cache = cache;
-  }
-
   public void setZookeeper(ZooKeeper zookeeper) {
     _zookeeper = zookeeper;
   }
@@ -752,4 +747,8 @@ public class DistributedIndexServer extends AbstractIndexServer {
   public static AtomicLong getPauseWarmup() {
     return _pauseWarmup;
   }
+  
+  public void setBlockCacheDirectoryFactory(BlockCacheDirectoryFactory blockCacheDirectoryFactory) {
+    _blockCacheDirectoryFactory = blockCacheDirectoryFactory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index aeead2f..9d634ab 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -16,20 +16,25 @@ package org.apache.blur.thrift;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.utils.BlurConstants.*;
+import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
 import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_HEAP_PER_ROW_FETCH;
+import static org.apache.blur.utils.BlurConstants.BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_EXPERIMENTAL_BLOCK_CACHE;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FETCHCOUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_CLASS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INDEX_WARMUP_THROTTLE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
@@ -40,8 +45,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT_DEFAULT;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
-import java.lang.management.ManagementFactory;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.BlurConfiguration;
@@ -65,6 +68,9 @@ import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.metrics.JSONReporter;
 import org.apache.blur.metrics.ReporterSetup;
 import org.apache.blur.server.ShardServerEventHandler;
+import org.apache.blur.store.BlockCacheDirectoryFactory;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV1;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
 import org.apache.blur.store.blockcache.BlockCache;
 import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.blur.store.blockcache.BlockDirectoryCache;
@@ -83,10 +89,13 @@ import org.apache.zookeeper.ZooKeeper;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.webapp.WebAppContext;
 
+import sun.misc.VM;
+
 public class ThriftBlurShardServer extends ThriftServer {
 
   private static final Log LOG = LogFactory.getLog(ThriftBlurShardServer.class);
   private static final boolean enableJsonReporter = false;
+  private static final long _64MB = 0;
 
   public static void main(String[] args) throws Exception {
     int serverIndex = getServerIndex(args);
@@ -97,21 +106,13 @@ public class ThriftBlurShardServer extends ThriftServer {
     ReporterSetup.setupReporters(configuration);
     MemoryReporter.enable();
     setupJvmMetrics();
-    //make this configurable
+    // make this configurable
     GCWatcher.init(0.75);
     ThriftServer server = createServer(serverIndex, configuration);
     server.start();
   }
 
   public static ThriftServer createServer(int serverIndex, BlurConfiguration configuration) throws Exception {
-    // setup block cache
-    // 134,217,728 is the slab size, therefore there are 16,384 blocks
-    // in a slab when using a block size of 8,192
-    int numberOfBlocksPerSlab = 16384;
-    int blockSize = BlockDirectory.BLOCK_SIZE;
-    int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, -1);
-    slabCount = getSlabCount(slabCount, numberOfBlocksPerSlab, blockSize);
-    Cache cache;
     Configuration config = new Configuration();
 
     String bindAddress = configuration.get(BLUR_SHARD_BIND_ADDRESS);
@@ -132,34 +133,50 @@ public class ThriftBlurShardServer extends ThriftServer {
       httpServer = null;
     }
 
-    if (slabCount >= 1) {
-      BlockCache blockCache;
-      boolean directAllocation = configuration.getBoolean(BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
-
-      int slabSize = numberOfBlocksPerSlab * blockSize;
-      LOG.info("Number of slabs of block cache [{0}] with direct memory allocation set to [{1}]", slabCount,
-          directAllocation);
-      LOG.info("Block cache target memory usage, slab size of [{0}] will allocate [{1}] slabs and use ~[{2}] bytes",
-          slabSize, slabCount, ((long) slabCount * (long) slabSize));
-
-      int _1024Size = configuration.getInt("blur.shard.buffercache.1024", 8192);
-      int _8192Size = configuration.getInt("blur.shard.buffercache.8192", 8192);
-      BufferStore.init(_1024Size, _8192Size);
+    BlockCacheDirectoryFactory blockCacheDirectoryFactory;
+    boolean experimentalBlockCache = configuration.getBoolean(BLUR_SHARD_EXPERIMENTAL_BLOCK_CACHE, false);
+    if (!experimentalBlockCache) {
+      // setup block cache
+      // 134,217,728 is the slab size, therefore there are 16,384 blocks
+      // in a slab when using a block size of 8,192
+      int numberOfBlocksPerSlab = 16384;
+      int blockSize = BlockDirectory.BLOCK_SIZE;
+      int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, -1);
+      slabCount = getSlabCount(slabCount, numberOfBlocksPerSlab, blockSize);
+      Cache cache;
+      if (slabCount >= 1) {
+        BlockCache blockCache;
+        boolean directAllocation = configuration.getBoolean(BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION, true);
+
+        int slabSize = numberOfBlocksPerSlab * blockSize;
+        LOG.info("Number of slabs of block cache [{0}] with direct memory allocation set to [{1}]", slabCount,
+            directAllocation);
+        LOG.info("Block cache target memory usage, slab size of [{0}] will allocate [{1}] slabs and use ~[{2}] bytes",
+            slabSize, slabCount, ((long) slabCount * (long) slabSize));
+
+        int _1024Size = configuration.getInt("blur.shard.buffercache.1024", 8192);
+        int _8192Size = configuration.getInt("blur.shard.buffercache.8192", 8192);
+        BufferStore.init(_1024Size, _8192Size);
 
-      try {
-        long totalMemory = (long) slabCount * (long) numberOfBlocksPerSlab * (long) blockSize;
-        blockCache = new BlockCache(directAllocation, totalMemory, slabSize);
-      } catch (OutOfMemoryError e) {
-        if ("Direct buffer memory".equals(e.getMessage())) {
-          System.err
-              .println("The max direct memory is too low.  Either increase by setting (-XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages) or disable direct allocation by (blur.shard.blockcache.direct.memory.allocation=false) in blur-site.properties");
-          System.exit(1);
+        try {
+          long totalMemory = (long) slabCount * (long) numberOfBlocksPerSlab * (long) blockSize;
+          blockCache = new BlockCache(directAllocation, totalMemory, slabSize);
+        } catch (OutOfMemoryError e) {
+          if ("Direct buffer memory".equals(e.getMessage())) {
+            System.err
+                .println("The max direct memory is too low.  Either increase by setting (-XX:MaxDirectMemorySize=<size>g -XX:+UseLargePages) or disable direct allocation by (blur.shard.blockcache.direct.memory.allocation=false) in blur-site.properties");
+            System.exit(1);
+          }
+          throw e;
         }
-        throw e;
+        cache = new BlockDirectoryCache(blockCache);
+      } else {
+        cache = BlockDirectory.NO_CACHE;
       }
-      cache = new BlockDirectoryCache(blockCache);
+      blockCacheDirectoryFactory = new BlockCacheDirectoryFactoryV1(cache);
     } else {
-      cache = BlockDirectory.NO_CACHE;
+      long totalNumberOfBytes = VM.maxDirectMemory() - _64MB;
+      blockCacheDirectoryFactory = new BlockCacheDirectoryFactoryV2(configuration, totalNumberOfBytes);
     }
 
     LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);
@@ -185,7 +202,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
 
     final DistributedIndexServer indexServer = new DistributedIndexServer();
-    indexServer.setCache(cache);
+    indexServer.setBlockCacheDirectoryFactory(blockCacheDirectoryFactory);
     indexServer.setClusterStatus(clusterStatus);
     indexServer.setClusterName(configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER));
     indexServer.setConfiguration(config);
@@ -260,33 +277,15 @@ public class ThriftBlurShardServer extends ThriftServer {
   private static int getSlabCount(int slabCount, int numberOfBlocksPerSlab, int blockSize) {
     if (slabCount < 0) {
       long slabSize = numberOfBlocksPerSlab * blockSize;
-      List<String> inputArguments = ManagementFactory.getRuntimeMXBean().getInputArguments();
-      for (String arg : inputArguments) {
-        if (arg.startsWith("-XX:MaxDirectMemorySize")) {
-          long maxDirectMemorySize = getMaxDirectMemorySize(arg);
-          maxDirectMemorySize -= 64 * 1024 * 1024;
-          return (int) (maxDirectMemorySize / slabSize);
-        }
+      long maxDirectMemorySize = VM.maxDirectMemory() - _64MB;
+      if (maxDirectMemorySize < slabSize) {
+        throw new RuntimeException("Auto slab setup cannot happen, JVM option -XX:MaxDirectMemorySize not set.");
       }
-      throw new RuntimeException("Auto slab setup cannot happen, JVM option -XX:MaxDirectMemorySize not set.");
+      return (int) (maxDirectMemorySize / slabSize);
     }
     return slabCount;
   }
 
-  private static long getMaxDirectMemorySize(String arg) {
-    int index = arg.lastIndexOf('=');
-    return parseNumber(arg.substring(index + 1).toLowerCase().replace(" ", ""));
-  }
-
-  private static long parseNumber(String number) {
-    if (number.endsWith("m")) {
-      return Long.parseLong(number.substring(0, number.length() - 1)) * 1024 * 1024;
-    } else if (number.endsWith("g")) {
-      return Long.parseLong(number.substring(0, number.length() - 1)) * 1024 * 1024 * 1024;
-    }
-    throw new RuntimeException("Cannot parse [" + number + "]");
-  }
-
   private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
     String _blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);
     if (_blurFilterCacheClass != null) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactory.java b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactory.java
new file mode 100644
index 0000000..a8d31ab
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.lucene.store.Directory;
+
+public interface BlockCacheDirectoryFactory {
+
+  Directory newDirectory(String name, Directory directory, Set<String> blockCacheFileTypes) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV1.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV1.java b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV1.java
new file mode 100644
index 0000000..182b494
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.blur.store.blockcache.BlockDirectory;
+import org.apache.blur.store.blockcache.Cache;
+import org.apache.lucene.store.Directory;
+
+public class BlockCacheDirectoryFactoryV1 implements BlockCacheDirectoryFactory {
+
+  private final Cache _cache;
+  
+  public BlockCacheDirectoryFactoryV1(Cache cache) {
+    _cache = cache;
+  }
+
+  @Override
+  public Directory newDirectory(String name, Directory directory, Set<String> blockCacheFileTypes) throws IOException {
+    return new BlockDirectory(name, directory, _cache, blockCacheFileTypes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
new file mode 100644
index 0000000..0f20900
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.store.blockcache_v2.BaseCache;
+import org.apache.blur.store.blockcache_v2.BaseCache.STORE;
+import org.apache.blur.store.blockcache_v2.Cache;
+import org.apache.blur.store.blockcache_v2.CacheDirectory;
+import org.apache.blur.store.blockcache_v2.FileNameBlockSize;
+import org.apache.blur.store.blockcache_v2.FileNameFilter;
+import org.apache.lucene.store.Directory;
+
+public class BlockCacheDirectoryFactoryV2 implements BlockCacheDirectoryFactory {
+
+  private Cache _cache;
+
+  public BlockCacheDirectoryFactoryV2(BlurConfiguration configuration, long totalNumberOfBytes) {
+    int fileBufferSize = 8192;
+    FileNameBlockSize fileNameBlockSize = new FileNameBlockSize() {
+      @Override
+      public int getBlockSize(String directoryName, String fileName) {
+        return 8192;
+      }
+    };
+    FileNameFilter readFilter = new FileNameFilter() {
+      @Override
+      public boolean accept(String directoryName, String fileName) {
+        if (fileName.endsWith(".fdt") || fileName.endsWith(".fdx")) {
+          return true;
+        }
+        return true;
+      }
+    };
+    FileNameFilter writeFilter = readFilter;
+    _cache = new BaseCache(totalNumberOfBytes, fileBufferSize, fileNameBlockSize, readFilter, writeFilter,
+        STORE.OFF_HEAP);
+  }
+
+  @Override
+  public Directory newDirectory(String name, Directory directory, Set<String> blockCacheFileTypes) throws IOException {
+    return new CacheDirectory(name, directory, _cache);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
new file mode 100644
index 0000000..1ec2796
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.store.blockcache_v2.cachevalue.ByteArrayCacheValue;
+import org.apache.blur.store.blockcache_v2.cachevalue.UnsafeCacheValue;
+import org.apache.lucene.store.IOContext;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.googlecode.concurrentlinkedhashmap.Weigher;
+
+public class BaseCache implements Cache {
+  public enum STORE {
+    ON_HEAP, OFF_HEAP
+  }
+
+  class BaseCacheEvictionListener implements EvictionListener<CacheKey, CacheValue> {
+    @Override
+    public void onEviction(CacheKey key, CacheValue value) {
+      addToReleaseQueue(value);
+    }
+  }
+
+  class BaseCacheWeigher implements Weigher<CacheValue> {
+    @Override
+    public int weightOf(CacheValue value) {
+      return value.size();
+    }
+  }
+
+  private final ConcurrentLinkedHashMap<CacheKey, CacheValue> _cacheMap;
+  private final int _fileBufferSize;
+  private final FileNameFilter _readFilter;
+  private final FileNameFilter _writeFilter;
+  private final STORE _store;
+  private final FileNameBlockSize _fileNameBlockSize;
+  private final Map<FileIdKey, Long> _fileNameToId = new ConcurrentHashMap<FileIdKey, Long>();
+  private final AtomicLong _fileId = new AtomicLong();
+
+  public BaseCache(long totalNumberOfBytes, int fileBufferSize, FileNameBlockSize fileNameBlockSize,
+      FileNameFilter readFilter, FileNameFilter writeFilter, STORE store) {
+    _cacheMap = new ConcurrentLinkedHashMap.Builder<CacheKey, CacheValue>().weigher(new BaseCacheWeigher())
+        .maximumWeightedCapacity(totalNumberOfBytes).listener(new BaseCacheEvictionListener()).build();
+    _fileBufferSize = fileBufferSize;
+    _readFilter = readFilter;
+    _writeFilter = writeFilter;
+    _store = store;
+    _fileNameBlockSize = fileNameBlockSize;
+  }
+
+  private void addToReleaseQueue(CacheValue value) {
+    if (value != null) {
+      if (value.refCount() == 0) {
+        value.release();
+        return;
+      }
+      // @TODO create clean up queue
+    }
+  }
+
+  @Override
+  public CacheValue newInstance(CacheDirectory directory, String fileName) {
+    switch (_store) {
+    case ON_HEAP:
+      return new ByteArrayCacheValue(getCacheBlockSize(directory, fileName));
+    case OFF_HEAP:
+      return new UnsafeCacheValue(getCacheBlockSize(directory, fileName));
+    default:
+      throw new RuntimeException("Unknown type [" + _store + "]");
+    }
+  }
+
+  @Override
+  public long getFileId(CacheDirectory directory, String fileName) throws IOException {
+    FileIdKey cachedFileName = getCacheFileName(directory, fileName);
+    Long id = _fileNameToId.get(cachedFileName);
+    if (id != null) {
+      return id;
+    }
+    long newId = _fileId.incrementAndGet();
+    _fileNameToId.put(cachedFileName, id);
+    return newId;
+  }
+
+  @Override
+  public void removeFile(CacheDirectory directory, String fileName) throws IOException {
+    FileIdKey cachedFileName = getCacheFileName(directory, fileName);
+    _fileNameToId.remove(cachedFileName);
+  }
+
+  private FileIdKey getCacheFileName(CacheDirectory directory, String fileName) throws IOException {
+    long fileModified = directory.getFileModified(fileName);
+    return new FileIdKey(directory.getDirectoryName(), fileName, fileModified);
+  }
+
+  @Override
+  public int getCacheBlockSize(CacheDirectory directory, String fileName) {
+    return _fileNameBlockSize.getBlockSize(directory.getDirectoryName(), fileName);
+  }
+
+  @Override
+  public int getFileBufferSize(CacheDirectory directory, String fileName) {
+    return _fileBufferSize;
+  }
+
+  @Override
+  public boolean cacheFileForReading(CacheDirectory directory, String fileName, IOContext context) {
+    return _readFilter.accept(directory.getDirectoryName(), fileName);
+  }
+
+  @Override
+  public boolean cacheFileForWriting(CacheDirectory directory, String fileName, IOContext context) {
+    return _writeFilter.accept(directory.getDirectoryName(), fileName);
+  }
+
+  @Override
+  public CacheValue get(CacheKey key) {
+    return _cacheMap.get(key);
+  }
+
+  @Override
+  public void put(CacheKey key, CacheValue value) {
+    addToReleaseQueue(_cacheMap.put(key, value));
+  }
+
+  @Override
+  public void releaseDirectory(String directoryName) {
+    Set<Entry<FileIdKey, Long>> entrySet = _fileNameToId.entrySet();
+    Iterator<Entry<FileIdKey, Long>> iterator = entrySet.iterator();
+    while (iterator.hasNext()) {
+      Entry<FileIdKey, Long> entry = iterator.next();
+      FileIdKey fileIdKey = entry.getKey();
+      if (fileIdKey._directoryName.equals(directoryName)) {
+        iterator.remove();
+      }
+    }
+  }
+
+  static class FileIdKey {
+    final String _directoryName;
+    final String _fileName;
+    final long _lastModified;
+
+    FileIdKey(String directoryName, String fileName, long lastModified) {
+      _directoryName = directoryName;
+      _fileName = fileName;
+      _lastModified = lastModified;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((_directoryName == null) ? 0 : _directoryName.hashCode());
+      result = prime * result + ((_fileName == null) ? 0 : _fileName.hashCode());
+      result = prime * result + (int) (_lastModified ^ (_lastModified >>> 32));
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      FileIdKey other = (FileIdKey) obj;
+      if (_directoryName == null) {
+        if (other._directoryName != null)
+          return false;
+      } else if (!_directoryName.equals(other._directoryName))
+        return false;
+      if (_fileName == null) {
+        if (other._fileName != null)
+          return false;
+      } else if (!_fileName.equals(other._fileName))
+        return false;
+      if (_lastModified != other._lastModified)
+        return false;
+      return true;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/Cache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/Cache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/Cache.java
new file mode 100644
index 0000000..66e60f7
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/Cache.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IOContext;
+
+public interface Cache {
+
+  /**
+   * Creates a new instance of CacheValue, the cache capacity should be used for
+   * the given file.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @return the new CacheValue instance.
+   */
+  CacheValue newInstance(CacheDirectory directory, String fileName);
+
+  /**
+   * Gets unique id for the given file. This is assumed to be unique even if the
+   * file is deleted and recreated.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @return the file id.
+   * @throws IOException
+   */
+  long getFileId(CacheDirectory directory, String fileName) throws IOException;
+
+  /**
+   * Get capacity of each cache entry for the given file.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @return the capacity.
+   */
+  int getCacheBlockSize(CacheDirectory directory, String fileName);
+
+  /**
+   * Gets buffer size of the buffer used while interacting with the underlying
+   * directory.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @return the buffer size.
+   */
+  int getFileBufferSize(CacheDirectory directory, String fileName);
+
+  /**
+   * Checks whether file should be cached or not during reading.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @param context
+   *          the IOContext from Lucene.
+   * @return boolean.
+   */
+  boolean cacheFileForReading(CacheDirectory directory, String fileName, IOContext context);
+
+  /**
+   * Checks whether file should be cached or not during writing.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @param context
+   *          the IOContext from Lucene.
+   * @return boolean.
+   */
+  boolean cacheFileForWriting(CacheDirectory directory, String fileName, IOContext context);
+
+  /**
+   * Gets the cache value for the given key. Null if missing.
+   * 
+   * @param key
+   *          the key.
+   * @return the cache value or null.
+   */
+  CacheValue get(CacheKey key);
+
+  /**
+   * Puts the cache entry into the cache.
+   * 
+   * @param key
+   *          the key.
+   * @param value
+   *          the value.
+   */
+  void put(CacheKey key, CacheValue value);
+
+  /**
+   * Removes the file from the cache.
+   * 
+   * @param directory
+   *          the directory.
+   * @param fileName
+   *          the file name.
+   * @throws IOException
+   */
+  void removeFile(CacheDirectory directory, String fileName) throws IOException;
+
+  /**
+   * This is called when the CacheDirectory is finalized.
+   * 
+   * @param directoryName
+   *          the directory name.
+   */
+  void releaseDirectory(String directoryName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
new file mode 100644
index 0000000..73cc147
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheDirectory.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.blur.store.blockcache.LastModified;
+import org.apache.blur.store.hdfs.DirectoryDecorator;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+public class CacheDirectory extends Directory implements DirectoryDecorator, LastModified {
+
+  private final Directory _internal;
+  private final String _directoryName;
+  private final Cache _cache;
+
+  public CacheDirectory(String directoryName, Directory directory, Cache cache) {
+    if (!(directory instanceof LastModified)) {
+      throw new RuntimeException("Directory [" + directory + "] does not implement '" + LastModified.class.toString()
+          + "'");
+    }
+    _directoryName = notNull(directoryName);
+    _internal = notNull(directory);
+    _cache = notNull(cache);
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    _cache.releaseDirectory(getDirectoryName());
+  }
+
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    IndexInput indexInput = _internal.openInput(name, context);
+    if (_cache.cacheFileForReading(this, name, context)) {
+      return new CacheIndexInput(this, name, indexInput, _cache);
+    }
+    return indexInput;
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    IndexOutput indexOutput = _internal.createOutput(name, context);
+    if (_cache.cacheFileForWriting(this, name, context)) {
+      return new CacheIndexOutput(this, name, indexOutput, _cache);
+    }
+    return indexOutput;
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _cache.removeFile(this, name);
+    _internal.deleteFile(name);
+  }
+
+  public String[] listAll() throws IOException {
+    return _internal.listAll();
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _internal.fileExists(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _internal.fileLength(name);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _internal.sync(names);
+  }
+
+  public Lock makeLock(String name) {
+    return _internal.makeLock(name);
+  }
+
+  public void clearLock(String name) throws IOException {
+    _internal.clearLock(name);
+  }
+
+  public void close() throws IOException {
+    _internal.close();
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _internal.setLockFactory(lockFactory);
+  }
+
+  public LockFactory getLockFactory() {
+    return _internal.getLockFactory();
+  }
+
+  public String getLockID() {
+    return _internal.getLockID();
+  }
+
+  public String toString() {
+    return _internal.toString();
+  }
+
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    _internal.copy(to, src, dest, context);
+  }
+
+  public IndexInputSlicer createSlicer(String name, IOContext context) throws IOException {
+    return _internal.createSlicer(name, context);
+  }
+
+  public String getDirectoryName() {
+    return _directoryName;
+  }
+
+  @Override
+  public long getFileModified(String name) throws IOException {
+    return ((LastModified) _internal).getFileModified(name);
+  }
+
+  @Override
+  public Directory getOriginalDirectory() {
+    return _internal;
+  }
+  
+  private static <T> T notNull(T t) {
+    if (t == null) {
+      throw new IllegalArgumentException("Cannot be null");
+    }
+    return t;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
new file mode 100644
index 0000000..5279292
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.lucene.store.IndexInput;
+
+public class CacheIndexInput extends IndexInput {
+
+  private final long _fileLength;
+  private final long _fileId;
+  private final int _cacheBlockSize;
+  private final int _bufferSize;
+  private final CacheDirectory _directory;
+  private final String _fileName;
+  private final IndexInput _indexInput;
+  private final Cache _cache;
+  private final CacheKey _key = new CacheKey();
+
+  private CacheValue _cacheValue;
+  private long _position;
+  private int _blockPosition;
+
+  public CacheIndexInput(CacheDirectory directory, String fileName, IndexInput indexInput, Cache cache) throws IOException {
+    super(fileName);
+    _directory = directory;
+    _fileName = fileName;
+    _indexInput = indexInput;
+    _fileLength = indexInput.length();
+    _cache = cache;
+
+    _fileId = _cache.getFileId(_directory, _fileName);
+    _cacheBlockSize = _cache.getCacheBlockSize(_directory, _fileName);
+    _bufferSize = _cache.getFileBufferSize(_directory, _fileName);
+    _key.setFileId(_fileId);
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    tryToFill();
+    byte b = _cacheValue.read(_blockPosition);
+    _position++;
+    _blockPosition++;
+    return b;
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    while (len > 0) {
+      tryToFill();
+      int remaining = remaining();
+      int length = Math.min(len, remaining);
+      _cacheValue.read(_blockPosition, b, offset, length);
+      offset += length;
+      len -= length;
+      _position += length;
+      _blockPosition += length;
+    }
+  }
+
+  private int remaining() {
+    return _cacheValue.length() - _blockPosition;
+  }
+
+  private void tryToFill() throws IOException {
+    if (_cacheValue == null) {
+      fill();
+    } else if (remaining() == 0) {
+      releaseCache();
+      fill();
+    } else {
+      return;
+    }
+  }
+
+  private void releaseCache() {
+    if (_cacheValue != null) {
+      _cacheValue.decRef();
+      _cacheValue = null;
+    }
+  }
+
+  private void fill() throws IOException {
+    _key.setBlockId(getBlockId());
+    _cacheValue = _cache.get(_key);
+    if (_cacheValue == null) {
+      _cacheValue = _cache.newInstance(_directory, _fileName);
+      long filePosition = getFilePosition();
+      _indexInput.seek(filePosition);
+      byte[] buffer = BufferStore.takeBuffer(_bufferSize);
+      int len = (int) Math.min(_cacheBlockSize, _fileLength - filePosition);
+      int cachePosition = 0;
+      while (len > 0) {
+        int length = Math.min(_bufferSize, len);
+        _indexInput.readBytes(buffer, 0, length);
+        _cacheValue.write(cachePosition, buffer, 0, length);
+        len -= length;
+        cachePosition += length;
+      }
+    }
+    _cache.put(_key.clone(), _cacheValue);
+    _cacheValue.incRef();
+    _blockPosition = getBlockPosition();
+  }
+
+  private int getBlockPosition() {
+    return (int) (_position % _cacheBlockSize);
+  }
+
+  private long getFilePosition() {
+    // make this a mask...?
+    return getBlockId() * _cacheBlockSize;
+  }
+
+  private long getBlockId(long pos) {
+    return pos / _cacheBlockSize;
+  }
+
+  private long getBlockId() {
+    return _position / _cacheBlockSize;
+  }
+
+  @Override
+  public void close() throws IOException {
+    _indexInput.close();
+  }
+
+  @Override
+  public long getFilePointer() {
+    return _position;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    if (_position >= _fileLength) {
+      throw new IOException("Can not seek past end of file [" + pos + "] filelength [" + _fileLength + "]");
+    }
+    long oldBlockId = getBlockId();
+    _position = pos;
+    long newBlockId = getBlockId(pos);
+    if (newBlockId == oldBlockId) {
+      // need to set new block position
+      _blockPosition = getBlockPosition();
+    } else {
+      releaseCache();
+    }
+  }
+
+  @Override
+  public long length() {
+    return _fileLength;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
new file mode 100644
index 0000000..dbb346c
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.lucene.store.IndexOutput;
+
+public class CacheIndexOutput extends IndexOutput {
+
+  private final IndexOutput _indexOutput;
+  private final int _fileBufferSize;
+  private final Cache _cache;
+  private final String _fileName;
+  private final CacheDirectory _directory;
+  private final long _fileId;
+
+  private long _position;
+  private byte[] _buffer;
+  private int _bufferPosition;
+  private int _lastWrittenPosition;
+  private int _cacheBlockSize;
+  private CacheValue _cacheValue;
+
+  public CacheIndexOutput(CacheDirectory directory, String fileName, IndexOutput indexOutput, Cache cache) throws IOException {
+    _cache = cache;
+    _directory = directory;
+    _fileName = fileName;
+    _fileBufferSize = _cache.getFileBufferSize(_directory, _fileName);
+    _cacheBlockSize = _cache.getCacheBlockSize(_directory, _fileName);
+    _fileId = _cache.getFileId(_directory, _fileName);
+    _indexOutput = indexOutput;
+    _buffer = BufferStore.takeBuffer(_cacheBlockSize);
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    tryToFlush();
+    _buffer[_bufferPosition] = b;
+    _bufferPosition++;
+    _position++;
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int len) throws IOException {
+    while (len > 0) {
+      tryToFlush();
+      int remaining = remaining();
+      int length = Math.min(len, remaining);
+      System.arraycopy(b, offset, _buffer, _bufferPosition, length);
+      _bufferPosition += length;
+      _position += length;
+      len -= length;
+      offset += length;
+    }
+  }
+
+  private int remaining() {
+    return _fileBufferSize - _bufferPosition;
+  }
+
+  private void tryToFlush() throws IOException {
+    if (remaining() == 0) {
+      flushInternal();
+    }
+  }
+
+  private void flushInternal() throws IOException {
+    if (_cacheValue == null) {
+      _cacheValue = _cache.newInstance(_directory, _fileName);
+    }
+    int length = _bufferPosition - _lastWrittenPosition;
+    _indexOutput.writeBytes(_buffer, _lastWrittenPosition, length);
+    _cacheValue.write(_bufferPosition, _buffer, _lastWrittenPosition, length);
+    if (_bufferPosition == _fileBufferSize) {
+      _cache.put(new CacheKey(_fileId, getBlockId()), _cacheValue);
+      _bufferPosition = 0;
+      _cacheValue = null;
+    }
+    _lastWrittenPosition = _bufferPosition;
+  }
+
+  private long getBlockId() {
+    return _position / _cacheBlockSize;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("Seek is not supported.");
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+    _indexOutput.close();
+    BufferStore.putBuffer(_buffer);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    flushInternal();
+    _indexOutput.flush();
+  }
+
+  @Override
+  public long getFilePointer() {
+    return _position;
+  }
+
+  @Override
+  public long length() throws IOException {
+    return getFilePointer();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheKey.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheKey.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheKey.java
new file mode 100644
index 0000000..340e488
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheKey.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+public class CacheKey implements Cloneable {
+
+  private long _fileId;
+  private long _blockId;
+
+  public CacheKey() {
+
+  }
+
+  public CacheKey(long fileId, long blockId) {
+    _fileId = fileId;
+    _blockId = blockId;
+  }
+
+  public long getFileId() {
+    return _fileId;
+  }
+
+  public void setFileId(long fileId) {
+    _fileId = fileId;
+  }
+
+  public long getBlockId() {
+    return _blockId;
+  }
+
+  public void setBlockId(long blockId) {
+    _blockId = blockId;
+  }
+
+  @Override
+  public CacheKey clone() {
+    try {
+      return (CacheKey) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (_blockId ^ (_blockId >>> 32));
+    result = prime * result + (int) (_fileId ^ (_fileId >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CacheKey other = (CacheKey) obj;
+    if (_blockId != other._blockId)
+      return false;
+    if (_fileId != other._fileId)
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "CacheKey [_fileId=" + _fileId + ", _blockId=" + _blockId + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
new file mode 100644
index 0000000..0f7b976
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+public interface CacheValue {
+
+  /**
+   * The actual size of the the underlying resource.
+   * 
+   * @return the size.
+   */
+  int size();
+
+  /**
+   * The length of the data in this block.
+   * 
+   * @return the length.
+   */
+  int length();
+
+  /**
+   * Writes data out to a given position in this block.
+   * 
+   * @param position
+   *          the position.
+   * @param buf
+   *          the buffer.
+   * @param offset
+   *          the offset in the buffer.
+   * @param length
+   *          the length of bytes to write.
+   */
+  void write(int position, byte[] buf, int offset, int length);
+
+  /**
+   * Reads data into the buffer given the position.
+   * 
+   * @param position
+   *          the position to read.
+   * @param buf
+   *          the buffer to read into.
+   * @param offset
+   *          the offset within the buffer.
+   * @param length
+   *          the length of data to read.
+   */
+  void read(int position, byte[] buf, int offset, int length);
+
+  /**
+   * Reads a byte from the given position.
+   * 
+   * @param position
+   *          the position.
+   * @return the byte.
+   */
+  byte read(int position);
+
+  /**
+   * Increments the reference.
+   */
+  void incRef();
+
+  /**
+   * Decrements the reference.
+   */
+  void decRef();
+
+  /**
+   * Gets the reference count.
+   */
+  long refCount();
+
+  /**
+   * Releases any underlying resources.
+   */
+  void release();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameBlockSize.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameBlockSize.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameBlockSize.java
new file mode 100644
index 0000000..8890459
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameBlockSize.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+public interface FileNameBlockSize {
+
+  int getBlockSize(String directoryName, String fileName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameFilter.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameFilter.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameFilter.java
new file mode 100644
index 0000000..03a969c
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/FileNameFilter.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+public interface FileNameFilter {
+
+  boolean accept(String directoryName, String fileName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
new file mode 100644
index 0000000..a5e10a3
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2.cachevalue;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.store.blockcache_v2.CacheValue;
+
+@SuppressWarnings("serial")
+public abstract class BaseCacheValue extends AtomicLong implements CacheValue {
+
+  private final int _length;
+
+  public BaseCacheValue(int length) {
+    _length = length;
+  }
+
+  @Override
+  public final int length() {
+    return _length;
+  }
+
+  @Override
+  public void write(int position, byte[] buf, int offset, int length) {
+    if (position + length > _length) {
+      throw new ArrayIndexOutOfBoundsException(position + length);
+    }
+    writeInternal(position, buf, offset, length);
+  }
+
+  @Override
+  public void read(int position, byte[] buf, int offset, int length) {
+    if (position + length > _length) {
+      throw new ArrayIndexOutOfBoundsException(position + length);
+    }
+    readInternal(position, buf, offset, length);
+  }
+
+  @Override
+  public byte read(int position) {
+    if (position >= _length) {
+      throw new ArrayIndexOutOfBoundsException(position);
+    }
+    return readInternal(position);
+  }
+
+  protected abstract void writeInternal(int position, byte[] buf, int offset, int length);
+
+  protected abstract byte readInternal(int position);
+
+  protected abstract void readInternal(int position, byte[] buf, int offset, int length);
+
+  @Override
+  public final void incRef() {
+    incrementAndGet();
+  }
+
+  @Override
+  public final void decRef() {
+    decrementAndGet();
+  }
+
+  @Override
+  public final long refCount() {
+    return get();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValue.java
new file mode 100644
index 0000000..c4d84bf
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValue.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2.cachevalue;
+
+@SuppressWarnings("serial")
+public class ByteArrayCacheValue extends BaseCacheValue {
+
+  private final byte[] _buffer;
+
+  public ByteArrayCacheValue(int length) {
+    super(length);
+    _buffer = new byte[length];
+  }
+
+  @Override
+  protected void writeInternal(int position, byte[] buf, int offset, int length) {
+    System.arraycopy(buf, offset, _buffer, position, length);
+  }
+
+  @Override
+  protected void readInternal(int position, byte[] buf, int offset, int length) {
+    System.arraycopy(_buffer, position, buf, offset, length);
+  }
+
+  @Override
+  protected byte readInternal(int position) {
+    return _buffer[position];
+  }
+
+  @Override
+  public void release() {
+    // no op
+  }
+
+  @Override
+  public int size() {
+    return length();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValue.java
new file mode 100644
index 0000000..a918250
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValue.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2.cachevalue;
+
+import java.lang.reflect.Field;
+
+import sun.misc.Unsafe;
+
+@SuppressWarnings("serial")
+public class UnsafeCacheValue extends BaseCacheValue {
+
+  private static final int MINIMUM_SIZE = 1024;
+
+  private static final Unsafe _unsafe;
+
+  static {
+    try {
+      Class<?> clazz = Class.forName("java.nio.Bits");
+      Field field = clazz.getDeclaredField("unsafe");
+      field.setAccessible(true);
+      _unsafe = (Unsafe) field.get(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final int BYTE_ARRAY_BASE_OFFSET = _unsafe.arrayBaseOffset(byte[].class);
+
+  private static void copyFromArray(byte[] src, int srcOffset, int length, long destAddress) {
+    long offset = BYTE_ARRAY_BASE_OFFSET + srcOffset;
+    _unsafe.copyMemory(src, offset, null, destAddress, length);
+  }
+
+  private static void copyToArray(long srcAddress, byte[] dst, int dstOffset, int length) {
+    long offset = BYTE_ARRAY_BASE_OFFSET + dstOffset;
+    _unsafe.copyMemory(null, srcAddress, dst, offset, length);
+  }
+
+  private final long _address;
+  private final int _capacity;
+
+  public UnsafeCacheValue(int length) {
+    super(length);
+    _capacity = getCapacity(length);
+    _address = _unsafe.allocateMemory(_capacity);
+  }
+
+  private int getCapacity(int length) {
+    if (length < MINIMUM_SIZE) {
+      return MINIMUM_SIZE;
+    }
+    return length;
+  }
+
+  @Override
+  protected void writeInternal(int position, byte[] buf, int offset, int length) {
+    copyFromArray(buf, offset, length, resolveAddress(position));
+  }
+
+  @Override
+  protected void readInternal(int position, byte[] buf, int offset, int length) {
+    copyToArray(resolveAddress(position), buf, offset, length);
+  }
+
+  @Override
+  protected byte readInternal(int position) {
+    return _unsafe.getByte(resolveAddress(position));
+  }
+
+  private long resolveAddress(int position) {
+    return _address + position;
+  }
+
+  @Override
+  public void release() {
+    _unsafe.freeMemory(_address);
+  }
+
+  @Override
+  public int size() {
+    return _capacity;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexInputTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexInputTest.java b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexInputTest.java
new file mode 100644
index 0000000..6e897bc
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexInputTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.blur.store.blockcache_v2.cachevalue.ByteArrayCacheValue;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.googlecode.concurrentlinkedhashmap.Weigher;
+
+public class CacheIndexInputTest {
+
+  private long seed;
+
+  private final int sampleSize = 10000;
+  private final int maxBufSize = 10000;
+  private final int maxOffset = 1000;
+
+  @Before
+  public void setup() {
+    BufferStore.init(128, 128);
+    seed = new Random().nextLong();
+    System.out.println("Using seed [" + seed + "]");
+    // seed = -265282183286396219l;
+  }
+
+  @Test
+  public void test1() throws IOException {
+    RAMDirectory directory = new RAMDirectory();
+
+    String name = "test";
+
+    IndexOutput output = directory.createOutput(name, IOContext.DEFAULT);
+    byte[] bs = "hello world".getBytes();
+    output.writeBytes(bs, bs.length);
+    output.close();
+
+    IndexInput input = directory.openInput(name, IOContext.DEFAULT);
+    Cache cache = getCache();
+    CacheIndexInput cacheInput = new CacheIndexInput(null, name, input, cache);
+    byte[] buf = new byte[bs.length];
+    cacheInput.readBytes(buf, 0, buf.length);
+    cacheInput.close();
+
+    assertArrayEquals(bs, buf);
+  }
+
+  @Test
+  public void test2() throws IOException {
+    Cache cache = getCache();
+    RAMDirectory directory = new RAMDirectory();
+    Random random = new Random(seed);
+
+    String name = "test2";
+    long size = (10 * 1024 * 1024) + 13;
+
+    IndexOutput output = directory.createOutput(name, IOContext.DEFAULT);
+    writeRandomData(size, random, output);
+    output.close();
+
+    IndexInput input = directory.openInput(name, IOContext.DEFAULT);
+    IndexInput testInput = new CacheIndexInput(null, name, input.clone(), cache);
+    readRandomData(input, testInput, random, sampleSize, maxBufSize, maxOffset);
+    testInput.close();
+    input.close();
+  }
+
+  public static void readRandomData(IndexInput baseInput, IndexInput testInput, Random random, int sampleSize,
+      int maxBufSize, int maxOffset) throws IOException {
+    assertEquals(baseInput.length(), testInput.length());
+    int fileLength = (int) baseInput.length();
+    for (int i = 0; i < sampleSize; i++) {
+      int position = random.nextInt(fileLength - maxBufSize);
+      int bufSize = random.nextInt(maxBufSize - maxOffset) + 1;
+      byte[] buf1 = new byte[bufSize];
+      byte[] buf2 = new byte[bufSize];
+
+      int offset = random.nextInt(Math.min(maxOffset, bufSize));
+      int len = Math.min(random.nextInt(bufSize - offset), fileLength - position);
+
+      baseInput.seek(position);
+      baseInput.readBytes(buf1, offset, len);
+      testInput.seek(position);
+      testInput.readBytes(buf2, offset, len);
+      assertArrayEquals("Read [" + i + "] The position is [" + position + "] and bufSize [" + bufSize + "]", buf1, buf2);
+    }
+  }
+
+  public static void writeRandomData(long size, Random random, IndexOutput... outputs) throws IOException {
+    byte[] buf = new byte[1024];
+    for (long l = 0; l < size; l += buf.length) {
+      random.nextBytes(buf);
+      int length = (int) Math.min(buf.length, size - l);
+      for (IndexOutput output : outputs) {
+        output.writeBytes(buf, length);
+      }
+    }
+  }
+
+
+  public static Cache getCache() {
+    EvictionListener<CacheKey, CacheValue> listener = new EvictionListener<CacheKey, CacheValue>() {
+      @Override
+      public void onEviction(CacheKey key, CacheValue value) {
+        if (value.refCount() == 0) {
+          value.release();
+        } else {
+          // doing something else...
+          fail();
+        }
+      }
+    };
+    Weigher<CacheValue> weigher = new Weigher<CacheValue>() {
+      @Override
+      public int weightOf(CacheValue value) {
+        return value.length();
+      }
+    };
+    long maximumWeightedCapacity = 1 * 1024 * 1024;
+    final ConcurrentLinkedHashMap<CacheKey, CacheValue> cache = new ConcurrentLinkedHashMap.Builder<CacheKey, CacheValue>()
+        .weigher(weigher).maximumWeightedCapacity(maximumWeightedCapacity).listener(listener).build();
+    Cache cacheFactory = new Cache() {
+
+      @Override
+      public CacheValue newInstance(CacheDirectory directory, String fileName) {
+        return new ByteArrayCacheValue(getCacheBlockSize(directory, fileName));
+      }
+
+      @Override
+      public long getFileId(CacheDirectory directory, String fileName) {
+        return fileName.hashCode();
+      }
+
+      @Override
+      public int getFileBufferSize(CacheDirectory directory, String fileName) {
+        return 1024;
+      }
+
+      @Override
+      public int getCacheBlockSize(CacheDirectory directory, String fileName) {
+        return 8192;
+      }
+
+      @Override
+      public boolean cacheFileForReading(CacheDirectory directory, String name, IOContext context) {
+        return true;
+      }
+
+      @Override
+      public boolean cacheFileForWriting(CacheDirectory directory, String name, IOContext context) {
+        return true;
+      }
+
+      @Override
+      public CacheValue get(CacheKey key) {
+        return cache.get(key);
+      }
+
+      @Override
+      public void put(CacheKey key, CacheValue value) {
+        cache.put(key, value);
+      }
+
+      @Override
+      public void removeFile(CacheDirectory directory, String fileName) throws IOException {
+        
+      }
+
+      @Override
+      public void releaseDirectory(String directoryName) {
+        
+      }
+    };
+    return cacheFactory;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexOutputTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexOutputTest.java b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexOutputTest.java
new file mode 100644
index 0000000..2dc8b5b
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/CacheIndexOutputTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CacheIndexOutputTest {
+
+  private long seed;
+
+  private final int sampleSize = 10000;
+  private final int maxBufSize = 10000;
+  private final int maxOffset = 1000;
+
+  @Before
+  public void setup() {
+    BufferStore.init(128, 128);
+    seed = new Random().nextLong();
+    System.out.println("Using seed [" + seed + "]");
+    // seed = -265282183286396219l;
+  }
+
+  @Test
+  public void test1() throws IOException {
+    Random random = new Random(seed);
+    RAMDirectory directory = new RAMDirectory();
+    IndexOutput output = directory.createOutput("test", IOContext.DEFAULT);
+
+    Cache cache = CacheIndexInputTest.getCache();
+    CacheIndexOutput indexOutput = new CacheIndexOutput(null, "test", output, cache);
+    indexOutput.writeByte((byte) 1);
+    indexOutput.writeByte((byte) 2);
+    byte[] b = new byte[16000];
+    random.nextBytes(b);
+    indexOutput.writeBytes(b, 16000);
+    indexOutput.close();
+
+    IndexInput input = directory.openInput("test", IOContext.DEFAULT);
+    assertEquals(16002, input.length());
+    assertEquals(1, input.readByte());
+    assertEquals(2, input.readByte());
+
+    byte[] buf = new byte[16000];
+    input.readBytes(buf, 0, 16000);
+    input.close();
+    assertArrayEquals(b, buf);
+  }
+
+  @Test
+  public void test2() throws IOException {
+    Cache cache = CacheIndexInputTest.getCache();
+    RAMDirectory directory = new RAMDirectory();
+    RAMDirectory directory2 = new RAMDirectory();
+
+    Random random = new Random(seed);
+
+    String name = "test2";
+    long size = (10 * 1024 * 1024) + 13;
+
+    IndexOutput output = directory.createOutput(name, IOContext.DEFAULT);
+    IndexOutput output2 = directory2.createOutput(name, IOContext.DEFAULT);
+    CacheIndexOutput cacheIndexOutput = new CacheIndexOutput(null, name, output2, cache);
+    CacheIndexInputTest.writeRandomData(size, random, output, cacheIndexOutput);
+    output.close();
+    cacheIndexOutput.close();
+
+    IndexInput input = directory.openInput(name, IOContext.DEFAULT);
+    IndexInput testInput = directory2.openInput(name, IOContext.DEFAULT);
+    CacheIndexInputTest.readRandomData(input, testInput, random, sampleSize, maxBufSize, maxOffset);
+    testInput.close();
+    input.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValueTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValueTest.java b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValueTest.java
new file mode 100644
index 0000000..8d33ece
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/ByteArrayCacheValueTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2.cachevalue;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class ByteArrayCacheValueTest {
+
+  @Test
+  public void test1() {
+    ByteArrayCacheValue value = new ByteArrayCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    value.write(0, buf, 0, 10);
+    byte[] buf2 = new byte[10];
+    value.read(0, buf2, 0, 10);
+    assertArrayEquals("hello worl".getBytes(), buf2);
+  }
+
+  @Test
+  public void test2() {
+    ByteArrayCacheValue value = new ByteArrayCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    try {
+      value.write(0, buf, 0, 11);
+      fail();
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+  }
+
+  @Test
+  public void test3() {
+    ByteArrayCacheValue value = new ByteArrayCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    try {
+      value.write(8, buf, 0, 3);
+      fail();
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+  }
+
+  @Test
+  public void test4() {
+    ByteArrayCacheValue value = new ByteArrayCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    value.write(8, buf, 0, 2);
+
+    assertEquals('h', value.read(8));
+    assertEquals('e', value.read(9));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValueTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValueTest.java b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValueTest.java
new file mode 100644
index 0000000..37f5926
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/cachevalue/UnsafeCacheValueTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2.cachevalue;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class UnsafeCacheValueTest {
+
+  @Test
+  public void test1() {
+    UnsafeCacheValue value = new UnsafeCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    value.write(0, buf, 0, 10);
+    byte[] buf2 = new byte[10];
+    value.read(0, buf2, 0, 10);
+    assertArrayEquals("hello worl".getBytes(), buf2);
+  }
+
+  @Test
+  public void test2() {
+    UnsafeCacheValue value = new UnsafeCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    try {
+      value.write(0, buf, 0, 11);
+      fail();
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+  }
+
+  @Test
+  public void test3() {
+    UnsafeCacheValue value = new UnsafeCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    try {
+      value.write(8, buf, 0, 3);
+      fail();
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+  }
+
+  @Test
+  public void test4() {
+    UnsafeCacheValue value = new UnsafeCacheValue(10);
+    byte[] buf = "hello world".getBytes();
+    value.write(8, buf, 0, 2);
+
+    assertEquals('h', value.read(8));
+    assertEquals('e', value.read(9));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 60368f8..fbe86d0 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -44,6 +44,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_BIND_ADDRESS = "blur.shard.bind.address";
   public static final String BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION = "blur.shard.blockcache.direct.memory.allocation";
   public static final String BLUR_SHARD_BLOCKCACHE_SLAB_COUNT = "blur.shard.blockcache.slab.count";
+  public static final String BLUR_SHARD_EXPERIMENTAL_BLOCK_CACHE = "blur.shard.experimental.block.cache";
   public static final String BLUR_SHARD_SAFEMODEDELAY = "blur.shard.safemodedelay";
   public static final String BLUR_CONTROLLER_HOSTNAME = "blur.controller.hostname";
   public static final String BLUR_CONTROLLER_BIND_PORT = "blur.controller.bind.port";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index d28f39d..4a1ef8b 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -66,6 +66,9 @@ blur.shard.index.warmup.throttle=30000000
 # By default the block cache using off heap memory
 blur.shard.blockcache.direct.memory.allocation=true
 
+# By default the experimental block cache is off
+blur.shard.experimental.block.cache=false
+
 # The slabs in the blockcache are automatically configured by default (-1) otherwise 1 slab equals 128MB
 # The auto config is detected through the MaxDirectoryMemorySize provided to the JVM
 blur.shard.blockcache.slab.count=-1

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5ea9c30e/docs/cluster-setup.html
----------------------------------------------------------------------
diff --git a/docs/cluster-setup.html b/docs/cluster-setup.html
index d6ba58b..e63e4f5 100644
--- a/docs/cluster-setup.html
+++ b/docs/cluster-setup.html
@@ -284,6 +284,9 @@ blur.shard.index.warmup.throttle=30000000
 # By default the block cache using off heap memory
 blur.shard.blockcache.direct.memory.allocation=true
 
+# By default the experimental block cache is off
+blur.shard.experimental.block.cache=false
+
 # The slabs in the blockcache are automatically configured by 
 # default (-1) otherwise 1 slab equals 128MB. The auto config
 # is detected through the MaxDirectoryMemorySize provided to 


Mime
View raw message