incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Added block cache.
Date Mon, 08 Oct 2012 01:03:45 GMT
Updated Branches:
  refs/heads/new-api-prototype 0ec49da9c -> f2b81bd0b


Added block cache.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f2b81bd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f2b81bd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f2b81bd0

Branch: refs/heads/new-api-prototype
Commit: f2b81bd0b62ee9ba305e991b586b626d559bff89
Parents: 0ec49da
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Oct 7 21:03:17 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Oct 7 21:03:17 2012 -0400

----------------------------------------------------------------------
 src/blur-new-api-prototype/pom.xml                 |    7 +-
 .../java/org/apache/blur/cache/BlockCache.java     |  178 +++++++
 .../java/org/apache/blur/cache/BlockCacheKey.java  |   73 +++
 .../org/apache/blur/cache/BlockCacheLocation.java  |   66 +++
 .../java/org/apache/blur/cache/BlockDirectory.java |  291 ++++++++++++
 .../org/apache/blur/cache/BlockDirectoryCache.java |   68 +++
 .../java/org/apache/blur/cache/BlockLocks.java     |   96 ++++
 .../java/org/apache/blur/cache/BufferStore.java    |   96 ++++
 .../src/main/java/org/apache/blur/cache/Cache.java |   29 ++
 .../blur/cache/ReusedBufferedIndexInput.java       |  361 +++++++++++++++
 .../main/java/org/apache/blur/core/ServerHdfs.java |  321 +++++++++++++
 .../org/apache/blur/store/SimpleHDFSDirectory.java |    8 +
 12 files changed, 1593 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/pom.xml
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/pom.xml b/src/blur-new-api-prototype/pom.xml
index ad633d6..c64c54d 100644
--- a/src/blur-new-api-prototype/pom.xml
+++ b/src/blur-new-api-prototype/pom.xml
@@ -20,6 +20,11 @@
 
 	<dependencies>
 		<dependency>
+			<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+			<artifactId>concurrentlinkedhashmap-lru</artifactId>
+			<version>1.3.1</version>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-core</artifactId>
 			<version>4.0.0-BETA</version>
@@ -37,7 +42,7 @@
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-core</artifactId>
-			<version>1.0.0</version>
+			<version>1.0.3</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.thrift</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCache.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCache.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCache.java
new file mode 100644
index 0000000..2a3beea
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCache.java
@@ -0,0 +1,178 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+
/**
 * An in-memory block cache backed by a small number of large pre-allocated
 * ByteBuffer "slabs", each divided into fixed-size 8K blocks. A concurrent
 * LRU map tracks which (file, block) key occupies which slab/block slot, and
 * a lock-free bit set per slab tracks slot allocation.
 */
public class BlockCache {

  public static final int _128M = 134217728; // default slab size: 128MB
  public static final int _8K = 8192; // size in bytes of one cached block
  // LRU map from cache key to the slab/block slot holding its bytes;
  // eviction releases the slot back to its slab (see releaseLocation).
  private final ConcurrentMap<BlockCacheKey, BlockCacheLocation> _cache;
  // Pre-allocated backing memory, one large buffer per slab.
  private final ByteBuffer[] _slabs;
  // Per-slab lock-free bit sets: a set bit means the block slot is in use.
  private final BlockLocks[] _locks;
  // Per-slab count of blocks in use, used to skip full slabs quickly.
  private final AtomicInteger[] _lockCounters;
  private final int _blockSize = _8K;
  private final int _numberOfBlocksPerSlab;
  private final int _maxEntries;

  /**
   * Creates a cache of roughly totalMemory bytes using the default 128MB
   * slab size.
   *
   * @param directAllocation true to allocate slabs off-heap (direct buffers).
   * @param totalMemory total cache capacity in bytes.
   */
  public BlockCache(boolean directAllocation, long totalMemory) {
    this(directAllocation, totalMemory, _128M);
  }

  /**
   * Creates a cache of roughly totalMemory bytes split into slabs of
   * slabSize bytes each (any remainder of totalMemory is unused).
   */
  public BlockCache(boolean directAllocation, long totalMemory, int slabSize) {
    _numberOfBlocksPerSlab = slabSize / _blockSize;
    int numberOfSlabs = (int) (totalMemory / slabSize);

    _slabs = new ByteBuffer[numberOfSlabs];
    _locks = new BlockLocks[numberOfSlabs];
    _lockCounters = new AtomicInteger[numberOfSlabs];
    // One less than the total block count so the LRU map starts evicting
    // before every slot is exhausted.
    _maxEntries = (_numberOfBlocksPerSlab * numberOfSlabs) - 1;
    for (int i = 0; i < numberOfSlabs; i++) {
      if (directAllocation) {
        _slabs[i] = ByteBuffer.allocateDirect(_numberOfBlocksPerSlab * _blockSize);
      } else {
        _slabs[i] = ByteBuffer.allocate(_numberOfBlocksPerSlab * _blockSize);
      }
      _locks[i] = new BlockLocks(_numberOfBlocksPerSlab);
      _lockCounters[i] = new AtomicInteger();
    }

    // When the LRU map evicts an entry, hand its slab slot back so a later
    // store() can reuse it.
    EvictionListener<BlockCacheKey, BlockCacheLocation> listener = new EvictionListener<BlockCacheKey, BlockCacheLocation>() {
      @Override
      public void onEviction(BlockCacheKey key, BlockCacheLocation location) {
        releaseLocation(location);
      }
    };
    _cache = new ConcurrentLinkedHashMap.Builder<BlockCacheKey, BlockCacheLocation>().maximumWeightedCapacity(_maxEntries).listener(listener).build();
  }

  /**
   * Marks the location removed and frees its slot in the slab's bit set.
   * Null-safe so it can be fed the (possibly null) return of Map.put.
   */
  private void releaseLocation(BlockCacheLocation location) {
    if (location == null) {
      return;
    }
    int slabId = location.getSlabId();
    int block = location.getBlock();
    location.setRemoved(true);
    _locks[slabId].clear(block);
    _lockCounters[slabId].decrementAndGet();
  }

  /**
   * Stores one full block (data.length must equal the block size) under the
   * given key. Returns false when no free slot could be found or the entry
   * was concurrently evicted; the caller simply loses the caching benefit.
   */
  public boolean store(BlockCacheKey blockCacheKey, byte[] data) {
    checkLength(data);
    BlockCacheLocation location = _cache.get(blockCacheKey);
    boolean newLocation = false;
    if (location == null) {
      newLocation = true;
      location = new BlockCacheLocation();
      if (!findEmptyLocation(location)) {
        // cache is full and nothing could be evicted in time
        return false;
      }
    }
    if (location.isRemoved()) {
      // evicted between the get above and now
      return false;
    }
    int slabId = location.getSlabId();
    int offset = location.getBlock() * _blockSize;
    ByteBuffer slab = getSlab(slabId);
    slab.position(offset);
    // NOTE(review): when reusing an existing location, this write is not
    // synchronized against concurrent fetch() readers of the same slot, and
    // eviction may recycle the slot mid-write — presumably acceptable for a
    // cache (stale/torn data only causes a miss upstream); confirm.
    slab.put(data, 0, _blockSize);
    if (newLocation) {
      // Key is cloned because the caller may mutate its key object; release
      // any location displaced by this put.
      releaseLocation(_cache.put(blockCacheKey.clone(), location));
    }
    return true;
  }

  /**
   * Copies length bytes starting at blockOffset within the cached block into
   * buffer at off. Returns false on a miss (unknown key or evicted entry).
   */
  public boolean fetch(BlockCacheKey blockCacheKey, byte[] buffer, int blockOffset, int off, int length) {
    BlockCacheLocation location = _cache.get(blockCacheKey);
    if (location == null) {
      return false;
    }
    if (location.isRemoved()) {
      return false;
    }
    int slabId = location.getSlabId();
    int offset = location.getBlock() * _blockSize;
    location.touch();
    ByteBuffer slab = getSlab(slabId);
    slab.position(offset + blockOffset);
    // NOTE(review): the slot could be evicted and rewritten between the
    // isRemoved check and this read, returning bytes from another block —
    // confirm callers tolerate that.
    slab.get(buffer, off, length);
    return true;
  }

  /** Fetches an entire block; buffer must be exactly one block in size. */
  public boolean fetch(BlockCacheKey blockCacheKey, byte[] buffer) {
    checkLength(buffer);
    return fetch(blockCacheKey, buffer, 0, 0, _blockSize);
  }

  /**
   * Tries to claim a free slab slot, writing the coordinates into location.
   * Returns false if no slot could be claimed after a bounded number of
   * passes over all slabs.
   */
  private boolean findEmptyLocation(BlockCacheLocation location) {
    // This is a tight loop that will try and find a location to
    // place the block before giving up
    for (int j = 0; j < 10; j++) {
      OUTER: for (int slabId = 0; slabId < _slabs.length; slabId++) {
        AtomicInteger bitSetCounter = _lockCounters[slabId];
        BlockLocks bitSet = _locks[slabId];
        if (bitSetCounter.get() == _numberOfBlocksPerSlab) {
          // if bitset is full
          continue OUTER;
        }
        // this check needs to spin, if a lock was attempted but not obtained
        // the rest of the slab should not be skipped
        int bit = bitSet.nextClearBit(0);
        INNER: while (bit != -1) {
          if (bit >= _numberOfBlocksPerSlab) {
            // bit set is full
            continue OUTER;
          }
          if (!bitSet.set(bit)) {
            // lock was not obtained
            // this restarts at 0 because another block could have been unlocked
            // while this was executing
            bit = bitSet.nextClearBit(0);
            continue INNER;
          } else {
            // lock obtained
            location.setSlabId(slabId);
            location.setBlock(bit);
            bitSetCounter.incrementAndGet();
            return true;
          }
        }
      }
    }
    return false;
  }

  /** Rejects buffers that are not exactly one block long. */
  private void checkLength(byte[] buffer) {
    if (buffer.length != _blockSize) {
      throw new RuntimeException("Buffer wrong size, expecting [" + _blockSize + "] got [" + buffer.length + "]");
    }
  }

  /**
   * Returns an independent view of the slab so each thread gets its own
   * position/limit while sharing the underlying memory.
   */
  private ByteBuffer getSlab(int slabId) {
    return _slabs[slabId].duplicate();
  }

  /** Number of entries currently tracked by the LRU map. */
  public int getSize() {
    return _cache.size();
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheKey.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheKey.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheKey.java
new file mode 100644
index 0000000..d8adb32
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheKey.java
@@ -0,0 +1,73 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
/**
 * Map key identifying one cached block: a numeric file id plus the block
 * index within that file. Mutable for reuse; {@link BlockCache} clones it
 * before storing it as a map key.
 */
public class BlockCacheKey implements Cloneable {

  // Block index within the file.
  private long _block;
  // Numeric id assigned to the file name.
  private int _file;

  public long getBlock() {
    return _block;
  }

  public void setBlock(long block) {
    _block = block;
  }

  public int getFile() {
    return _file;
  }

  public void setFile(int file) {
    _file = file;
  }

  /** Combines both fields with the conventional factor of 31. */
  @Override
  public int hashCode() {
    int blockHash = (int) (_block ^ (_block >>> 32));
    return 31 * (31 + blockHash) + _file;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    BlockCacheKey that = (BlockCacheKey) obj;
    return _block == that._block && _file == that._file;
  }

  /** Field-for-field copy; both fields are primitives, so it is fully independent. */
  @Override
  public BlockCacheKey clone() {
    try {
      return (BlockCacheKey) super.clone();
    } catch (CloneNotSupportedException e) {
      throw new RuntimeException(e);
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheLocation.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheLocation.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheLocation.java
new file mode 100644
index 0000000..cc00872
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockCacheLocation.java
@@ -0,0 +1,66 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
 * The slab/block coordinates of one cached block, plus simple access
 * statistics and a removed flag set when the slot is released.
 */
public class BlockCacheLocation {

  // Slot coordinates within the block cache.
  private int _block;
  private int _slabId;
  // Access statistics; updated by touch() on every cache hit.
  private long _lastAccess = System.currentTimeMillis();
  private long _accesses;
  // Flips to true once this slot has been released back to its slab.
  private AtomicBoolean _removed = new AtomicBoolean(false);

  public int getBlock() {
    return _block;
  }

  public void setBlock(int block) {
    _block = block;
  }

  public int getSlabId() {
    return _slabId;
  }

  public void setSlabId(int slabId) {
    _slabId = slabId;
  }

  /** Records a hit: bumps the access count and refreshes the timestamp. */
  public void touch() {
    _accesses++;
    _lastAccess = System.currentTimeMillis();
  }

  public long getLastAccess() {
    return _lastAccess;
  }

  public long getNumberOfAccesses() {
    return _accesses;
  }

  public boolean isRemoved() {
    return _removed.get();
  }

  public void setRemoved(boolean removed) {
    _removed.set(removed);
  }

}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectory.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectory.java
new file mode 100644
index 0000000..6441a3b
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectory.java
@@ -0,0 +1,291 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.blur.store.SimpleHDFSDirectory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
/**
 * A Lucene Directory wrapper that serves reads through a block cache.
 * Files are addressed in fixed 8K blocks; openInput wraps the delegate's
 * IndexInput so whole blocks are pulled into the cache and partial reads are
 * answered from it. Writes pass straight through to the wrapped directory.
 */
public class BlockDirectory extends Directory {

  public static final long BLOCK_SHIFT = 13; // 2^13 = 8,192 bytes per block
  public static final long BLOCK_MOD = 0x1FFF;
  public static final int BLOCK_SIZE = 1 << BLOCK_SHIFT;

  /** Block index that contains the absolute file position pos. */
  public static long getBlock(long pos) {
    return pos >>> BLOCK_SHIFT;
  }

  /** Offset of pos within its block. */
  public static long getPosition(long pos) {
    return pos & BLOCK_MOD;
  }

  /** Absolute file position of positionInBlock inside the given block. */
  public static long getRealPosition(long block, long positionInBlock) {
    return (block << BLOCK_SHIFT) + positionInBlock;
  }

  // No-op Cache implementation: every fetch misses, every update is dropped.
  // Used by the convenience constructor when caching is disabled.
  public static Cache NO_CACHE = new Cache() {

    @Override
    public void update(String name, long blockId, byte[] buffer) {

    }

    @Override
    public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
      return false;
    }

    @Override
    public void delete(String name) {

    }

    @Override
    public long size() {
      return 0;
    }
  };

  private Directory _directory; // the wrapped delegate directory
  private int _blockSize;
  private String _dirName; // used to namespace cache keys across directories
  private Cache _cache;
  // File-name suffixes eligible for caching; null means cache everything.
  private Set<String> _blockCacheFileTypes;

  /** Wraps directory with caching disabled (NO_CACHE). */
  public BlockDirectory(String dirName, Directory directory) throws IOException {
    this(dirName, directory, NO_CACHE);
  }

  /** Wraps directory, caching all file types through cache. */
  public BlockDirectory(String dirName, Directory directory, Cache cache) throws IOException {
    this(dirName, directory, cache, null);
  }

  /**
   * Wraps directory, caching only files whose names end with one of
   * blockCacheFileTypes (null or empty means cache every file).
   */
  public BlockDirectory(String dirName, Directory directory, Cache cache, Set<String> blockCacheFileTypes) throws IOException {
    _dirName = dirName;
    _directory = directory;
    _blockSize = BLOCK_SIZE;
    _cache = cache;
    if (blockCacheFileTypes == null || blockCacheFileTypes.isEmpty()) {
      _blockCacheFileTypes = null;
    } else {
      _blockCacheFileTypes = blockCacheFileTypes;
    }
    setLockFactory(directory.getLockFactory());
  }

  // Only called when _blockCacheFileTypes is non-null (see openInput), so no
  // null check is needed here.
  private boolean isCachableFile(String name) {
    for (String ext : _blockCacheFileTypes) {
      if (name.endsWith(ext)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public IndexInput openInput(final String name, IOContext context) throws IOException {
    final IndexInput source = _directory.openInput(name, context);
    if (_blockCacheFileTypes == null || isCachableFile(name)) {
      return new CachedIndexInput(source, _blockSize, name, getFileCacheName(name), _cache, context);
    }
    return source;
  }

  /**
   * IndexInput that answers reads block-by-block from the cache, falling back
   * to the wrapped source input (and populating the cache) on a miss.
   */
  static class CachedIndexInput extends ReusedBufferedIndexInput {

    private IndexInput _source;
    private int _blockSize;
    private long _fileLength;
    private String _cacheName; // cache key prefix, includes file mtime
    private Cache _cache;

    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache, IOContext context) {
      super(name, context);
      _source = source;
      _blockSize = blockSize;
      _fileLength = source.length();
      _cacheName = cacheName;
      _cache = cache;
    }

    @Override
    public CachedIndexInput clone() {
      CachedIndexInput clone = (CachedIndexInput) super.clone();
      // clone the source too so the two inputs keep independent positions
      clone._source = (IndexInput) _source.clone();
      return clone;
    }

    @Override
    public long length() {
      return _source.length();
    }

    @Override
    protected void seekInternal(long pos) throws IOException {
    }

    /** Fills b[off..off+len) starting at the current file pointer. */
    @Override
    protected void readInternal(byte[] b, int off, int len) throws IOException {
      long position = getFilePointer();
      // a read may span several blocks; fetch each in turn
      while (len > 0) {
        int length = fetchBlock(position, b, off, len);
        position += length;
        len -= length;
        off += length;
      }
    }

    /** Reads as much of one block as len allows; returns the bytes served. */
    private int fetchBlock(long position, byte[] b, int off, int len) throws IOException {
      // read whole block into cache and then provide needed data
      long blockId = getBlock(position);
      int blockOffset = (int) getPosition(position);
      int lengthToReadInBlock = Math.min(len, _blockSize - blockOffset);
      if (checkCache(blockId, blockOffset, b, off, lengthToReadInBlock)) {
        return lengthToReadInBlock;
      } else {
        readIntoCacheAndResult(blockId, blockOffset, b, off, lengthToReadInBlock);
      }
      return lengthToReadInBlock;
    }

    /** Cache miss path: read the whole block from source, publish it, copy out. */
    private void readIntoCacheAndResult(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) throws IOException {
      long position = getRealPosition(blockId, 0);
      // the last block of the file may be shorter than a full block
      int length = (int) Math.min(_blockSize, _fileLength - position);
      _source.seek(position);

      byte[] buf = BufferStore.takeBuffer(_blockSize);
      _source.readBytes(buf, 0, length);
      System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
      // NOTE(review): buf is returned to the pool right after update();
      // assumes _cache.update copies the bytes before returning — confirm
      // for every Cache implementation.
      _cache.update(_cacheName, blockId, buf);
      BufferStore.putBuffer(buf);
    }

    private boolean checkCache(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
      return _cache.fetch(_cacheName, blockId, blockOffset, b, off, lengthToReadInBlock);
    }

    @Override
    protected void closeInternal() throws IOException {
      _source.close();
    }
  }

  /** Drops all cache entries for this directory, then closes the delegate. */
  @Override
  public void close() throws IOException {
    String[] files = listAll();
    for (String file : files) {
      _cache.delete(getFileCacheName(file));
    }
    _directory.close();
  }

  // Cache name embeds the last-modified time, so a re-created file with the
  // same name gets a fresh cache identity instead of stale blocks.
  private String getFileCacheName(String name) throws IOException {
    return _dirName + "/" + name + ":" + getFileModified(name);
  }

  private long getFileModified(String name) throws IOException {
    if (_directory instanceof FSDirectory) {
      File directory = ((FSDirectory) _directory).getDirectory();
      File file = new File(directory,name);
      if (!file.exists()) {
        throw new FileNotFoundException("File [" + name + "] not found");
      }
      return file.lastModified();
    } else if (_directory instanceof SimpleHDFSDirectory) {
      return ((SimpleHDFSDirectory) _directory).getFileModified(name);
    } else {
      // only FS and HDFS delegates are supported for cache-name generation
      throw new RuntimeException("Not supported");
    }
  }

  public void clearLock(String name) throws IOException {
    _directory.clearLock(name);
  }

  @Override
  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
    _directory.copy(to, src, dest, context);
  }

  @Override
  public LockFactory getLockFactory() {
    return _directory.getLockFactory();
  }

  @Override
  public String getLockID() {
    return _directory.getLockID();
  }

  @Override
  public Lock makeLock(String name) {
    return _directory.makeLock(name);
  }

  @Override
  public void setLockFactory(LockFactory lockFactory) throws IOException {
    _directory.setLockFactory(lockFactory);
  }

  @Override
  public void sync(Collection<String> names) throws IOException {
    _directory.sync(names);
  }

  public String toString() {
    return _directory.toString();
  }

  @Override
  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    // writes are not cached; pass straight through
    return _directory.createOutput(name, context);
  }

  /** Invalidates the file's cache entries before deleting the file itself. */
  @Override
  public void deleteFile(String name) throws IOException {
    _cache.delete(getFileCacheName(name));
    _directory.deleteFile(name);
  }

  @Override
  public boolean fileExists(String name) throws IOException {
    return _directory.fileExists(name);
  }

  @Override
  public long fileLength(String name) throws IOException {
    return _directory.fileLength(name);
  }

  @Override
  public String[] listAll() throws IOException {
    return _directory.listAll();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectoryCache.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectoryCache.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectoryCache.java
new file mode 100644
index 0000000..fe1c5b4
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockDirectoryCache.java
@@ -0,0 +1,68 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class BlockDirectoryCache implements Cache {
+  private BlockCache _blockCache;
+  private AtomicInteger _counter = new AtomicInteger();
+  private Map<String, Integer> _names = new ConcurrentHashMap<String, Integer>();
+
+  public BlockDirectoryCache(BlockCache blockCache) {
+    _blockCache = blockCache;
+  }
+
+  @Override
+  public void delete(String name) {
+    _names.remove(name);
+  }
+
+  @Override
+  public void update(String name, long blockId, byte[] buffer) {
+    Integer file = _names.get(name);
+    if (file == null) {
+      file = _counter.incrementAndGet();
+      _names.put(name, file);
+    }
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(blockId);
+    blockCacheKey.setFile(file);
+    _blockCache.store(blockCacheKey, buffer);
+  }
+
+  @Override
+  public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+    Integer file = _names.get(name);
+    if (file == null) {
+      return false;
+    }
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(blockId);
+    blockCacheKey.setFile(file);
+    boolean fetch = _blockCache.fetch(blockCacheKey, b, blockOffset, off, lengthToReadInBlock);
+    return fetch;
+  }
+
+  @Override
+  public long size() {
+    return _blockCache.getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockLocks.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockLocks.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockLocks.java
new file mode 100644
index 0000000..f097891
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BlockLocks.java
@@ -0,0 +1,96 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.OpenBitSet;
+
/**
 * A lock-free bit set used to "lock" block slots: one bit per block, a set
 * bit meaning the slot is in use. Built on AtomicLongArray with CAS loops.
 *
 * Improvement over the original: the trivial bit arithmetic previously
 * delegated to Lucene (OpenBitSet.bits2words, BitUtil.ntz) is done with the
 * JDK directly — Long.numberOfTrailingZeros is a JIT intrinsic — removing
 * the external dependency from this class.
 */
public class BlockLocks {

  // One 64-bit word per 64 bits; a set bit means the slot is locked.
  private final AtomicLongArray bits;
  // Number of words backing the bit set.
  private final int wlen;

  public BlockLocks(long numBits) {
    // Number of longs needed to hold numBits bits; same math as Lucene's
    // OpenBitSet.bits2words, inlined here.
    int length = (int) (((numBits - 1) >>> 6) + 1);
    bits = new AtomicLongArray(length);
    wlen = length;
  }

  /**
   * Finds the next clear (unlocked) bit at or after {@code index}.
   *
   * @param index position to start scanning from.
   * @return index of the next clear bit, or -1 if none exists. The result
   *         may exceed the requested bit count when that count is not a
   *         multiple of 64; callers must range-check it.
   */
  public int nextClearBit(int index) {
    int i = index >> 6;
    if (i >= wlen)
      return -1;
    int subIndex = index & 0x3f; // index within the word
    long word = ~bits.get(i) >> subIndex; // skip all the bits to the right of
                                          // index
    if (word != 0) {
      return (i << 6) + subIndex + Long.numberOfTrailingZeros(word);
    }
    while (++i < wlen) {
      word = ~bits.get(i);
      if (word != 0) {
        return (i << 6) + Long.numberOfTrailingZeros(word);
      }
    }
    return -1;
  }

  /**
   * Thread safe set operation that will set the bit if and only if the bit
   * was not previously set.
   *
   * @param index
   *          the index position to set.
   * @return true if this call set the bit, false if it was already set.
   */
  public boolean set(int index) {
    int wordNum = index >> 6; // div 64
    int bit = index & 0x3f; // mod 64
    long bitmask = 1L << bit;
    long word, oword;
    do {
      word = bits.get(wordNum);
      // if set another thread stole the lock
      if ((word & bitmask) != 0) {
        return false;
      }
      oword = word;
      word |= bitmask;
    } while (!bits.compareAndSet(wordNum, oword, word));
    return true;
  }

  /** Clears (unlocks) the bit at {@code index}; always succeeds. */
  public void clear(int index) {
    int wordNum = index >> 6;
    int bit = index & 0x3f;
    long bitmask = 1L << bit;
    long word, oword;
    do {
      word = bits.get(wordNum);
      oword = word;
      word &= ~bitmask;
    } while (!bits.compareAndSet(wordNum, oword, word));
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BufferStore.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BufferStore.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BufferStore.java
new file mode 100644
index 0000000..eb47740
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/BufferStore.java
@@ -0,0 +1,96 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Static pool of reusable byte buffers for the two hot sizes used by
+ * ReusedBufferedIndexInput (1024 and 8192 bytes). Pooling avoids repeated
+ * allocation/GC of short-lived read buffers. Non-pooled sizes are always
+ * freshly allocated.
+ */
+public class BufferStore {
+
+  private static final Log LOG = LogFactory.getLog(BufferStore.class);
+
+  // Initialized with a single buffer each so the class is usable (if slow)
+  // before init() has been called.
+  private static BlockingQueue<byte[]> _1024 = setupBuffers(1024, 1);
+  private static BlockingQueue<byte[]> _8192 = setupBuffers(8192, 1);
+
+  /**
+   * Sizes the buffer pools for production use. Call once at startup; buffers
+   * pooled before this call are discarded and left to the garbage collector.
+   */
+  public static void init() {
+    int _1024Size = 8192;
+    int _8192Size = 8192;
+    LOG.info("Initializing the 1024 buffers with [" + _1024Size + "] buffers.");
+    _1024 = setupBuffers(1024, _1024Size);
+    LOG.info("Initializing the 8192 buffers with [" + _8192Size + "] buffers.");
+    _8192 = setupBuffers(8192, _8192Size);
+  }
+
+  // Builds a bounded queue pre-filled with count buffers of bufferSize bytes.
+  private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count) {
+    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
+    for (int i = 0; i < count; i++) {
+      queue.add(new byte[bufferSize]);
+    }
+    return queue;
+  }
+
+  /**
+   * Fetches a buffer of the requested size, reusing a pooled buffer when one
+   * is available.
+   * 
+   * @param bufferSize
+   *          the exact length of the returned array.
+   * @return a byte array of length bufferSize; contents are undefined (a
+   *         pooled buffer may hold data from a previous user).
+   */
+  public static byte[] takeBuffer(int bufferSize) {
+    switch (bufferSize) {
+    case 1024:
+      return bufferOf(1024, _1024.poll());
+    case 8192:
+      return bufferOf(8192, _8192.poll());
+    default:
+      return new byte[bufferSize];
+    }
+  }
+
+  /**
+   * Returns a buffer to its pool. Buffers of non-pooled sizes, and buffers
+   * offered while the pool is already full, are dropped for garbage
+   * collection.
+   * 
+   * @param buffer
+   *          the buffer to recycle; null is ignored.
+   */
+  public static void putBuffer(byte[] buffer) {
+    if (buffer == null) {
+      return;
+    }
+    switch (buffer.length) {
+    case 1024:
+      checkReturn(_1024.offer(buffer));
+      return;
+    case 8192:
+      checkReturn(_8192.offer(buffer));
+      return;
+    }
+  }
+
+  // Dropping a buffer when the pool is full is intentional (the GC reclaims
+  // it), but make the loss visible at debug level for pool-size tuning
+  // instead of silently ignoring the failed offer.
+  private static void checkReturn(boolean accepted) {
+    if (!accepted && LOG.isDebugEnabled()) {
+      LOG.debug("Buffer pool full, dropping returned buffer.");
+    }
+  }
+
+  // Uses the pooled buffer when non-null, otherwise allocates a fresh one of
+  // the given size. Replaces the former duplicated newBuffer1024/newBuffer8192.
+  private static byte[] bufferOf(int size, byte[] pooled) {
+    if (pooled != null) {
+      return pooled;
+    }
+    return new byte[size];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/Cache.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/Cache.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/Cache.java
new file mode 100644
index 0000000..0037129
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/Cache.java
@@ -0,0 +1,29 @@
+package org.apache.blur.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Cache abstraction over fixed-size blocks of file data, keyed by file name
+ * plus a block id within that file. Implementations decide capacity and
+ * eviction policy.
+ */
+public interface Cache {
+
+  /**
+   * Evicts every cached block associated with the given file name.
+   * 
+   * @param name
+   *          the file whose blocks should be removed.
+   */
+  void delete(String name);
+
+  /**
+   * Stores the given block bytes under (name, blockId), replacing any
+   * previously cached content for that key.
+   * 
+   * @param name
+   *          the file the block belongs to.
+   * @param blockId
+   *          the block's position within the file.
+   * @param buffer
+   *          the block contents to cache.
+   */
+  void update(String name, long blockId, byte[] buffer);
+
+  /**
+   * Attempts to copy lengthToReadInBlock bytes of the cached block
+   * (name, blockId), starting at blockOffset inside the block, into b at
+   * offset off.
+   * 
+   * @return true when the block was present and bytes were copied, false on
+   *         a cache miss (b is then untouched - NOTE(review): confirm against
+   *         implementations).
+   */
+  boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock);
+
+  /**
+   * Returns the current size of the cache. NOTE(review): units (entry count
+   * vs. bytes) are implementation-defined - confirm against implementations.
+   */
+  long size();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/ReusedBufferedIndexInput.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/ReusedBufferedIndexInput.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/ReusedBufferedIndexInput.java
new file mode 100644
index 0000000..a2f5e25
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/cache/ReusedBufferedIndexInput.java
@@ -0,0 +1,361 @@
+package org.apache.blur.cache;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Base implementation class for buffered {@link IndexInput}, adapted from
+ * Lucene's BufferedIndexInput. The difference from the Lucene original is
+ * that the internal read buffer is borrowed from {@link BufferStore} lazily
+ * (in {@link #refill()}) and handed back on {@link #close()}, so buffers are
+ * reused across many inputs instead of allocated per instance.
+ */
+public abstract class ReusedBufferedIndexInput extends IndexInput {
+
+  public static final int BUFFER_SIZE = 1024;
+  public static final int MERGE_BUFFER_SIZE = 8192;
+  private int bufferSize = BUFFER_SIZE;
+
+  // Borrowed from BufferStore on first refill(); null until then and again
+  // after close() or on a fresh clone().
+  protected byte[] buffer;
+
+  private long bufferStart = 0; // position in file of buffer
+  private int bufferLength = 0; // end of valid bytes
+  private int bufferPosition = 0; // next byte to read
+
+  // Serves from the buffer; refill() throws EOFException when no bytes remain.
+  @Override
+  public final byte readByte() throws IOException {
+    if (bufferPosition >= bufferLength)
+      refill();
+    return buffer[bufferPosition++];
+  }
+
+  public ReusedBufferedIndexInput(String resourceDesc) {
+    this(resourceDesc, BUFFER_SIZE);
+  }
+
+  // Buffer size is chosen from the IOContext (larger buffer for merges).
+  public ReusedBufferedIndexInput(String resourceDesc, IOContext context) {
+    this(resourceDesc, bufferSize(context));
+  }
+
+  /** Inits BufferedIndexInput with a specific bufferSize */
+  public ReusedBufferedIndexInput(String resourceDesc, int bufferSize) {
+    super(resourceDesc);
+    checkBufferSize(bufferSize);
+    this.bufferSize = bufferSize;
+  }
+
+  /** Returns buffer size. @see #setBufferSize */
+  public final int getBufferSize() {
+    return bufferSize;
+  }
+
+  private void checkBufferSize(int bufferSize) {
+    if (bufferSize <= 0)
+      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
+  }
+
+  @Override
+  public final void readBytes(byte[] b, int offset, int len) throws IOException {
+    readBytes(b, offset, len, true);
+  }
+
+  // Reads len bytes into b at offset. Small remainders go through the buffer
+  // (when useBuffer); large remainders bypass it via readInternal directly.
+  @Override
+  public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+
+    if (len <= (bufferLength - bufferPosition)) {
+      // the buffer contains enough data to satisfy this request
+      if (len > 0) // to allow b to be null if len is 0...
+        System.arraycopy(buffer, bufferPosition, b, offset, len);
+      bufferPosition += len;
+    } else {
+      // the buffer does not have enough data. First serve all we've got.
+      int available = bufferLength - bufferPosition;
+      if (available > 0) {
+        System.arraycopy(buffer, bufferPosition, b, offset, available);
+        offset += available;
+        len -= available;
+        bufferPosition += available;
+      }
+      // and now, read the remaining 'len' bytes:
+      if (useBuffer && len < bufferSize) {
+        // If the amount left to read is small enough, and
+        // we are allowed to use our buffer, do it in the usual
+        // buffered way: fill the buffer and copy from it:
+        refill();
+        if (bufferLength < len) {
+          // Throw an exception when refill() could not read len bytes:
+          System.arraycopy(buffer, 0, b, offset, bufferLength);
+          throw new EOFException("read past EOF: " + this);
+        } else {
+          System.arraycopy(buffer, 0, b, offset, len);
+          bufferPosition = len;
+        }
+      } else {
+        // The amount left to read is larger than the buffer
+        // or we've been asked to not use our buffer -
+        // there's no performance reason not to read it all
+        // at once. Note that unlike the previous code of
+        // this function, there is no need to do a seek
+        // here, because there's no need to reread what we
+        // had in the buffer.
+        long after = bufferStart + bufferPosition + len;
+        if (after > length())
+          throw new EOFException("read past EOF: " + this);
+        readInternal(b, offset, len);
+        bufferStart = after;
+        bufferPosition = 0;
+        bufferLength = 0; // trigger refill() on read
+      }
+    }
+  }
+
+  // Fast path: decode big-endian directly from the buffer when 2 bytes are
+  // available; otherwise fall back to byte-at-a-time via super.
+  @Override
+  public final short readShort() throws IOException {
+    if (2 <= (bufferLength - bufferPosition)) {
+      return (short) (((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF));
+    } else {
+      return super.readShort();
+    }
+  }
+
+  // Fast path: decode big-endian int directly from the buffer.
+  @Override
+  public final int readInt() throws IOException {
+    if (4 <= (bufferLength - bufferPosition)) {
+      return ((buffer[bufferPosition++] & 0xFF) << 24) | ((buffer[bufferPosition++] & 0xFF) << 16) | ((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF);
+    } else {
+      return super.readInt();
+    }
+  }
+
+  // Fast path: decode big-endian long as two ints directly from the buffer.
+  @Override
+  public final long readLong() throws IOException {
+    if (8 <= (bufferLength - bufferPosition)) {
+      final int i1 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
+          | (buffer[bufferPosition++] & 0xff);
+      final int i2 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
+          | (buffer[bufferPosition++] & 0xff);
+      return (((long) i1) << 32) | (i2 & 0xFFFFFFFFL);
+    } else {
+      return super.readLong();
+    }
+  }
+
+  // Fast path: decode a variable-length int (max 5 bytes) from the buffer.
+  @Override
+  public final int readVInt() throws IOException {
+    if (5 <= (bufferLength - bufferPosition)) {
+      byte b = buffer[bufferPosition++];
+      if (b >= 0)
+        return b;
+      int i = b & 0x7F;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7F) << 7;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7F) << 14;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7F) << 21;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors:
+      i |= (b & 0x0F) << 28;
+      if ((b & 0xF0) == 0)
+        return i;
+      throw new IOException("Invalid vInt detected (too many bits)");
+    } else {
+      return super.readVInt();
+    }
+  }
+
+  // Fast path: decode a variable-length long (max 9 bytes) from the buffer.
+  @Override
+  public final long readVLong() throws IOException {
+    if (9 <= bufferLength - bufferPosition) {
+      byte b = buffer[bufferPosition++];
+      if (b >= 0)
+        return b;
+      long i = b & 0x7FL;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 7;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 14;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 21;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 28;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 35;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 42;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 49;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 56;
+      if (b >= 0)
+        return i;
+      throw new IOException("Invalid vLong detected (negative values disallowed)");
+    } else {
+      return super.readVLong();
+    }
+  }
+
+  // Refills the buffer starting at the current file pointer. On first use the
+  // buffer is borrowed from BufferStore and the underlying stream is
+  // positioned at bufferStart (a seek() may have happened before any buffer
+  // existed). Throws EOFException when no bytes remain.
+  private void refill() throws IOException {
+    long start = bufferStart + bufferPosition;
+    long end = start + bufferSize;
+    if (end > length()) // don't read past EOF
+      end = length();
+    int newLength = (int) (end - start);
+    if (newLength <= 0)
+      throw new EOFException("read past EOF: " + this);
+
+    if (buffer == null) {
+      buffer = BufferStore.takeBuffer(bufferSize);
+      seekInternal(bufferStart);
+    }
+    readInternal(buffer, 0, newLength);
+    bufferLength = newLength;
+    bufferStart = start;
+    bufferPosition = 0;
+  }
+
+  /**
+   * Expert: implements buffer refill. Reads bytes from the current position in
+   * the input.
+   * 
+   * @param b
+   *          the array to read bytes into
+   * @param offset
+   *          the offset in the array to start storing bytes
+   * @param length
+   *          the number of bytes to read
+   */
+  protected abstract void readInternal(byte[] b, int offset, int length) throws IOException;
+
+  @Override
+  public final long getFilePointer() {
+    return bufferStart + bufferPosition;
+  }
+
+  @Override
+  public final void seek(long pos) throws IOException {
+    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
+      bufferPosition = (int) (pos - bufferStart); // seek within buffer
+    else {
+      bufferStart = pos;
+      bufferPosition = 0;
+      bufferLength = 0; // trigger refill() on read()
+      seekInternal(pos);
+    }
+  }
+
+  /**
+   * Expert: implements seek. Sets current position in this file, where the next
+   * {@link #readInternal(byte[],int,int)} will occur.
+   * 
+   * @see #readInternal(byte[],int,int)
+   */
+  protected abstract void seekInternal(long pos) throws IOException;
+
+  // Clones share the underlying file but start with no buffer of their own;
+  // the clone's first read borrows a fresh buffer from BufferStore.
+  @Override
+  public ReusedBufferedIndexInput clone() {
+    ReusedBufferedIndexInput clone = (ReusedBufferedIndexInput) super.clone();
+
+    clone.buffer = null;
+    clone.bufferLength = 0;
+    clone.bufferPosition = 0;
+    clone.bufferStart = getFilePointer();
+
+    return clone;
+  }
+
+  /**
+   * Flushes the in-memory buffer to the given output, copying at most
+   * <code>numBytes</code>.
+   * <p>
+   * <b>NOTE:</b> this method does not refill the buffer, however it does
+   * advance the buffer position.
+   * 
+   * @return the number of bytes actually flushed from the in-memory buffer.
+   */
+  protected final int flushBuffer(IndexOutput out, long numBytes) throws IOException {
+    int toCopy = bufferLength - bufferPosition;
+    if (toCopy > numBytes) {
+      toCopy = (int) numBytes;
+    }
+    if (toCopy > 0) {
+      out.writeBytes(buffer, bufferPosition, toCopy);
+      bufferPosition += toCopy;
+    }
+    return toCopy;
+  }
+
+  // Copies numBytes from the current position to out by alternating refill()
+  // and flushBuffer().
+  @Override
+  public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+    assert numBytes >= 0 : "numBytes=" + numBytes;
+
+    while (numBytes > 0) {
+      if (bufferLength == bufferPosition) {
+        refill();
+      }
+      numBytes -= flushBuffer(out, numBytes);
+    }
+  }
+
+  /**
+   * Returns default buffer sizes for the given {@link IOContext}
+   */
+  public static int bufferSize(IOContext context) {
+    switch (context.context) {
+    case DEFAULT:
+    case FLUSH:
+    case READ:
+      return BUFFER_SIZE;
+    case MERGE:
+      return MERGE_BUFFER_SIZE;
+    default:
+      assert false : "unknown IOContext " + context.context;
+      return BUFFER_SIZE;
+    }
+  }
+
+  // Returns the borrowed buffer to the BufferStore.
+  // NOTE(review): if closeInternal() throws, the buffer is never returned to
+  // the pool - consider try/finally. Confirm whether that leak is acceptable.
+  @Override
+  public final void close() throws IOException {
+    closeInternal();
+    BufferStore.putBuffer(buffer);
+    buffer = null;
+  }
+
+  protected abstract void closeInternal() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
new file mode 100644
index 0000000..9533c68
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
@@ -0,0 +1,321 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.cache.BlockCache;
+import org.apache.blur.cache.BlockDirectory;
+import org.apache.blur.cache.BlockDirectoryCache;
+import org.apache.blur.cache.BufferStore;
+import org.apache.blur.store.SimpleHDFSDirectory;
+import org.apache.blur.thrift.generated.Attribute;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurTuple.Iface;
+import org.apache.blur.thrift.generated.BlurTuple.Processor;
+import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.codecs.appending.AppendingCodec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Version;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadPoolServer.Args;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Thrift BlurTuple.Iface server backed by a Lucene index stored in HDFS,
+ * wrapped in a BlockDirectory cache. Read sessions wrap a near-real-time
+ * reader over the shared writer; write sessions index into a private
+ * sub-directory that is merged into the main index on commit.
+ */
+public class ServerHdfs implements Iface {
+
+  private static final String XX_MAX_DIRECT_MEMORY_SIZE = "-XX:MaxDirectMemorySize=";
+  private static final int MIN_SLABSIZE = 33554432; // 32 MB
+  private static final int MAX_SLABSIZE = 134217728; // 128 MB
+
+  private static final List<Tuple> EMPTY_LIST = new ArrayList<Tuple>();
+  private static final long KILO = 1024;
+  private static final long MEG = KILO * 1024;
+  private static final long GIG = MEG * 1024;
+
+  /**
+   * Starts the server on 127.0.0.1:9000 with a fixed pool of 50 worker
+   * threads. argsStr[0] is the HDFS path of the index.
+   */
+  public static void main(String[] argsStr) throws TTransportException, IOException {
+    Configuration conf = new Configuration();
+    ServerHdfs server = new ServerHdfs(conf, new Path(argsStr[0]));
+    Processor<Iface> processor = new Processor<Iface>(server);
+    Args args = new Args(new TServerSocket(new InetSocketAddress("127.0.0.1", 9000)));
+    args.minWorkerThreads(50);
+    args.maxWorkerThreads(50);
+    args.processor(processor);
+    args.transportFactory(new TFramedTransport.Factory());
+    args.protocolFactory(new TBinaryProtocol.Factory(true, true));
+
+    TThreadPoolServer tserver = new TThreadPoolServer(args);
+    tserver.serve();
+  }
+
+  // Active sessions keyed by session id (concurrent: Thrift worker threads).
+  private Map<String, RSession> readSessions = new ConcurrentHashMap<String, RSession>();
+  private Map<String, WSession> writeSessions = new ConcurrentHashMap<String, WSession>();
+  private IndexWriter writer;
+  private Path path;
+  private Configuration configuration;
+
+  /**
+   * Opens the HDFS directory at path, sizes the block cache from the JVM's
+   * -XX:MaxDirectMemorySize setting (falling back to the minimum slab when
+   * not set), and opens the shared IndexWriter over the cached directory.
+   */
+  public ServerHdfs(Configuration configuration, Path path) throws IOException {
+    this.path = path;
+    this.configuration = configuration;
+    Directory directory = new SimpleHDFSDirectory(configuration, path);
+    BufferStore.init();
+    long totalMemory = getTotalMemory();
+    int slabSize = MAX_SLABSIZE;
+    BlockCache cache;
+    if (totalMemory < 0) {
+      // No direct-memory limit configured; use the smallest cache.
+      // NOTE(review): confirm the meaning of the boolean BlockCache argument
+      // (presumably direct vs. heap allocation) against BlockCache.
+      cache = new BlockCache(false, MIN_SLABSIZE, MIN_SLABSIZE);
+    } else {
+      cache = new BlockCache(true, totalMemory, slabSize);
+    }
+    BlockDirectoryCache blockDirectoryCache = new BlockDirectoryCache(cache);
+    BlockDirectory blockDirectory = new BlockDirectory("embedded", directory, blockDirectoryCache);
+    this.writer = openWriter(blockDirectory);
+  }
+
+  /**
+   * Parses -XX:MaxDirectMemorySize from the JVM input arguments.
+   * 
+   * @return the configured direct-memory size minus one maximum slab (head
+   *         room), or -1 when the flag is not present.
+   */
+  private long getTotalMemory() {
+    RuntimeMXBean RuntimemxBean = ManagementFactory.getRuntimeMXBean();
+    List<String> arguments = RuntimemxBean.getInputArguments();
+    String maxDirectMemorySize = null;
+    for (String argument : arguments) {
+      if (argument.startsWith(XX_MAX_DIRECT_MEMORY_SIZE)) {
+        maxDirectMemorySize = argument;
+      }
+    }
+    if (maxDirectMemorySize == null) {
+      return -1L;
+    }
+    String directSize = maxDirectMemorySize.substring(XX_MAX_DIRECT_MEMORY_SIZE.length());
+    return getSize(directSize.toLowerCase()) - MAX_SLABSIZE;
+  }
+
+  // Parses sizes like "1g", "512m", "64k", or a bare byte count. The caller
+  // lowercases the input, so only lowercase suffixes are handled here.
+  private long getSize(String directSize) {
+    if (directSize.endsWith("g")) {
+      return Long.parseLong(directSize.substring(0, directSize.length() - 1)) * GIG;
+    } else if (directSize.endsWith("m")) {
+      return Long.parseLong(directSize.substring(0, directSize.length() - 1)) * MEG;
+    } else if (directSize.endsWith("k")) {
+      return Long.parseLong(directSize.substring(0, directSize.length() - 1)) * KILO;
+    } else {
+      return Long.parseLong(directSize.substring(0, directSize.length()));
+    }
+  }
+
+  // Opens a Lucene 4.0 writer with the append-only codec (required for HDFS,
+  // which does not support random-write file updates).
+  private IndexWriter openWriter(Directory directory) throws IOException {
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, new KeywordAnalyzer());
+    conf.setCodec(new AppendingCodec());
+    return new IndexWriter(directory, conf);
+  }
+
+  /**
+   * Opens a near-real-time reader over the shared writer and registers it
+   * under a fresh UUID session id.
+   */
+  @Override
+  public Session openReadSession() throws BlurException, TException {
+    try {
+      IndexReader reader = DirectoryReader.open(writer, true);
+      RSession session = new RSession(UUID.randomUUID().toString(), reader);
+      readSessions.put(session.getSessionId(), session);
+      return new Session(session.getSessionId());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Parses and executes the query in the given read session. The literal
+   * query "*" is short-circuited to MatchAllDocsQuery.
+   */
+  @Override
+  public void executeQuery(Session readSession, String query) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      if (query.trim().equals("*")) {
+        session.execute(new MatchAllDocsQuery());
+        return;
+      }
+      QueryParser parser = new QueryParser(Version.LUCENE_40, "SUPER", new KeywordAnalyzer());
+      parser.setAllowLeadingWildcard(true);
+      session.execute(parser.parse(query));
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Returns a single tuple carrying the totalResults count, or an empty list
+   * once the metadata has already been fetched.
+   * NOTE(review): nothing here marks the metadata as fetched - confirm that
+   * RSession.isMetaDataBeenFetched() flips state internally, otherwise every
+   * call returns the count again.
+   */
+  @Override
+  public List<Tuple> nextMetaDataResults(Session readSession, int batchSize) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      if (session.isMetaDataBeenFetched()) {
+        return EMPTY_LIST;
+      }
+      Tuple tuple = new Tuple();
+      tuple.addToAttributes(Util.newAttribute("totalResults", session.getTotalHits()));
+      return Arrays.asList(tuple);
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Returns up to batchSize result tuples from the session's cursor; a short
+   * or empty list signals exhaustion.
+   */
+  @Override
+  public List<Tuple> nextResults(Session readSession, int batchSize) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      List<Tuple> results = new ArrayList<Tuple>();
+      for (int i = 0; i < batchSize; i++) {
+        Tuple tuple = convert(session.nextDocument());
+        if (tuple == null) {
+          break;
+        }
+        results.add(tuple);
+      }
+      return results;
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Unregisters and closes the read session.
+   * NOTE(review): remove() returns null for an unknown session id, so this
+   * throws a wrapped NPE rather than the "not found" BlurException used by
+   * getReadSession - confirm whether that is intended.
+   */
+  @Override
+  public void closeReadSession(Session readSession) throws BlurException, TException {
+    try {
+      RSession session = readSessions.remove(readSession.getSessionId());
+      session.close();
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Opens a write session with its own writer over a private sub-directory
+   * named by a fresh UUID; data becomes visible only on commit.
+   */
+  @Override
+  public Session openWriteSession() throws BlurException, TException {
+    try {
+      String id = UUID.randomUUID().toString();
+      Path p = new Path(path, id);
+      Directory directory = new SimpleHDFSDirectory(configuration, p);
+      WSession session = new WSessionThreadedPath(id, openWriter(directory), directory, p);
+      writeSessions.put(session.getSessionId(), session);
+      return new Session(session.getSessionId());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /** Indexes the given tuples into the session's private writer. */
+  @Override
+  public void writeTuples(Session writeSession, List<Tuple> tuples) throws BlurException, TException {
+    try {
+      WSession session = getWriteSession(writeSession);
+      session.addDocuments(convert(tuples));
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Closes the session writer, merges its directory into the main index,
+   * commits, and deletes the session's working directory.
+   */
+  @Override
+  public void commitWriteSession(Session writeSession) throws BlurException, TException {
+    try {
+      WSession session = writeSessions.remove(writeSession.getSessionId());
+      session.closeWriter();
+      writer.addIndexes(session.getDirectory());
+      writer.commit();
+      writer.maybeMerge();
+      rm(((WSessionThreadedPath) session).getPath());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Discards the session: closes its writer and deletes its working
+   * directory without merging into the main index.
+   */
+  @Override
+  public void rollbackWriteSession(Session writeSession) throws BlurException, TException {
+    try {
+      WSession session = writeSessions.remove(writeSession.getSessionId());
+      session.closeWriter();
+      rm(((WSessionThreadedPath) session).getPath());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  // Recursively deletes the given path.
+  // NOTE(review): Path.getFileSystem typically returns a cached, shared
+  // FileSystem instance; closing it here may close the instance used by the
+  // main index directory as well - verify this is safe.
+  private void rm(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    fileSystem.delete(path, true);
+    fileSystem.close();
+  }
+
+  // Converts one thrift Tuple to a Lucene Document (null passes through).
+  private Document convert(Tuple tuple) throws BlurException {
+    if (tuple == null) {
+      return null;
+    }
+    Document newDoc = new Document();
+    for (Attribute attribute : tuple.getAttributes()) {
+      newDoc.add(Util.getField(attribute));
+    }
+    return newDoc;
+  }
+
+  // Converts one Lucene Document back to a thrift Tuple (null passes through).
+  private Tuple convert(Document document) {
+    if (document == null) {
+      return null;
+    }
+    List<IndexableField> fields = document.getFields();
+    Tuple tuple = new Tuple();
+    for (IndexableField fieldable : fields) {
+      tuple.addToAttributes(Util.toAttribute(fieldable));
+    }
+    return tuple;
+  }
+
+  // Bulk tuple-to-document conversion.
+  private List<Document> convert(List<Tuple> tuples) throws BlurException {
+    List<Document> docs = new ArrayList<Document>();
+    for (Tuple tuple : tuples) {
+      docs.add(convert(tuple));
+    }
+    return docs;
+  }
+
+  // Looks up a write session, translating an unknown id to a BlurException.
+  private WSession getWriteSession(Session session) throws BlurException {
+    WSession wsession = writeSessions.get(session.getSessionId());
+    if (wsession == null) {
+      throw new BlurException("Write Session [" + session + "] not found", null);
+    }
+    return wsession;
+  }
+
+  // Looks up a read session, translating an unknown id to a BlurException.
+  private RSession getReadSession(Session session) throws BlurException {
+    RSession rsession = readSessions.get(session.getSessionId());
+    if (rsession == null) {
+      throw new BlurException("Read Session [" + session + "] not found", null);
+    }
+    return rsession;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f2b81bd0/src/blur-new-api-prototype/src/main/java/org/apache/blur/store/SimpleHDFSDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/store/SimpleHDFSDirectory.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/store/SimpleHDFSDirectory.java
index 775a68b..a2c3365 100644
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/store/SimpleHDFSDirectory.java
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/store/SimpleHDFSDirectory.java
@@ -141,4 +141,12 @@ public class SimpleHDFSDirectory extends Directory {
     return new Path(path, name);
   }
 
+  /**
+   * Returns the last-modified time of the named file in this directory.
+   * 
+   * @param name
+   *          the file name within this directory.
+   * @return the modification time reported by the file system.
+   * @throws IOException
+   *           if the file does not exist or the status lookup fails.
+   */
+  public long getFileModified(String name) throws IOException {
+    if (!fileExists(name)) {
+      throw new FileNotFoundException("File [" + name + "] not found");
+    }
+    return fileSystem.getFileStatus(getPath(name)).getModificationTime();
+  }
+
 }


Mime
View raw message