incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [26/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:57:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
new file mode 100644
index 0000000..7777047
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
@@ -0,0 +1,302 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.metrics.MetricsConstants.CACHE;
+import static org.apache.blur.metrics.MetricsConstants.ENTRIES;
+import static org.apache.blur.metrics.MetricsConstants.EVICTION;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.SIZE;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+/**
+ * Fixed-size block cache: cached 8K blocks live inside large pre-sized
+ * ByteBuffer "slabs" (direct or heap).  Per-slab bit locks track which
+ * blocks are in use.  Entries are indexed by {@link BlockCacheKey} in a
+ * size-bounded ConcurrentLinkedHashMap; when the map evicts an entry its
+ * slab block is returned to the free pool.  Entry count, byte size and
+ * eviction rate are exported through Yammer Metrics.
+ */
+public class BlockCache implements Closeable {
+
+  /**
+   * <code>true</code>, if this platform supports unmapping mmapped files.
+   */
+  public static final boolean UNMAP_SUPPORTED;
+  static {
+    // Probe for sun.misc.Cleaner and DirectByteBuffer.cleaner(); both must
+    // be reachable for freeBuffer() to release direct buffers eagerly.
+    boolean v;
+    try {
+      Class.forName("sun.misc.Cleaner");
+      Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
+      v = true;
+    } catch (Exception e) {
+      v = false;
+    }
+    UNMAP_SUPPORTED = v;
+  }
+
+  public static final int _128M = 134217728; // default slab size (128 MB)
+  public static final int _8K = 8192; // block size in bytes
+
+  private final ConcurrentMap<BlockCacheKey, BlockCacheLocation> _cache;
+  private final ByteBuffer[] _slabs; // one buffer per slab; entries stay null until first use when lazy
+  private final BlockLocks[] _locks; // per-slab bit set marking in-use blocks
+  private final AtomicInteger[] _lockCounters; // per-slab count of set bits, used as a fast "slab full" check
+  private final int _blockSize = _8K;
+  private final int _numberOfBlocksPerSlab;
+  private final int _maxEntries;
+  private final Meter evictions;
+  private final int _numberOfSlabs;
+  private final boolean _directAllocation;
+  // Per-thread duplicate() views of the slabs so that the position() calls
+  // in store()/fetch() do not race between threads sharing one buffer.
+  private final ThreadLocal<ByteBuffer[]> _threadLocalSlabs = new ThreadLocal<ByteBuffer[]>() {
+    @Override
+    protected ByteBuffer[] initialValue() {
+      return new ByteBuffer[_numberOfSlabs];
+    }
+  };
+  
+  //This turns the lazy bytebuffer allocation on or off.
+  private final boolean lazy = true;
+
+  /**
+   * Creates a cache of {@code totalMemory} bytes split into 128 MB slabs.
+   */
+  public BlockCache(boolean directAllocation, long totalMemory) {
+    this(directAllocation, totalMemory, _128M);
+  }
+
+  /**
+   * Creates a cache of {@code totalMemory} bytes split into slabs of
+   * {@code slabSize} bytes.
+   *
+   * @param directAllocation when true slabs are allocated off-heap with
+   *          ByteBuffer.allocateDirect, otherwise on the heap.
+   */
+  public BlockCache(boolean directAllocation, long totalMemory, int slabSize) {
+    _numberOfBlocksPerSlab = slabSize / _blockSize;
+    _numberOfSlabs = (int) (totalMemory / slabSize);
+    _directAllocation = directAllocation;
+
+    _slabs = new ByteBuffer[_numberOfSlabs];
+    _locks = new BlockLocks[_numberOfSlabs];
+    _lockCounters = new AtomicInteger[_numberOfSlabs];
+    // Map capacity is one less than the total number of blocks across all
+    // slabs.
+    _maxEntries = (_numberOfBlocksPerSlab * _numberOfSlabs) - 1;
+    for (int i = 0; i < _numberOfSlabs; i++) {
+      if (!lazy) {
+        // Eager mode: allocate every slab up front.
+        if (_directAllocation) {
+          _slabs[i] = ByteBuffer.allocateDirect(_numberOfBlocksPerSlab * _blockSize);
+        } else {
+          _slabs[i] = ByteBuffer.allocate(_numberOfBlocksPerSlab * _blockSize);
+        }
+      }
+      _locks[i] = new BlockLocks(_numberOfBlocksPerSlab);
+      _lockCounters[i] = new AtomicInteger();
+    }
+
+    // Metrics: eviction rate plus gauges for entry count and cached bytes.
+    evictions = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, EVICTION), EVICTION, TimeUnit.SECONDS);
+    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, CACHE, ENTRIES), new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return (long) getSize();
+      }
+    });
+    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, CACHE, SIZE), new Gauge<Long>() {
+      @Override
+      public Long value() {
+        return ((long) getSize()) * (long) _8K;
+      }
+    });
+
+    // When the map evicts an entry, return its block to the free pool and
+    // count the eviction.
+    EvictionListener<BlockCacheKey, BlockCacheLocation> listener = new EvictionListener<BlockCacheKey, BlockCacheLocation>() {
+      @Override
+      public void onEviction(BlockCacheKey key, BlockCacheLocation location) {
+        releaseLocation(location);
+        evictions.mark();
+      }
+    };
+    _cache = new ConcurrentLinkedHashMap.Builder<BlockCacheKey, BlockCacheLocation>()
+        .maximumWeightedCapacity(_maxEntries).listener(listener).build();
+  }
+
+  /**
+   * Marks the location removed and returns its block to the free pool
+   * (clears the lock bit, decrements the slab's in-use counter).  Accepts
+   * null as a no-op so it can be fed the result of Map.put().
+   */
+  private void releaseLocation(BlockCacheLocation location) {
+    if (location == null) {
+      return;
+    }
+    int slabId = location.getSlabId();
+    int block = location.getBlock();
+    location.setRemoved(true);
+    _locks[slabId].clear(block);
+    _lockCounters[slabId].decrementAndGet();
+  }
+
+  /**
+   * Copies {@code data[offset, offset+length)} into the cached block for
+   * the given key, starting at {@code blockOffset} within the block.
+   *
+   * @return false if no free block could be reserved or the entry was
+   *         concurrently removed; true when the bytes were stored.
+   * @throws RuntimeException if blockOffset + length exceeds the block size.
+   */
+  public boolean store(BlockCacheKey blockCacheKey, int blockOffset, byte[] data, int offset, int length) {
+    if (length + blockOffset > _blockSize) {
+      throw new RuntimeException("Buffer size exceeded, expecting max [" + _blockSize + "] got length [" + length
+          + "] with blockOffset [" + blockOffset + "]");
+    }
+    BlockCacheLocation location = _cache.get(blockCacheKey);
+    boolean newLocation = false;
+    if (location == null) {
+      newLocation = true;
+      location = new BlockCacheLocation();
+      if (!findEmptyLocation(location)) {
+        return false;
+      }
+    }
+    if (location.isRemoved()) {
+      // Entry was evicted between the get() above and now.
+      return false;
+    }
+    int slabId = location.getSlabId();
+    int slabOffset = location.getBlock() * _blockSize;
+    ByteBuffer slab = getSlab(slabId);
+    slab.position(slabOffset + blockOffset);
+    slab.put(data, offset, length);
+    if (newLocation) {
+      // Key is mutable, so a clone is stored; a displaced old location (if
+      // any) is released.
+      releaseLocation(_cache.put(blockCacheKey.clone(), location));
+    }
+    return true;
+  }
+
+  /**
+   * Reads {@code length} bytes of the cached block for the key into
+   * {@code buffer} at {@code off}, starting at {@code blockOffset} within
+   * the block.
+   *
+   * @return false on a miss (unknown key or concurrently removed entry),
+   *         true when the bytes were copied.
+   */
+  public boolean fetch(BlockCacheKey blockCacheKey, byte[] buffer, int blockOffset, int off, int length) {
+    BlockCacheLocation location = _cache.get(blockCacheKey);
+    if (location == null) {
+      return false;
+    }
+    if (location.isRemoved()) {
+      return false;
+    }
+    int slabId = location.getSlabId();
+    int offset = location.getBlock() * _blockSize;
+    location.touch();
+    ByteBuffer slab = getSlab(slabId);
+    slab.position(offset + blockOffset);
+    slab.get(buffer, off, length);
+    return true;
+  }
+
+  /**
+   * Fetches one whole block; {@code buffer.length} must equal the block
+   * size.
+   */
+  public boolean fetch(BlockCacheKey blockCacheKey, byte[] buffer) {
+    checkLength(buffer);
+    return fetch(blockCacheKey, buffer, 0, 0, _blockSize);
+  }
+
+  /**
+   * Attempts to reserve a free block in some slab, writing the slab id and
+   * block index into {@code location} on success.  Makes up to 10 passes
+   * over all slabs before giving up.
+   *
+   * @return true if a block was reserved.
+   */
+  private boolean findEmptyLocation(BlockCacheLocation location) {
+    // This is a tight loop that will try and find a location to
+    // place the block before giving up
+    for (int j = 0; j < 10; j++) {
+      OUTER: for (int slabId = 0; slabId < _slabs.length; slabId++) {
+        AtomicInteger bitSetCounter = _lockCounters[slabId];
+        BlockLocks bitSet = _locks[slabId];
+        if (bitSetCounter.get() == _numberOfBlocksPerSlab) {
+          // if bitset is full
+          continue OUTER;
+        }
+        // this check needs to spin, if a lock was attempted but not obtained
+        // the rest of the slab should not be skipped
+        int bit = bitSet.nextClearBit(0);
+        INNER: while (bit != -1) {
+          if (bit >= _numberOfBlocksPerSlab) {
+            // bit set is full
+            continue OUTER;
+          }
+          if (!bitSet.set(bit)) {
+            // lock was not obtained
+            // this restarts at 0 because another block could have been unlocked
+            // while this was executing
+            bit = bitSet.nextClearBit(0);
+            continue INNER;
+          } else {
+            // lock obtained
+            location.setSlabId(slabId);
+            location.setBlock(bit);
+            bitSetCounter.incrementAndGet();
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  // Rejects buffers that are not exactly one block long.
+  private void checkLength(byte[] buffer) {
+    if (buffer.length != _blockSize) {
+      throw new RuntimeException("Buffer wrong size, expecting [" + _blockSize + "] got [" + buffer.length + "]");
+    }
+  }
+
+  /**
+   * Returns a thread-private ByteBuffer view of the given slab.  In lazy
+   * mode the backing slab is allocated on first use (double-checked under
+   * the lock on _slabs) and a duplicate is cached per thread in
+   * _threadLocalSlabs.
+   */
+  private ByteBuffer getSlab(int slabId) {
+    if (!lazy) {
+      return _slabs[slabId].duplicate();
+    } else {
+      ByteBuffer[] byteBuffers = _threadLocalSlabs.get();
+      ByteBuffer byteBuffer = byteBuffers[slabId];
+      if (byteBuffer == null) {
+        synchronized (_slabs) {
+          ByteBuffer bb = _slabs[slabId];
+          if (bb == null) {
+            if (_directAllocation) {
+              bb = ByteBuffer.allocateDirect(_numberOfBlocksPerSlab * _blockSize);
+            } else {
+              bb = ByteBuffer.allocate(_numberOfBlocksPerSlab * _blockSize);
+            }
+            _slabs[slabId] = bb;
+          }
+          byteBuffer = bb.duplicate();
+        }
+        byteBuffers[slabId] = byteBuffer;
+      }
+      return byteBuffer;
+    }
+  }
+
+  /** Number of entries currently in the cache. */
+  public int getSize() {
+    return _cache.size();
+  }
+
+  // Releases every slab buffer (null slabs from lazy mode are skipped by
+  // freeBuffer).
+  public void close() throws IOException {
+    for (ByteBuffer buffer : this._slabs) {
+      freeBuffer(buffer);
+    }
+  }
+
+  /**
+   * This code was copied form MMAPDirectory in Lucene.
+   */
+  // NOTE(review): the cleaner() method is looked up reflectively on the
+  // buffer's own class; heap buffers have no such method, so that lookup
+  // would surface here as an IOException — confirm close() is only reached
+  // with direct slabs when _directAllocation is false.
+  protected void freeBuffer(final ByteBuffer buffer) throws IOException {
+    if (buffer == null) {
+      return;
+    }
+    if (UNMAP_SUPPORTED) {
+      try {
+        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            final Method getCleanerMethod = buffer.getClass().getMethod("cleaner");
+            getCleanerMethod.setAccessible(true);
+            final Object cleaner = getCleanerMethod.invoke(buffer);
+            if (cleaner != null) {
+              cleaner.getClass().getMethod("clean").invoke(cleaner);
+            }
+            return null;
+          }
+        });
+      } catch (PrivilegedActionException e) {
+        final IOException ioe = new IOException("unable to unmap the mapped buffer");
+        ioe.initCause(e.getCause());
+        throw ioe;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java
new file mode 100644
index 0000000..cab828d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java
@@ -0,0 +1,73 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Cache key for one block: a numeric file id plus the block index within
+ * that file.  Instances are mutable (setters below), which is why callers
+ * such as BlockCache clone the key before storing it in the map.
+ */
+public class BlockCacheKey implements Cloneable {
+
+  private long _block; // block index within the file
+  private int _file; // numeric id standing in for the file name
+
+  public long getBlock() {
+    return _block;
+  }
+
+  public int getFile() {
+    return _file;
+  }
+
+  public void setBlock(long block) {
+    _block = block;
+  }
+
+  public void setFile(int file) {
+    _file = file;
+  }
+
+  // Value semantics over (_block, _file); equals and hashCode are kept in
+  // sync so the key is safe to use in hash maps once cloned.
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (_block ^ (_block >>> 32));
+    result = prime * result + _file;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlockCacheKey other = (BlockCacheKey) obj;
+    if (_block != other._block)
+      return false;
+    if (_file != other._file)
+      return false;
+    return true;
+  }
+
+  /** Field-for-field copy; both fields are primitives so a shallow clone suffices. */
+  @Override
+  public BlockCacheKey clone() {
+    try {
+      return (BlockCacheKey) super.clone();
+    } catch (CloneNotSupportedException e) {
+      // Cannot happen: this class implements Cloneable.
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java
new file mode 100644
index 0000000..61a54b0
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java
@@ -0,0 +1,66 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Records where a cached block lives (slab id + block index within the
+ * slab) along with last-access time, an access count, and a removed flag
+ * that lets concurrent readers detect an evicted entry.
+ */
+public class BlockCacheLocation {
+
+  private int _block; // block index within the slab
+  private int _slabId; // which slab holds the block
+  private long _lastAccess = System.currentTimeMillis();
+  private long _accesses;
+  // Atomic so the eviction path and readers see a consistent removed state.
+  private AtomicBoolean _removed = new AtomicBoolean(false);
+
+  public void setBlock(int block) {
+    _block = block;
+  }
+
+  public void setSlabId(int slabId) {
+    _slabId = slabId;
+  }
+
+  public int getBlock() {
+    return _block;
+  }
+
+  public int getSlabId() {
+    return _slabId;
+  }
+
+  // Records an access for stats purposes.
+  // NOTE(review): _lastAccess/_accesses are plain longs updated without
+  // synchronization; under concurrent fetches the counts may be
+  // approximate.
+  public void touch() {
+    _lastAccess = System.currentTimeMillis();
+    _accesses++;
+  }
+
+  public long getLastAccess() {
+    return _lastAccess;
+  }
+
+  public long getNumberOfAccesses() {
+    return _accesses;
+  }
+
+  public boolean isRemoved() {
+    return _removed.get();
+  }
+
+  public void setRemoved(boolean removed) {
+    _removed.set(removed);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
new file mode 100644
index 0000000..041b10d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
@@ -0,0 +1,324 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.blur.store.hdfs.DirectoryDecorator;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * A Lucene Directory decorator that serves reads through a block
+ * {@link Cache}.  Reads happen in BLOCK_SIZE (8K) chunks: on a miss the
+ * block is read from the wrapped directory, copied into the caller's
+ * buffer, and published to the cache.  Writes currently pass straight
+ * through (the cached-output path in createOutput is commented out).
+ */
+public class BlockDirectory extends Directory implements DirectoryDecorator {
+
+  public static final long BLOCK_SHIFT = 13; // 2^13 = 8,192 bytes per block
+  public static final long BLOCK_MOD = 0x1FFF;
+  public static final int BLOCK_SIZE = 1 << BLOCK_SHIFT;
+
+  /** Block index containing absolute file position {@code pos}. */
+  public static long getBlock(long pos) {
+    return pos >>> BLOCK_SHIFT;
+  }
+
+  /** Offset of {@code pos} within its block. */
+  public static long getPosition(long pos) {
+    return pos & BLOCK_MOD;
+  }
+
+  /** Absolute file position of {@code positionInBlock} within {@code block}. */
+  public static long getRealPosition(long block, long positionInBlock) {
+    return (block << BLOCK_SHIFT) + positionInBlock;
+  }
+
+  /** Null-object Cache: stores nothing and always misses. */
+  public static Cache NO_CACHE = new Cache() {
+
+    @Override
+    public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
+    }
+
+    @Override
+    public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+      return false;
+    }
+
+    @Override
+    public void delete(String name) {
+
+    }
+
+    @Override
+    public long size() {
+      return 0;
+    }
+
+    @Override
+    public void renameCacheFile(String source, String dest) {
+    }
+  };
+
+  private Directory _directory; // the wrapped delegate
+  private int _blockSize;
+  private String _dirName; // used to namespace cache keys per directory
+  private Cache _cache;
+  private Set<String> _blockCacheFileTypes; // cacheable file-name suffixes; null = cache everything
+  private boolean closed = false;
+
+  public BlockDirectory(String dirName, Directory directory) throws IOException {
+    this(dirName, directory, NO_CACHE);
+  }
+
+  public BlockDirectory(String dirName, Directory directory, Cache cache) throws IOException {
+    this(dirName, directory, cache, null);
+  }
+
+  /**
+   * @param blockCacheFileTypes file-name suffixes whose reads should be
+   *          cached; null or empty means every file is cacheable.
+   */
+  public BlockDirectory(String dirName, Directory directory, Cache cache, Set<String> blockCacheFileTypes)
+      throws IOException {
+    _dirName = dirName;
+    _directory = directory;
+    _blockSize = BLOCK_SIZE;
+    _cache = cache;
+    if (blockCacheFileTypes == null || blockCacheFileTypes.isEmpty()) {
+      _blockCacheFileTypes = null;
+    } else {
+      _blockCacheFileTypes = blockCacheFileTypes;
+    }
+    setLockFactory(directory.getLockFactory());
+  }
+
+  // True when the name ends with one of the configured suffixes.  Only
+  // called when _blockCacheFileTypes is non-null (see openInput).
+  private boolean isCachableFile(String name) {
+    for (String ext : _blockCacheFileTypes) {
+      if (name.endsWith(ext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Opens the delegate's input, wrapping it in a CachedIndexInput when the
+   * file type is cacheable.
+   */
+  @Override
+  public IndexInput openInput(final String name, IOContext context) throws IOException {
+    final IndexInput source = _directory.openInput(name, context);
+    if (_blockCacheFileTypes == null || isCachableFile(name)) {
+      return new CachedIndexInput(source, _blockSize, name, getFileCacheName(name), _cache, context);
+    }
+    return source;
+  }
+
+  /**
+   * IndexInput that satisfies reads block-by-block from the cache, falling
+   * back to the wrapped source input on a miss.
+   */
+  static class CachedIndexInput extends ReusedBufferedIndexInput {
+
+    private IndexInput _source;
+    private int _blockSize;
+    private long _fileLength; // captured once at construction
+    private String _cacheName; // cache key prefix (path + last-modified)
+    private Cache _cache;
+
+    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache,
+        IOContext context) {
+      super(name, context);
+      _source = source;
+      _blockSize = blockSize;
+      _fileLength = source.length();
+      _cacheName = cacheName;
+      _cache = cache;
+    }
+
+    // A clone must not share a file pointer with its parent, so the source
+    // input is cloned as well.
+    @Override
+    public CachedIndexInput clone() {
+      CachedIndexInput clone = (CachedIndexInput) super.clone();
+      clone._source = (IndexInput) _source.clone();
+      return clone;
+    }
+
+    @Override
+    public long length() {
+      return _source.length();
+    }
+
+    // No-op: fetchBlock seeks the source explicitly on every miss.
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+    }
+
+    // Reads len bytes at the current file pointer, one block at a time.
+    @Override
+    protected void readInternal(byte[] b, int off, int len) throws IOException {
+      long position = getFilePointer();
+      while (len > 0) {
+        int length = fetchBlock(position, b, off, len);
+        position += length;
+        len -= length;
+        off += length;
+      }
+    }
+
+    private int fetchBlock(long position, byte[] b, int off, int len) throws IOException {
+      // read whole block into cache and then provide needed data
+      long blockId = getBlock(position);
+      int blockOffset = (int) getPosition(position);
+      int lengthToReadInBlock = Math.min(len, _blockSize - blockOffset);
+      if (checkCache(blockId, blockOffset, b, off, lengthToReadInBlock)) {
+        return lengthToReadInBlock;
+      } else {
+        readIntoCacheAndResult(blockId, blockOffset, b, off, lengthToReadInBlock);
+      }
+      return lengthToReadInBlock;
+    }
+
+    // Miss path: read the whole block from the source into a pooled buffer,
+    // copy out the requested span, and publish the block to the cache.
+    // NOTE(review): the full _blockSize is published even for the shorter
+    // final block of a file; bytes past 'length' are whatever the pooled
+    // buffer held — confirm readers never request past EOF.
+    private void readIntoCacheAndResult(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock)
+        throws IOException {
+      long position = getRealPosition(blockId, 0);
+      int length = (int) Math.min(_blockSize, _fileLength - position);
+      _source.seek(position);
+
+      byte[] buf = BufferStore.takeBuffer(_blockSize);
+      _source.readBytes(buf, 0, length);
+      System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
+      _cache.update(_cacheName, blockId, 0, buf, 0, _blockSize);
+      BufferStore.putBuffer(buf);
+    }
+
+    private boolean checkCache(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+      return _cache.fetch(_cacheName, blockId, blockOffset, b, off, lengthToReadInBlock);
+    }
+
+    @Override
+    protected void closeInternal() throws IOException {
+      _source.close();
+    }
+  }
+
+  // Purges cache entries for every listed file, then closes the delegate.
+  // Idempotent via the 'closed' flag.
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      String[] files = listAll();
+      for (String file : files) {
+        _cache.delete(getFileCacheName(file));
+      }
+      _directory.close();
+      closed = true;
+    }
+  }
+
+  String getFileCacheLocation(String name) {
+    return _dirName + "/" + name;
+  }
+
+  // Cache key includes the file's last-modified time so a rewritten file
+  // cannot be served from stale cached blocks.
+  String getFileCacheName(String name) throws IOException {
+    return getFileCacheLocation(name) + ":" + getFileModified(name);
+  }
+
+  // Resolves last-modified time for the delegate types this class knows
+  // about; any other Directory type is unsupported.
+  private long getFileModified(String name) throws IOException {
+    if (_directory instanceof FSDirectory) {
+      File directory = ((FSDirectory) _directory).getDirectory();
+      File file = new File(directory, name);
+      if (!file.exists()) {
+        throw new FileNotFoundException("File [" + name + "] not found");
+      }
+      return file.lastModified();
+    } else if (_directory instanceof HdfsDirectory) {
+      return ((HdfsDirectory) _directory).getFileModified(name);
+    } else if (_directory instanceof LastModified) {
+      return ((LastModified) _directory).getFileModified(name);
+    } else {
+      throw new RuntimeException("Not supported");
+    }
+  }
+
+  // The methods below simply delegate to the wrapped directory.
+
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    _directory.copy(to, src, dest, context);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  public String toString() {
+    return _directory.toString();
+  }
+
+  // Write path is currently uncached; the cached-output wrapper is left
+  // commented out below.
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    IndexOutput dest = _directory.createOutput(name, context);
+    // if (_blockCacheFileTypes == null || isCachableFile(name)) {
+    // return new CachedIndexOutput(this, dest, _blockSize, name, _cache,
+    // _blockSize);
+    // }
+    return dest;
+  }
+
+  // Drops the cached blocks before deleting the underlying file.
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _cache.delete(getFileCacheName(name));
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  public Directory getDirectory() {
+    return _directory;
+  }
+
+  @Override
+  public Directory getOriginalDirectory() {
+    return _directory;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
new file mode 100644
index 0000000..f6e8b10
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
@@ -0,0 +1,95 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.metrics.MetricsConstants.CACHE;
+import static org.apache.blur.metrics.MetricsConstants.HIT;
+import static org.apache.blur.metrics.MetricsConstants.MISS;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+/**
+ * {@link Cache} adapter over a {@link BlockCache}: maps file names to
+ * small integer ids (BlockCacheKey carries an int file id, not a name)
+ * and records hit/miss meters for every fetch.
+ */
+public class BlockDirectoryCache implements Cache {
+
+  private BlockCache _blockCache;
+  private AtomicInteger _counter = new AtomicInteger(); // source of file ids
+  private Map<String, Integer> _names = new ConcurrentHashMap<String, Integer>(); // file name -> id
+  private Meter hits;
+  private Meter misses;
+
+  public BlockDirectoryCache(BlockCache blockCache) {
+    _blockCache = blockCache;
+    hits = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, HIT), HIT, TimeUnit.SECONDS);
+    misses = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, MISS), MISS, TimeUnit.SECONDS);
+  }
+
+  // Forgetting the name->id mapping orphans the file's blocks in the
+  // BlockCache; they age out through normal eviction.
+  @Override
+  public void delete(String name) {
+    _names.remove(name);
+  }
+
+  /** Stores one block's bytes for the named file, assigning a file id on first sight. */
+  @Override
+  public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
+    Integer file = _names.get(name);
+    if (file == null) {
+      // NOTE(review): get-then-put is not atomic; two threads first seeing
+      // the same name could assign different ids — confirm whether that is
+      // acceptable (the loser's blocks would just be cache misses).
+      file = _counter.incrementAndGet();
+      _names.put(name, file);
+    }
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(blockId);
+    blockCacheKey.setFile(file);
+    _blockCache.store(blockCacheKey, blockOffset, buffer, offset, length);
+  }
+
+  /**
+   * Attempts to read a cached block span for the named file; marks the
+   * hit or miss meter accordingly.
+   *
+   * @return true when the bytes were served from cache.
+   */
+  @Override
+  public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+    Integer file = _names.get(name);
+    if (file == null) {
+      // Unknown file: nothing was ever cached under this name.
+      return false;
+    }
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(blockId);
+    blockCacheKey.setFile(file);
+    boolean fetch = _blockCache.fetch(blockCacheKey, b, blockOffset, off, lengthToReadInBlock);
+    if (fetch) {
+      hits.mark();
+    } else {
+      misses.mark();
+    }
+    return fetch;
+  }
+
+  /** Delegates to the BlockCache entry count. */
+  @Override
+  public long size() {
+    return _blockCache.getSize();
+  }
+
+  // Re-points an existing file id at the new name so cached blocks survive
+  // a rename.
+  @Override
+  public void renameCacheFile(String source, String dest) {
+    Integer file = _names.remove(source);
+    if (file != null) {
+      _names.put(dest, file);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
new file mode 100644
index 0000000..b25cd12
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
@@ -0,0 +1,95 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.lucene.util.OpenBitSet;
+
/**
 * A fixed-size array of lock-free bit locks, one bit per cache block: a set
 * bit means the corresponding slot is locked. All operations are thread safe
 * and non-blocking (CAS loops over packed 64-bit words).
 */
public class BlockLocks {

  /** Lock bits packed 64 per long; updated only via CAS. */
  private final AtomicLongArray bits;
  /** Number of 64-bit words in {@link #bits}. */
  private final int wlen;

  public BlockLocks(long numBits) {
    // Number of 64-bit words needed to hold numBits bits. Equivalent to
    // Lucene's OpenBitSet.bits2words(numBits), inlined so this class has no
    // Lucene dependency.
    int length = (int) (((numBits - 1) >>> 6) + 1);
    bits = new AtomicLongArray(length);
    wlen = length;
  }

  /**
   * Find the next clear (unlocked) bit at or after the given index.
   * 
   * NOTE: when the constructor's numBits is not a multiple of 64, this can
   * report a clear bit past numBits; callers are expected to stay in range.
   * 
   * @param index first bit position to consider.
   * @return index of the next clear bit, or -1 if all remaining bits are set
   *         (or index is beyond the end of the set).
   */
  public int nextClearBit(int index) {
    int i = index >> 6;
    if (i >= wlen)
      return -1;
    int subIndex = index & 0x3f; // index within the word
    // Invert so clear bits become set bits, then skip all the bits to the
    // right of index. The signed shift is safe: any sign-extension bits only
    // appear when a genuine lower clear bit exists, and numberOfTrailingZeros
    // finds the lowest set bit.
    long word = ~bits.get(i) >> subIndex;
    if (word != 0) {
      return (i << 6) + subIndex + Long.numberOfTrailingZeros(word);
    }
    while (++i < wlen) {
      word = ~bits.get(i);
      if (word != 0) {
        return (i << 6) + Long.numberOfTrailingZeros(word);
      }
    }
    return -1;
  }

  /**
   * Thread safe set operation that will set the bit if and only if the bit was
   * not previously set.
   * 
   * @param index
   *          the index position to set.
   * @return true if this call set the bit (acquired the lock), false if it was
   *         already set.
   */
  public boolean set(int index) {
    int wordNum = index >> 6; // div 64
    int bit = index & 0x3f; // mod 64
    long bitmask = 1L << bit;
    long word, oword;
    do {
      word = bits.get(wordNum);
      // if set, another thread stole the lock
      if ((word & bitmask) != 0) {
        return false;
      }
      oword = word;
      word |= bitmask;
    } while (!bits.compareAndSet(wordNum, oword, word));
    return true;
  }

  /**
   * Clears (unlocks) the bit at the given index. Always succeeds, retrying the
   * CAS until no concurrent writer interferes.
   * 
   * @param index
   *          the index position to clear.
   */
  public void clear(int index) {
    int wordNum = index >> 6;
    int bit = index & 0x3f;
    long bitmask = 1L << bit;
    long word, oword;
    do {
      word = bits.get(wordNum);
      oword = word;
      word &= ~bitmask;
    } while (!bits.compareAndSet(wordNum, oword, word));
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
new file mode 100644
index 0000000..38e70cb
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
@@ -0,0 +1,56 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
/**
 * A cache of file blocks keyed by cache file name and block id. Parameter
 * meanings below follow the visible call sites (e.g. CachedIndexOutput).
 */
public interface Cache {

  /**
   * Remove a file from the cache.
   * 
   * @param name cache file name
   */
  void delete(String name);

  /**
   * Update the content of the specified cache file. Creates cache entry
   * if necessary.
   * 
   * @param name cache file name
   * @param blockId id of the block within the file being updated
   * @param blockOffset offset within the block at which to start writing
   * @param buffer source array containing the bytes to cache
   * @param offset offset within buffer of the first byte to copy
   * @param length number of bytes to copy into the block
   */
  void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length);
  
  /**
   * Fetch the specified cache file content.
   *
   * @param name cache file name
   * @param blockId id of the block within the file to read
   * @param blockOffset offset within the block at which to start reading
   * @param b destination array for the fetched bytes
   * @param off offset within b at which to start writing
   * @param lengthToReadInBlock number of bytes to read from the block
   * @return true if cached content found, otherwise return false
   */
  boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock);

  /**
   * Number of entries in the cache.
   * 
   * @return the entry count.
   */
  long size();

  /**
   * Expert: Rename the specified file in the cache. Allows a file to be moved
   * without invalidating the cache.
   * 
   * @param source original name
   * @param dest final name
   */
  void renameCacheFile(String source, String dest);

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
new file mode 100644
index 0000000..8a5170e
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
@@ -0,0 +1,88 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.ReusedBufferedIndexOutput;
+import org.apache.lucene.store.IndexOutput;
+
+/*
+ * Cache the blocks as they are written. The cache file name is the name of
+ * the file until the file is closed, at which point the cache is updated
+ * to include the last modified date (which is unknown until that point).
+ */
+public class CachedIndexOutput extends ReusedBufferedIndexOutput {
+  private final BlockDirectory _directory;
+  private final IndexOutput _dest;
+  private final int _blockSize;
+  private final String _name;
+  private final String _location;
+  private final Cache _cache;
+
+  public CachedIndexOutput(BlockDirectory directory, IndexOutput dest, int blockSize, String name, Cache cache, int bufferSize) {
+    super(bufferSize);
+    _directory = directory;
+    _dest = dest;
+    _blockSize = blockSize;
+    _name = name;
+    _location = _directory.getFileCacheLocation(name);
+    _cache = cache;
+  }
+
+  @Override
+  public void flushInternal() throws IOException {
+    _dest.flush();
+  }
+
+  @Override
+  public void closeInternal() throws IOException {
+    _dest.close();
+    _cache.renameCacheFile(_location, _directory.getFileCacheName(_name));
+  }
+
+  @Override
+  public void seekInternal(long pos) throws IOException {
+    throw new IOException("Seek not supported");
+  }
+
+  private int writeBlock(long position, byte[] b, int offset, int length) throws IOException {
+    // read whole block into cache and then provide needed data
+    long blockId = BlockDirectory.getBlock(position);
+    int blockOffset = (int) BlockDirectory.getPosition(position);
+    int lengthToWriteInBlock = Math.min(length, _blockSize - blockOffset);
+
+    // write the file and copy into the cache
+    _dest.writeBytes(b, offset, lengthToWriteInBlock);
+    _cache.update(_location, blockId, blockOffset, b, offset, lengthToWriteInBlock);
+
+    return lengthToWriteInBlock;
+  }
+
+  @Override
+  public void writeInternal(byte[] b, int offset, int length) throws IOException {
+    long position = getBufferStart();
+    while (length > 0) {
+      int len = writeBlock(position, b, offset, length);
+      position += len;
+      length -= len;
+      offset += len;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/blockcache/LastModified.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache/LastModified.java b/blur-store/src/main/java/org/apache/blur/store/blockcache/LastModified.java
new file mode 100644
index 0000000..e64037d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache/LastModified.java
@@ -0,0 +1,30 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
/**
 * The block cache needs to know when the file was last modified for unique file
 * naming. So when an index is removed and replaced the file cache is not
 * mistakenly used.
 */
public interface LastModified {

  /**
   * Gets the last modified timestamp of the named file.
   * 
   * @param name the file name.
   * @return the last modified time (NOTE(review): presumably milliseconds
   *         since the epoch as with standard filesystem APIs - confirm with
   *         implementations).
   * @throws IOException if the timestamp cannot be read.
   */
  long getFileModified(String name) throws IOException;

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java b/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java
new file mode 100644
index 0000000..2701726
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java
@@ -0,0 +1,126 @@
+package org.apache.blur.store.buffer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.metrics.MetricsConstants.INTERNAL_BUFFERS;
+import static org.apache.blur.metrics.MetricsConstants.LOST;
+import static org.apache.blur.metrics.MetricsConstants.LUCENE;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.OTHER_SIZES_ALLOCATED;
+import static org.apache.blur.metrics.MetricsConstants._1K_SIZE_ALLOCATED;
+import static org.apache.blur.metrics.MetricsConstants._8K_SIZE_ALLOCATED;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+public class BufferStore {
+
+  private static final Log LOG = LogFactory.getLog(BufferStore.class);
+
+  private static BlockingQueue<byte[]> _1024;
+  private static BlockingQueue<byte[]> _8192;
+
+  private static Meter _lost;
+  private static Meter _8K;
+  private static Meter _1K;
+  private static Meter _other;
+  private volatile static boolean setup = false;
+
+  public static void init(int _1KSize, int _8KSize) {
+    if (!setup) {
+      _lost = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, LOST, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      _8K = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, _1K_SIZE_ALLOCATED, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      _1K = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, _8K_SIZE_ALLOCATED, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      _other = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, OTHER_SIZES_ALLOCATED, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      LOG.info("Initializing the 1024 buffers with [{0}] buffers.", _1KSize);
+      _1024 = setupBuffers(1024, _1KSize, _1K);
+      LOG.info("Initializing the 8192 buffers with [{0}] buffers.", _8KSize);
+      _8192 = setupBuffers(8192, _8KSize, _8K);
+      setup = true;
+    }
+  }
+
+  private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count, Meter meter) {
+    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
+    for (int i = 0; i < count; i++) {
+      meter.mark();
+      queue.add(new byte[bufferSize]);
+    }
+    return queue;
+  }
+
+  public static byte[] takeBuffer(int bufferSize) {
+    switch (bufferSize) {
+    case 1024:
+      return newBuffer1024(_1024.poll());
+    case 8192:
+      return newBuffer8192(_8192.poll());
+    default:
+      return newBuffer(bufferSize);
+    }
+  }
+
+  public static void putBuffer(byte[] buffer) {
+    if (buffer == null) {
+      return;
+    }
+    int bufferSize = buffer.length;
+    switch (bufferSize) {
+    case 1024:
+      checkReturn(_1024.offer(buffer));
+      return;
+    case 8192:
+      checkReturn(_8192.offer(buffer));
+      return;
+    }
+  }
+
+  private static void checkReturn(boolean offer) {
+    if (!offer) {
+      _lost.mark();
+    }
+  }
+
+  private static byte[] newBuffer1024(byte[] buf) {
+    if (buf != null) {
+      return buf;
+    }
+    _1K.mark();
+    return new byte[1024];
+  }
+
+  private static byte[] newBuffer8192(byte[] buf) {
+    if (buf != null) {
+      return buf;
+    }
+    _8K.mark();
+    return new byte[8192];
+  }
+
+  private static byte[] newBuffer(int size) {
+    _other.mark();
+    return new byte[size];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java
new file mode 100644
index 0000000..23c09d3
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java
@@ -0,0 +1,361 @@
+package org.apache.blur.store.buffer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
/**
 * Base implementation class for buffered {@link IndexInput}. A copy of
 * Lucene's BufferedIndexInput whose internal buffer is taken from and returned
 * to the shared {@link BufferStore} pool instead of being garbage collected.
 */
public abstract class ReusedBufferedIndexInput extends IndexInput {

  // START Changed code from normal BufferedIndexInput

  /**
   * Closes the concrete input and returns the internal buffer to the
   * {@link BufferStore} pool. This input must not be used after close.
   */
  @Override
  public final void close() throws IOException {
    closeInternal();
    BufferStore.putBuffer(buffer);
    buffer = null;
  }

  /** Closes the underlying resource; called exactly once from {@link #close()}. */
  protected abstract void closeInternal() throws IOException;

  /**
   * Refills the buffer starting at the current logical file position. Takes a
   * pooled buffer lazily on first use (a cloned input may never read at all).
   */
  private void refill() throws IOException {
    long start = bufferStart + bufferPosition;
    long end = start + bufferSize;
    if (end > length()) // don't read past EOF
      end = length();
    int newLength = (int) (end - start);
    if (newLength <= 0)
      throw new EOFException("read past EOF: " + this);

    if (buffer == null) {
      // allocate buffer lazily from the shared pool
      buffer = BufferStore.takeBuffer(bufferSize);
      seekInternal(bufferStart);
    }
    readInternal(buffer, 0, newLength);
    bufferLength = newLength;
    bufferStart = start;
    bufferPosition = 0;
  }
  
  // The normal read buffer size defaults to 1024, but
  // increasing this during merging seems to yield
  // performance gains. However we don't want to increase
  // it too much because there are quite a few
  // BufferedIndexInputs created during merging. See
  // LUCENE-888 for details.
  /**
   * A buffer size for merges set to 8192 (4k in Lucene).
   */
  public static final int MERGE_BUFFER_SIZE = 8192;
  
  //END Changed code from normal BufferedIndexInput

  /** Default buffer size set to 1024 */
  public static final int BUFFER_SIZE = 1024;



  private int bufferSize = BUFFER_SIZE;

  // Lazily allocated from BufferStore in refill(); returned on close().
  protected byte[] buffer;

  private long bufferStart = 0; // position in file of buffer
  private int bufferLength = 0; // end of valid bytes
  private int bufferPosition = 0; // next byte to read

  @Override
  public final byte readByte() throws IOException {
    if (bufferPosition >= bufferLength)
      refill();
    return buffer[bufferPosition++];
  }

  public ReusedBufferedIndexInput(String resourceDesc) {
    this(resourceDesc, BUFFER_SIZE);
  }

  public ReusedBufferedIndexInput(String resourceDesc, IOContext context) {
    this(resourceDesc, bufferSize(context));
  }

  /** Inits BufferedIndexInput with a specific bufferSize */
  public ReusedBufferedIndexInput(String resourceDesc, int bufferSize) {
    super(resourceDesc);
    checkBufferSize(bufferSize);
    this.bufferSize = bufferSize;
  }

  /** Returns the buffer size used by this input. */
  public final int getBufferSize() {
    return bufferSize;
  }

  private void checkBufferSize(int bufferSize) {
    if (bufferSize <= 0)
      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
  }

  @Override
  public final void readBytes(byte[] b, int offset, int len) throws IOException {
    readBytes(b, offset, len, true);
  }

  @Override
  public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {

    if (len <= (bufferLength - bufferPosition)) {
      // the buffer contains enough data to satisfy this request
      if (len > 0) // to allow b to be null if len is 0...
        System.arraycopy(buffer, bufferPosition, b, offset, len);
      bufferPosition += len;
    } else {
      // the buffer does not have enough data. First serve all we've got.
      int available = bufferLength - bufferPosition;
      if (available > 0) {
        System.arraycopy(buffer, bufferPosition, b, offset, available);
        offset += available;
        len -= available;
        bufferPosition += available;
      }
      // and now, read the remaining 'len' bytes:
      if (useBuffer && len < bufferSize) {
        // If the amount left to read is small enough, and
        // we are allowed to use our buffer, do it in the usual
        // buffered way: fill the buffer and copy from it:
        refill();
        if (bufferLength < len) {
          // Throw an exception when refill() could not read len bytes:
          System.arraycopy(buffer, 0, b, offset, bufferLength);
          throw new EOFException("read past EOF: " + this);
        } else {
          System.arraycopy(buffer, 0, b, offset, len);
          bufferPosition = len;
        }
      } else {
        // The amount left to read is larger than the buffer
        // or we've been asked to not use our buffer -
        // there's no performance reason not to read it all
        // at once. Note that unlike the previous code of
        // this function, there is no need to do a seek
        // here, because there's no need to reread what we
        // had in the buffer.
        long after = bufferStart + bufferPosition + len;
        if (after > length())
          throw new EOFException("read past EOF: " + this);
        readInternal(b, offset, len);
        bufferStart = after;
        bufferPosition = 0;
        bufferLength = 0; // trigger refill() on read
      }
    }
  }

  // The multi-byte reads below decode directly from the buffer when enough
  // bytes are available, falling back to the byte-at-a-time superclass
  // implementations (which go through readByte/refill) otherwise.

  @Override
  public final short readShort() throws IOException {
    if (2 <= (bufferLength - bufferPosition)) {
      return (short) (((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF));
    } else {
      return super.readShort();
    }
  }

  @Override
  public final int readInt() throws IOException {
    if (4 <= (bufferLength - bufferPosition)) {
      return ((buffer[bufferPosition++] & 0xFF) << 24) | ((buffer[bufferPosition++] & 0xFF) << 16) | ((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF);
    } else {
      return super.readInt();
    }
  }

  @Override
  public final long readLong() throws IOException {
    if (8 <= (bufferLength - bufferPosition)) {
      final int i1 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
          | (buffer[bufferPosition++] & 0xff);
      final int i2 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
          | (buffer[bufferPosition++] & 0xff);
      return (((long) i1) << 32) | (i2 & 0xFFFFFFFFL);
    } else {
      return super.readLong();
    }
  }

  @Override
  public final int readVInt() throws IOException {
    // 5 is the maximum encoded length of a vInt; only decode inline when the
    // worst case fits in the buffer.
    if (5 <= (bufferLength - bufferPosition)) {
      byte b = buffer[bufferPosition++];
      if (b >= 0)
        return b;
      int i = b & 0x7F;
      b = buffer[bufferPosition++];
      i |= (b & 0x7F) << 7;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7F) << 14;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7F) << 21;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors:
      i |= (b & 0x0F) << 28;
      if ((b & 0xF0) == 0)
        return i;
      throw new IOException("Invalid vInt detected (too many bits)");
    } else {
      return super.readVInt();
    }
  }

  @Override
  public final long readVLong() throws IOException {
    // 9 is the maximum encoded length of a vLong (negative values disallowed).
    if (9 <= bufferLength - bufferPosition) {
      byte b = buffer[bufferPosition++];
      if (b >= 0)
        return b;
      long i = b & 0x7FL;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 7;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 14;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 21;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 28;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 35;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 42;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 49;
      if (b >= 0)
        return i;
      b = buffer[bufferPosition++];
      i |= (b & 0x7FL) << 56;
      if (b >= 0)
        return i;
      throw new IOException("Invalid vLong detected (negative values disallowed)");
    } else {
      return super.readVLong();
    }
  }

  /**
   * Expert: implements buffer refill. Reads bytes from the current position in
   * the input.
   * 
   * @param b
   *          the array to read bytes into
   * @param offset
   *          the offset in the array to start storing bytes
   * @param length
   *          the number of bytes to read
   */
  protected abstract void readInternal(byte[] b, int offset, int length) throws IOException;

  @Override
  public final long getFilePointer() {
    return bufferStart + bufferPosition;
  }

  @Override
  public final void seek(long pos) throws IOException {
    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
      bufferPosition = (int) (pos - bufferStart); // seek within buffer
    else {
      bufferStart = pos;
      bufferPosition = 0;
      bufferLength = 0; // trigger refill() on read()
      seekInternal(pos);
    }
  }

  /**
   * Expert: implements seek. Sets current position in this file, where the next
   * {@link #readInternal(byte[],int,int)} will occur.
   * 
   * @see #readInternal(byte[],int,int)
   */
  protected abstract void seekInternal(long pos) throws IOException;

  @Override
  public ReusedBufferedIndexInput clone() {
    ReusedBufferedIndexInput clone = (ReusedBufferedIndexInput) super.clone();

    // The clone starts with no buffer of its own (taken lazily on first read)
    // positioned at this input's current file pointer.
    clone.buffer = null;
    clone.bufferLength = 0;
    clone.bufferPosition = 0;
    clone.bufferStart = getFilePointer();

    return clone;
  }

  /**
   * Flushes the in-memory buffer to the given output, copying at most
   * <code>numBytes</code>.
   * <p>
   * <b>NOTE:</b> this method does not refill the buffer, however it does
   * advance the buffer position.
   * 
   * @return the number of bytes actually flushed from the in-memory buffer.
   */
  protected final int flushBuffer(IndexOutput out, long numBytes) throws IOException {
    int toCopy = bufferLength - bufferPosition;
    if (toCopy > numBytes) {
      toCopy = (int) numBytes;
    }
    if (toCopy > 0) {
      out.writeBytes(buffer, bufferPosition, toCopy);
      bufferPosition += toCopy;
    }
    return toCopy;
  }

  /**
   * Returns default buffer sizes for the given {@link IOContext}
   */
  public static int bufferSize(IOContext context) {
    switch (context.context) {
    case MERGE:
      return MERGE_BUFFER_SIZE;
    default:
      return BUFFER_SIZE;
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java b/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
new file mode 100644
index 0000000..c4576d5
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
@@ -0,0 +1,189 @@
+package org.apache.blur.store.buffer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexOutput;
+
/**
 * Buffered {@link IndexOutput} whose internal buffer is taken from the shared
 * {@link BufferStore} pool on construction and returned on close. Subclasses
 * implement the raw write/flush/seek/close operations via the *Internal hooks.
 */
public abstract class ReusedBufferedIndexOutput extends IndexOutput {

  public static final int BUFFER_SIZE = 1024;

  private int bufferSize = BUFFER_SIZE;
  
  // Pooled buffer; returned to BufferStore in close().
  protected byte[] buffer;

  /** position in the file of buffer */
  private long bufferStart = 0;
  /** end of valid bytes */
  private int bufferLength = 0;
  /** next byte to write */
  private int bufferPosition = 0;
  /** total length of the file */
  private long _fileLength = 0;
  
  public ReusedBufferedIndexOutput() {
    this(BUFFER_SIZE);
  }

  public ReusedBufferedIndexOutput(int bufferSize) {
    checkBufferSize(bufferSize);
    this.bufferSize = bufferSize;
    buffer = BufferStore.takeBuffer(this.bufferSize);
  }

  /** Returns the file position at which the buffer currently starts. */
  protected long getBufferStart() {
    return bufferStart;
  }

  private void checkBufferSize(int bufferSize) {
    if (bufferSize <= 0)
      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
  }

  /** Write the buffered bytes to cache, then reset the buffer to empty at the new start. */
  private void flushBufferToCache() throws IOException {
    writeInternal(buffer, 0, bufferLength);

    bufferStart += bufferLength;
    bufferLength = 0;
    bufferPosition = 0;
  }

  /** Flushes the concrete output; called after the buffer has been drained. */
  protected abstract void flushInternal() throws IOException;

  @Override
  public void flush() throws IOException {
    flushBufferToCache();
    flushInternal();
  }

  /** Closes the concrete output; called exactly once from {@link #close()}. */
  protected abstract void closeInternal() throws IOException;

  /**
   * Drains the buffer, closes the underlying output, and returns the pooled
   * buffer to {@link BufferStore}. This output must not be used after close.
   */
  @Override
  public void close() throws IOException {
    flushBufferToCache();
    closeInternal();
    BufferStore.putBuffer(buffer);
    buffer = null;
  }

  @Override
  public long getFilePointer() {
    return bufferStart + bufferPosition;
  }

  /** Positions the concrete output; called only when seeking outside the buffer. */
  protected abstract void seekInternal(long pos) throws IOException;

  @Override
  public void seek(long pos) throws IOException {
    // Seeking past the current end implicitly extends the file length.
    if (pos > _fileLength) {
      _fileLength = pos;
    }

    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
      bufferPosition = (int)(pos - bufferStart);  // seek within buffer
    else {
      flushBufferToCache();
      bufferStart = pos;
      bufferPosition = 0;
      bufferLength = 0;
      seekInternal(pos);
    }
  }

  @Override
  public long length() throws IOException {
    return _fileLength;
  }

  @Override
  public void writeByte(byte b) throws IOException {
    if (bufferPosition >= bufferSize) {
      flushBufferToCache();
    }
    // Writing at or past the current end grows the file by one byte;
    // writing inside the file (after a backward seek) does not.
    if (getFilePointer() >= _fileLength) {
      _fileLength++;
    }
    buffer[bufferPosition++] = b;
    if (bufferPosition > bufferLength) {
      bufferLength = bufferPosition;
    }
  }

  /** Expert: implements buffer flushing to cache. Writes bytes to the current
   * position in the output.
   * @param b the array of bytes to write
   * @param offset the offset in the array of bytes to write
   * @param length the number of bytes to write
   */
  protected abstract void writeInternal(byte[] b, int offset, int length) throws IOException;

  @Override
  public void writeBytes(byte[] b, int offset, int length) throws IOException {
    // Extend the tracked file length if this write reaches past the end.
    if (getFilePointer() + length > _fileLength) {
      _fileLength = getFilePointer() + length;
    }
    if(length <= bufferSize - bufferPosition){
      // the buffer contains enough space to satisfy this request
      if(length > 0) { // to allow b to be null if len is 0...
        System.arraycopy(b, offset, buffer, bufferPosition, length);
      }
      bufferPosition += length;
      if (bufferPosition > bufferLength) {
        bufferLength = bufferPosition;
      }
    } else {
      // the buffer does not have enough space. First buffer all we've got.
      int available = bufferSize - bufferPosition;
      if(available > 0){
        System.arraycopy(b, offset, buffer, bufferPosition, available);
        offset += available;
        length -= available;
        bufferPosition = bufferSize;
        bufferLength = bufferSize;
      }

      flushBufferToCache();

      // and now, write the remaining 'length' bytes:
      if (length < bufferSize){
        // If the amount left to write is small enough do it in the usual
        // buffered way:
        System.arraycopy(b, offset, buffer, 0, length);
        bufferPosition = length;
        bufferLength = length;
      } else {
        // The amount left to write is larger than the buffer
        // there's no performance reason not to write it all
        // at once.
        writeInternal(b, offset, length);
        bufferStart += length;
        bufferPosition = 0;
        bufferLength = 0;
      }

    }
  }

  /** Cloning is unsupported: the pooled buffer cannot be shared between outputs. */
  @Override
  protected Object clone() throws CloneNotSupportedException {
    throw new CloneNotSupportedException();
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
new file mode 100644
index 0000000..9f3bf00
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
@@ -0,0 +1,110 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+public class BlurLockFactory extends LockFactory {
+  
+  private static final Log LOG = LogFactory.getLog(BlurLockFactory.class);
+
+  private final Configuration _configuration;
+  private final FileSystem _fileSystem;
+  private final String _baseLockKey;
+  private byte[] _lockKey;
+  private final Path _dir;
+
+  public BlurLockFactory(Configuration configuration, Path dir, String host, int pid) throws IOException {
+    _configuration = configuration;
+    _dir = dir;
+    _fileSystem = _dir.getFileSystem(_configuration);
+    _baseLockKey = host + "/" + pid;
+  }
+
+  @Override
+  public Lock makeLock(String lockName) {
+    final Path lockPath = new Path(_dir, lockName);
+    return new Lock() {
+      private boolean _set;
+
+      @Override
+      public boolean obtain() throws IOException {
+        if (_set) {
+          throw new IOException("Lock for [" + _baseLockKey + "] can only be set once.");
+        }
+        try {
+          _lockKey = (_baseLockKey + "/" + System.currentTimeMillis()).getBytes();
+          FSDataOutputStream outputStream = _fileSystem.create(lockPath, true);
+          outputStream.write(_lockKey);
+          outputStream.close();
+        } finally {
+          _set = true;
+        }
+        return true;
+      }
+
+      @Override
+      public void release() throws IOException {
+        _fileSystem.delete(lockPath, false);
+      }
+
+      @Override
+      public boolean isLocked() throws IOException {
+        if (!_set) {
+          LOG.info("The lock has NOT been set.");
+          return false;
+        }
+        if (!_fileSystem.exists(lockPath)) {
+          LOG.info("The lock file has been removed.");
+          return false;
+        }
+        FileStatus fileStatus = _fileSystem.getFileStatus(lockPath);
+        long len = fileStatus.getLen();
+        if (len != _lockKey.length) {
+          LOG.info("The lock file length has changed.");
+          return false;
+        }
+        byte[] buf = new byte[_lockKey.length];
+        FSDataInputStream inputStream = _fileSystem.open(lockPath);
+        inputStream.readFully(buf);
+        inputStream.close();
+        if (Arrays.equals(_lockKey, buf)) {
+          return true;
+        }
+        LOG.info("The lock information has been changed.");
+        return false;
+      }
+    };
+  }
+
+  @Override
+  public void clearLock(String lockName) throws IOException {
+    _fileSystem.delete(new Path(_dir, lockName), false);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryDecorator.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryDecorator.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryDecorator.java
new file mode 100644
index 0000000..e1ab445
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/DirectoryDecorator.java
@@ -0,0 +1,9 @@
+package org.apache.blur.store.hdfs;
+
+import org.apache.lucene.store.Directory;
+
/**
 * Implemented by {@link Directory} wrappers so that code which needs the
 * underlying directory (e.g. for fast same-filesystem moves) can unwrap the
 * decorator chain.
 */
public interface DirectoryDecorator {
  
  /** Returns the {@link Directory} this decorator wraps. */
  Directory getOriginalDirectory();

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
new file mode 100644
index 0000000..050ba6d
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -0,0 +1,345 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.apache.blur.metrics.MetricsConstants.HDFS;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.store.BufferedIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.NoLockFactory;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+public class HdfsDirectory extends Directory {
+
+  private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
+
+  private static AtomicLong deleteCounter = new AtomicLong();
+  private static AtomicLong existsCounter = new AtomicLong();
+  private static AtomicLong fileStatusCounter = new AtomicLong();
+  private static AtomicLong renameCounter = new AtomicLong();
+  private static AtomicLong listCounter = new AtomicLong();
+  private static AtomicLong createCounter = new AtomicLong();
+  private static AtomicLong isFileCounter = new AtomicLong();
+
+  private static final boolean debug = false;
+
+  static {
+    if (debug) {
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          while (true) {
+            LOG.debug("Delete Counter [" + deleteCounter + "]");
+            LOG.debug("Exists Counter [" + existsCounter + "]");
+            LOG.debug("File Status Counter [" + fileStatusCounter + "]");
+            LOG.debug("Rename Counter [" + renameCounter + "]");
+            LOG.debug("List Counter [" + listCounter + "]");
+            LOG.debug("Create Counter [" + createCounter + "]");
+            LOG.debug("IsFile Counter [" + isFileCounter + "]");
+            try {
+              Thread.sleep(5000);
+            } catch (InterruptedException e) {
+              return;
+            }
+          }
+        }
+      });
+      thread.setName("HDFS dir counter logger");
+      thread.setDaemon(true);
+      thread.start();
+    }
+  }
+
+  private final Path path;
+  private final FileSystem fileSystem;
+  private final MetricsGroup metricsGroup;
+
+  static class MetricsGroup {
+    final Histogram readAccess;
+    final Histogram writeAccess;
+    final Meter writeThroughput;
+    final Meter readThroughput;
+
+    MetricsGroup(Histogram readAccess, Histogram writeAccess, Meter readThroughput, Meter writeThroughput) {
+      this.readAccess = readAccess;
+      this.writeAccess = writeAccess;
+      this.readThroughput = readThroughput;
+      this.writeThroughput = writeThroughput;
+    }
+  }
+
+  /**
+   * We keep the metrics separate per filesystem.
+   */
+  private static Map<URI, MetricsGroup> metricsGroupMap = new WeakHashMap<URI, MetricsGroup>();
+
+  public HdfsDirectory(Configuration configuration, Path path) throws IOException {
+    this.path = path;
+    fileSystem = path.getFileSystem(configuration);
+    setLockFactory(NoLockFactory.getNoLockFactory());
+    synchronized (metricsGroupMap) {
+      URI uri = fileSystem.getUri();
+      MetricsGroup metricsGroup = metricsGroupMap.get(uri);
+      if (metricsGroup == null) {
+        String scope = uri.toString();
+
+        Histogram readAccess = Metrics.newHistogram(new MetricName(ORG_APACHE_BLUR, HDFS, "Read Latency in \u00B5s",
+            scope));
+        Histogram writeAccess = Metrics.newHistogram(new MetricName(ORG_APACHE_BLUR, HDFS, "Write Latency in \u00B5s",
+            scope));
+        Meter readThroughput = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, HDFS, "Read Throughput", scope),
+            "Read Bytes", TimeUnit.SECONDS);
+        Meter writeThroughput = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, HDFS, "Write Throughput", scope),
+            "Write Bytes", TimeUnit.SECONDS);
+        metricsGroup = new MetricsGroup(readAccess, writeAccess, readThroughput, writeThroughput);
+        metricsGroupMap.put(uri, metricsGroup);
+      }
+      this.metricsGroup = metricsGroup;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "HdfsDirectory path=[" + path + "]";
+  }
+
+  public static class HdfsIndexInput extends ReusedBufferedIndexInput {
+
+    private final long len;
+    private FSDataInputStream inputStream;
+    private boolean isClone;
+    private final MetricsGroup metricsGroup;
+
+    public HdfsIndexInput(FileSystem fileSystem, Path filePath, MetricsGroup metricsGroup) throws IOException {
+      super(filePath.toString());
+      inputStream = fileSystem.open(filePath);
+      FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+      len = fileStatus.getLen();
+      this.metricsGroup = metricsGroup;
+    }
+
+    @Override
+    public long length() {
+      return len;
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      synchronized (inputStream) {
+        long start = System.nanoTime();
+        inputStream.seek(getFilePointer());
+        inputStream.readFully(b, offset, length);
+        long end = System.nanoTime();
+        metricsGroup.readAccess.update((end - start) / 1000);
+        metricsGroup.readThroughput.mark(length);
+      }
+    }
+
+    @Override
+    protected void closeInternal() throws IOException {
+      if (!isClone) {
+        inputStream.close();
+      }
+    }
+
+    @Override
+    public ReusedBufferedIndexInput clone() {
+      HdfsIndexInput clone = (HdfsIndexInput) super.clone();
+      clone.isClone = true;
+      return clone;
+    }
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    if (fileExists(name)) {
+      throw new IOException("File [" + name + "] already exists found.");
+    }
+    final FSDataOutputStream outputStream = fileSystem.create(getPath(name));
+    createCounter.incrementAndGet();
+    return new BufferedIndexOutput() {
+
+      @Override
+      public long length() throws IOException {
+        return outputStream.getPos();
+      }
+
+      @Override
+      protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
+        long start = System.nanoTime();
+        outputStream.write(b, offset, len);
+        long end = System.nanoTime();
+        metricsGroup.writeAccess.update((end - start) / 1000);
+        metricsGroup.writeThroughput.mark(len);
+      }
+
+      @Override
+      public void close() throws IOException {
+        super.close();
+        outputStream.close();
+      }
+
+      @Override
+      public void seek(long pos) throws IOException {
+        throw new IOException("seeks not allowed on IndexOutputs.");
+      }
+    };
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    if (!fileExists(name)) {
+      throw new FileNotFoundException("File [" + name + "] not found.");
+    }
+    Path filePath = getPath(name);
+    return new HdfsIndexInput(fileSystem, filePath, metricsGroup);
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    listCounter.incrementAndGet();
+    FileStatus[] files = fileSystem.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        try {
+          isFileCounter.incrementAndGet();
+          return fileSystem.isFile(path);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    String[] result = new String[files.length];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = files[i].getPath().getName();
+    }
+    return result;
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    existsCounter.incrementAndGet();
+    return fileSystem.exists(getPath(name));
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    if (fileExists(name)) {
+      deleteCounter.incrementAndGet();
+      fileSystem.delete(getPath(name), true);
+    } else {
+      throw new FileNotFoundException("File [" + name + "] not found");
+    }
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    fileStatusCounter.incrementAndGet();
+    FileStatus fileStatus = fileSystem.getFileStatus(getPath(name));
+    return fileStatus.getLen();
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+  
+  public Path getPath() {
+    return path;
+  }
+
+  private Path getPath(String name) {
+    return new Path(path, name);
+  }
+
+  public long getFileModified(String name) throws IOException {
+    if (!fileExists(name)) {
+      throw new FileNotFoundException("File [" + name + "] not found");
+    }
+    Path file = getPath(name);
+    fileStatusCounter.incrementAndGet();
+    return fileSystem.getFileStatus(file).getModificationTime();
+  }
+
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    if (to instanceof DirectoryDecorator) {
+      copy(((DirectoryDecorator) to).getOriginalDirectory(), src, dest, context);
+    } else if (to instanceof HdfsDirectory) {
+      if (quickMove(to, src, dest, context)) {
+        return;
+      }
+    } else {
+      super.copy(to, src, dest, context);
+    }
+  }
+
+  private boolean quickMove(Directory to, String src, String dest, IOContext context) throws IOException {
+    HdfsDirectory simpleTo = (HdfsDirectory) to;
+    if (ifSameCluster(simpleTo, this)) {
+      Path newDest = simpleTo.getPath(dest);
+      Path oldSrc = getPath(src);
+      renameCounter.incrementAndGet();
+      return fileSystem.rename(oldSrc, newDest);
+    }
+    return false;
+  }
+
+  private boolean ifSameCluster(HdfsDirectory dest, HdfsDirectory src) {
+    // @TODO
+    return true;
+  }
+
+}


Mime
View raw message