incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [26/28] Initial commit of the back port. The blur-util, blur-store, have been completed. Also a new distribution project helps with the building of the project. Also all of the pom files have been updated to the new version. This is very much a work in progress.
Date Mon, 18 Mar 2013 01:10:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java b/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java
deleted file mode 100644
index b206ec9..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java
+++ /dev/null
@@ -1,277 +0,0 @@
-package org.apache.blur.store;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-
-public abstract class CustomBufferedIndexInput extends IndexInput {
-
-  public static final int BUFFER_SIZE = 1024;
-
-  private int bufferSize = BUFFER_SIZE;
-
-  protected byte[] buffer;
-
-  private long bufferStart = 0; // position in file of buffer
-  private int bufferLength = 0; // end of valid bytes
-  private int bufferPosition = 0; // next byte to read
-
-  @Override
-  public byte readByte() throws IOException {
-    if (bufferPosition >= bufferLength)
-      refill();
-    return buffer[bufferPosition++];
-  }
-
-  public CustomBufferedIndexInput(String resourceDesc) {
-    this(resourceDesc, BUFFER_SIZE);
-  }
-
-  public CustomBufferedIndexInput(String resourceDesc, int bufferSize) {
-    super(resourceDesc);
-    checkBufferSize(bufferSize);
-    this.bufferSize = bufferSize;
-  }
-
-  private void checkBufferSize(int bufferSize) {
-    if (bufferSize <= 0)
-      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
-  }
-
-  @Override
-  public void readBytes(byte[] b, int offset, int len) throws IOException {
-    readBytes(b, offset, len, true);
-  }
-
-  @Override
-  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
-
-    if (len <= (bufferLength - bufferPosition)) {
-      // the buffer contains enough data to satisfy this request
-      if (len > 0) // to allow b to be null if len is 0...
-        System.arraycopy(buffer, bufferPosition, b, offset, len);
-      bufferPosition += len;
-    } else {
-      // the buffer does not have enough data. First serve all we've got.
-      int available = bufferLength - bufferPosition;
-      if (available > 0) {
-        System.arraycopy(buffer, bufferPosition, b, offset, available);
-        offset += available;
-        len -= available;
-        bufferPosition += available;
-      }
-      // and now, read the remaining 'len' bytes:
-      if (useBuffer && len < bufferSize) {
-        // If the amount left to read is small enough, and
-        // we are allowed to use our buffer, do it in the usual
-        // buffered way: fill the buffer and copy from it:
-        refill();
-        if (bufferLength < len) {
-          // Throw an exception when refill() could not read len bytes:
-          System.arraycopy(buffer, 0, b, offset, bufferLength);
-          throw new IOException("read past EOF");
-        } else {
-          System.arraycopy(buffer, 0, b, offset, len);
-          bufferPosition = len;
-        }
-      } else {
-        // The amount left to read is larger than the buffer
-        // or we've been asked to not use our buffer -
-        // there's no performance reason not to read it all
-        // at once. Note that unlike the previous code of
-        // this function, there is no need to do a seek
-        // here, because there's no need to reread what we
-        // had in the buffer.
-        long after = bufferStart + bufferPosition + len;
-        if (after > length())
-          throw new IOException("read past EOF");
-        readInternal(b, offset, len);
-        bufferStart = after;
-        bufferPosition = 0;
-        bufferLength = 0; // trigger refill() on read
-      }
-    }
-  }
-
-  @Override
-  public int readInt() throws IOException {
-    if (4 <= (bufferLength - bufferPosition)) {
-      return ((buffer[bufferPosition++] & 0xFF) << 24) | ((buffer[bufferPosition++] & 0xFF) << 16) | ((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF);
-    } else {
-      return super.readInt();
-    }
-  }
-
-  @Override
-  public long readLong() throws IOException {
-    if (8 <= (bufferLength - bufferPosition)) {
-      final int i1 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
-          | (buffer[bufferPosition++] & 0xff);
-      final int i2 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
-          | (buffer[bufferPosition++] & 0xff);
-      return (((long) i1) << 32) | (i2 & 0xFFFFFFFFL);
-    } else {
-      return super.readLong();
-    }
-  }
-
-  @Override
-  public int readVInt() throws IOException {
-    if (5 <= (bufferLength - bufferPosition)) {
-      byte b = buffer[bufferPosition++];
-      int i = b & 0x7F;
-      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
-        b = buffer[bufferPosition++];
-        i |= (b & 0x7F) << shift;
-      }
-      return i;
-    } else {
-      return super.readVInt();
-    }
-  }
-
-  @Override
-  public long readVLong() throws IOException {
-    if (9 <= bufferLength - bufferPosition) {
-      byte b = buffer[bufferPosition++];
-      long i = b & 0x7F;
-      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
-        b = buffer[bufferPosition++];
-        i |= (b & 0x7FL) << shift;
-      }
-      return i;
-    } else {
-      return super.readVLong();
-    }
-  }
-
-  private void refill() throws IOException {
-    long start = bufferStart + bufferPosition;
-    long end = start + bufferSize;
-    if (end > length()) // don't read past EOF
-      end = length();
-    int newLength = (int) (end - start);
-    if (newLength <= 0)
-      throw new IOException("read past EOF");
-
-    if (buffer == null) {
-      buffer = BufferStore.takeBuffer(bufferSize);
-      seekInternal(bufferStart);
-    }
-    readInternal(buffer, 0, newLength);
-    bufferLength = newLength;
-    bufferStart = start;
-    bufferPosition = 0;
-  }
-
-  @Override
-  public final void close() throws IOException {
-    closeInternal();
-    BufferStore.putBuffer(buffer);
-    buffer = null;
-  }
-
-  protected abstract void closeInternal() throws IOException;
-
-  /**
-   * Expert: implements buffer refill. Reads bytes from the current position in
-   * the input.
-   * 
-   * @param b
-   *          the array to read bytes into
-   * @param offset
-   *          the offset in the array to start storing bytes
-   * @param length
-   *          the number of bytes to read
-   */
-  protected abstract void readInternal(byte[] b, int offset, int length) throws IOException;
-
-  @Override
-  public long getFilePointer() {
-    return bufferStart + bufferPosition;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
-      bufferPosition = (int) (pos - bufferStart); // seek within buffer
-    else {
-      bufferStart = pos;
-      bufferPosition = 0;
-      bufferLength = 0; // trigger refill() on read()
-      seekInternal(pos);
-    }
-  }
-
-  /**
-   * Expert: implements seek. Sets current position in this file, where the next
-   * {@link #readInternal(byte[],int,int)} will occur.
-   * 
-   * @see #readInternal(byte[],int,int)
-   */
-  protected abstract void seekInternal(long pos) throws IOException;
-
-  @Override
-  public Object clone() {
-    CustomBufferedIndexInput clone = (CustomBufferedIndexInput) super.clone();
-
-    clone.buffer = null;
-    clone.bufferLength = 0;
-    clone.bufferPosition = 0;
-    clone.bufferStart = getFilePointer();
-
-    return clone;
-  }
-
-  /**
-   * Flushes the in-memory bufer to the given output, copying at most
-   * <code>numBytes</code>.
-   * <p>
-   * <b>NOTE:</b> this method does not refill the buffer, however it does
-   * advance the buffer position.
-   * 
-   * @return the number of bytes actually flushed from the in-memory buffer.
-   */
-  protected int flushBuffer(IndexOutput out, long numBytes) throws IOException {
-    int toCopy = bufferLength - bufferPosition;
-    if (toCopy > numBytes) {
-      toCopy = (int) numBytes;
-    }
-    if (toCopy > 0) {
-      out.writeBytes(buffer, bufferPosition, toCopy);
-      bufferPosition += toCopy;
-    }
-    return toCopy;
-  }
-
-  @Override
-  public void copyBytes(IndexOutput out, long numBytes) throws IOException {
-    assert numBytes >= 0 : "numBytes=" + numBytes;
-
-    while (numBytes > 0) {
-      if (bufferLength == bufferPosition) {
-        refill();
-      }
-      numBytes -= flushBuffer(out, numBytes);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
index 02c9dbf..b2947b9 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
@@ -16,39 +16,38 @@ package org.apache.blur.store.blockcache;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.metrics.MetricsConstants.*;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.blur.metrics.BlurMetrics;
-
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
 
 public class BlockCache {
 
   public static final int _128M = 134217728;
-  public static final int _32K = 32768;
+  public static final int _8K = 8192;
   private final ConcurrentMap<BlockCacheKey, BlockCacheLocation> _cache;
   private final ByteBuffer[] _slabs;
   private final BlockLocks[] _locks;
   private final AtomicInteger[] _lockCounters;
-  private final int _blockSize;
+  private final int _blockSize = _8K;
   private final int _numberOfBlocksPerSlab;
   private final int _maxEntries;
-  private final BlurMetrics _metrics;
-
-  public BlockCache(BlurMetrics metrics, boolean directAllocation, long totalMemory) {
-    this(metrics, directAllocation, totalMemory, _128M);
-  }
+  private Meter evictions;
 
-  public BlockCache(BlurMetrics metrics, boolean directAllocation, long totalMemory, int slabSize) {
-    this(metrics, directAllocation, totalMemory, slabSize, _32K);
+  public BlockCache(boolean directAllocation, long totalMemory) {
+    this(directAllocation, totalMemory, _128M);
   }
 
-  public BlockCache(BlurMetrics metrics, boolean directAllocation, long totalMemory, int slabSize, int blockSize) {
-    _metrics = metrics;
-    _numberOfBlocksPerSlab = slabSize / blockSize;
+  public BlockCache(boolean directAllocation, long totalMemory, int slabSize) {
+    _numberOfBlocksPerSlab = slabSize / _blockSize;
     int numberOfSlabs = (int) (totalMemory / slabSize);
 
     _slabs = new ByteBuffer[numberOfSlabs];
@@ -57,22 +56,24 @@ public class BlockCache {
     _maxEntries = (_numberOfBlocksPerSlab * numberOfSlabs) - 1;
     for (int i = 0; i < numberOfSlabs; i++) {
       if (directAllocation) {
-        _slabs[i] = ByteBuffer.allocateDirect(_numberOfBlocksPerSlab * blockSize);
+        _slabs[i] = ByteBuffer.allocateDirect(_numberOfBlocksPerSlab * _blockSize);
       } else {
-        _slabs[i] = ByteBuffer.allocate(_numberOfBlocksPerSlab * blockSize);
+        _slabs[i] = ByteBuffer.allocate(_numberOfBlocksPerSlab * _blockSize);
       }
       _locks[i] = new BlockLocks(_numberOfBlocksPerSlab);
       _lockCounters[i] = new AtomicInteger();
     }
+    
+    evictions = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, EVICTION), EVICTION, TimeUnit.SECONDS);
 
     EvictionListener<BlockCacheKey, BlockCacheLocation> listener = new EvictionListener<BlockCacheKey, BlockCacheLocation>() {
       @Override
       public void onEviction(BlockCacheKey key, BlockCacheLocation location) {
         releaseLocation(location);
+        evictions.mark();
       }
     };
     _cache = new ConcurrentLinkedHashMap.Builder<BlockCacheKey, BlockCacheLocation>().maximumWeightedCapacity(_maxEntries).listener(listener).build();
-    _blockSize = blockSize;
   }
 
   private void releaseLocation(BlockCacheLocation location) {
@@ -84,12 +85,13 @@ public class BlockCache {
     location.setRemoved(true);
     _locks[slabId].clear(block);
     _lockCounters[slabId].decrementAndGet();
-    _metrics.blockCacheEviction.incrementAndGet();
-    _metrics.blockCacheSize.decrementAndGet();
   }
 
-  public boolean store(BlockCacheKey blockCacheKey, byte[] data) {
-    checkLength(data);
+  public boolean store(BlockCacheKey blockCacheKey, int blockOffset, byte[] data, int offset, int length) {
+    if (length + blockOffset > _blockSize) {
+      throw new RuntimeException("Buffer size exceeded, expecting max ["
+          + _blockSize + "] got length [" + length + "] with blockOffset [" + blockOffset + "]" );
+    }
     BlockCacheLocation location = _cache.get(blockCacheKey);
     boolean newLocation = false;
     if (location == null) {
@@ -103,13 +105,12 @@ public class BlockCache {
       return false;
     }
     int slabId = location.getSlabId();
-    int offset = location.getBlock() * _blockSize;
+    int slabOffset = location.getBlock() * _blockSize;
     ByteBuffer slab = getSlab(slabId);
-    slab.position(offset);
-    slab.put(data, 0, _blockSize);
+    slab.position(slabOffset + blockOffset);
+    slab.put(data, offset, length);
     if (newLocation) {
       releaseLocation(_cache.put(blockCacheKey.clone(), location));
-      _metrics.blockCacheSize.incrementAndGet();
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
index fed1039..8146830 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
@@ -16,19 +16,23 @@ package org.apache.blur.store.blockcache;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
 
-import org.apache.blur.store.BufferStore;
-import org.apache.blur.store.CustomBufferedIndexInput;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 
-
 public class BlockDirectory extends Directory {
 
   public static final long BLOCK_SHIFT = 13; // 2^13 = 8,192 bytes per block
@@ -50,8 +54,7 @@ public class BlockDirectory extends Directory {
   public static Cache NO_CACHE = new Cache() {
 
     @Override
-    public void update(String name, long blockId, byte[] buffer) {
-
+    public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
     }
 
     @Override
@@ -68,6 +71,10 @@ public class BlockDirectory extends Directory {
     public long size() {
       return 0;
     }
+
+    @Override
+    public void renameCacheFile(String source, String dest) {
+    }
   };
 
   private Directory _directory;
@@ -97,14 +104,6 @@ public class BlockDirectory extends Directory {
     setLockFactory(directory.getLockFactory());
   }
 
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    final IndexInput source = _directory.openInput(name, _blockSize);
-    if (_blockCacheFileTypes == null || isCachableFile(name)) {
-      return new CachedIndexInput(source, _blockSize, name, getFileCacheName(name), _cache, bufferSize);
-    }
-    return source;
-  }
-
   private boolean isCachableFile(String name) {
     for (String ext : _blockCacheFileTypes) {
       if (name.endsWith(ext)) {
@@ -115,11 +114,15 @@ public class BlockDirectory extends Directory {
   }
 
   @Override
-  public IndexInput openInput(final String name) throws IOException {
-    return openInput(name, _blockSize);
+  public IndexInput openInput(final String name, IOContext context) throws IOException {
+    final IndexInput source = _directory.openInput(name, context);
+    if (_blockCacheFileTypes == null || isCachableFile(name)) {
+      return new CachedIndexInput(source, _blockSize, name, getFileCacheName(name), _cache, context);
+    }
+    return source;
   }
 
-  static class CachedIndexInput extends CustomBufferedIndexInput {
+  static class CachedIndexInput extends ReusedBufferedIndexInput {
 
     private IndexInput _source;
     private int _blockSize;
@@ -127,8 +130,8 @@ public class BlockDirectory extends Directory {
     private String _cacheName;
     private Cache _cache;
 
-    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache, int bufferSize) {
-      super(name, bufferSize);
+    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache, IOContext context) {
+      super(name, context);
       _source = source;
       _blockSize = blockSize;
       _fileLength = source.length();
@@ -137,7 +140,7 @@ public class BlockDirectory extends Directory {
     }
 
     @Override
-    public Object clone() {
+    public CachedIndexInput clone() {
       CachedIndexInput clone = (CachedIndexInput) super.clone();
       clone._source = (IndexInput) _source.clone();
       return clone;
@@ -184,7 +187,7 @@ public class BlockDirectory extends Directory {
       byte[] buf = BufferStore.takeBuffer(_blockSize);
       _source.readBytes(buf, 0, length);
       System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
-      _cache.update(_cacheName, blockId, buf);
+      _cache.update(_cacheName, blockId, 0, buf, 0, _blockSize);
       BufferStore.putBuffer(buf);
     }
 
@@ -207,76 +210,99 @@ public class BlockDirectory extends Directory {
     _directory.close();
   }
 
-  private String getFileCacheName(String name) throws IOException {
-    return _dirName + "/" + name + ":" + fileModified(name);
+  String getFileCacheLocation(String name) {
+    return _dirName + "/" + name;
+  }
+
+  String getFileCacheName(String name) throws IOException {
+    return getFileCacheLocation(name) + ":" + getFileModified(name);
+  }
+
+  private long getFileModified(String name) throws IOException {
+    if (_directory instanceof FSDirectory) {
+      File directory = ((FSDirectory) _directory).getDirectory();
+      File file = new File(directory,name);
+      if (!file.exists()) {
+        throw new FileNotFoundException("File [" + name + "] not found");
+      }
+      return file.lastModified();
+    } else if (_directory instanceof HdfsDirectory) {
+      return ((HdfsDirectory) _directory).getFileModified(name);
+    } else {
+      throw new RuntimeException("Not supported");
+    }
   }
 
   public void clearLock(String name) throws IOException {
     _directory.clearLock(name);
   }
 
-  public void copy(Directory to, String src, String dest) throws IOException {
-    _directory.copy(to, src, dest);
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    _directory.copy(to, src, dest, context);
   }
 
+  @Override
   public LockFactory getLockFactory() {
     return _directory.getLockFactory();
   }
 
+  @Override
   public String getLockID() {
     return _directory.getLockID();
   }
 
+  @Override
   public Lock makeLock(String name) {
     return _directory.makeLock(name);
   }
 
+  @Override
   public void setLockFactory(LockFactory lockFactory) throws IOException {
     _directory.setLockFactory(lockFactory);
   }
 
+  @Override
   public void sync(Collection<String> names) throws IOException {
     _directory.sync(names);
   }
 
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    _directory.sync(name);
-  }
-
   public String toString() {
     return _directory.toString();
   }
 
-  public IndexOutput createOutput(String name) throws IOException {
-    return _directory.createOutput(name);
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    IndexOutput dest = _directory.createOutput(name, context);
+//    if (_blockCacheFileTypes == null || isCachableFile(name)) {
+//      return new CachedIndexOutput(this, dest, _blockSize, name, _cache, _blockSize);
+//    }
+    return dest;
   }
 
+  @Override
   public void deleteFile(String name) throws IOException {
     _cache.delete(getFileCacheName(name));
     _directory.deleteFile(name);
   }
 
+  @Override
   public boolean fileExists(String name) throws IOException {
     return _directory.fileExists(name);
   }
 
+  @Override
   public long fileLength(String name) throws IOException {
     return _directory.fileLength(name);
   }
 
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    return _directory.fileModified(name);
-  }
-
+  @Override
   public String[] listAll() throws IOException {
     return _directory.listAll();
   }
 
-  @SuppressWarnings("deprecation")
-  public void touchFile(String name) throws IOException {
-    _directory.touchFile(name);
+  public Directory getDirectory() {
+    return _directory;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
index becd4ba..f6e8b10 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
@@ -16,22 +16,32 @@ package org.apache.blur.store.blockcache;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.metrics.MetricsConstants.CACHE;
+import static org.apache.blur.metrics.MetricsConstants.HIT;
+import static org.apache.blur.metrics.MetricsConstants.MISS;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.blur.metrics.BlurMetrics;
-
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
 
 public class BlockDirectoryCache implements Cache {
+
   private BlockCache _blockCache;
   private AtomicInteger _counter = new AtomicInteger();
   private Map<String, Integer> _names = new ConcurrentHashMap<String, Integer>();
-  private BlurMetrics _blurMetrics;
+  private Meter hits;
+  private Meter misses;
 
-  public BlockDirectoryCache(BlockCache blockCache, BlurMetrics blurMetrics) {
+  public BlockDirectoryCache(BlockCache blockCache) {
     _blockCache = blockCache;
-    _blurMetrics = blurMetrics;
+    hits = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, HIT), HIT, TimeUnit.SECONDS);
+    misses = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, MISS), MISS, TimeUnit.SECONDS);
   }
 
   @Override
@@ -40,7 +50,7 @@ public class BlockDirectoryCache implements Cache {
   }
 
   @Override
-  public void update(String name, long blockId, byte[] buffer) {
+  public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
     Integer file = _names.get(name);
     if (file == null) {
       file = _counter.incrementAndGet();
@@ -49,7 +59,7 @@ public class BlockDirectoryCache implements Cache {
     BlockCacheKey blockCacheKey = new BlockCacheKey();
     blockCacheKey.setBlock(blockId);
     blockCacheKey.setFile(file);
-    _blockCache.store(blockCacheKey, buffer);
+    _blockCache.store(blockCacheKey, blockOffset, buffer, offset, length);
   }
 
   @Override
@@ -63,9 +73,9 @@ public class BlockDirectoryCache implements Cache {
     blockCacheKey.setFile(file);
     boolean fetch = _blockCache.fetch(blockCacheKey, b, blockOffset, off, lengthToReadInBlock);
     if (fetch) {
-      _blurMetrics.blockCacheHit.incrementAndGet();
+      hits.mark();
     } else {
-      _blurMetrics.blockCacheMiss.incrementAndGet();
+      misses.mark();
     }
     return fetch;
   }
@@ -74,4 +84,12 @@ public class BlockDirectoryCache implements Cache {
   public long size() {
     return _blockCache.getSize();
   }
+
+  @Override
+  public void renameCacheFile(String source, String dest) {
+    Integer file = _names.remove(source);
+    if (file != null) {
+      _names.put(dest, file);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
index 502d786..b25cd12 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
@@ -18,7 +18,6 @@ package org.apache.blur.store.blockcache;
  */
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import org.apache.lucene.util.BitUtil;
 import org.apache.lucene.util.OpenBitSet;
 
 public class BlockLocks {
@@ -46,12 +45,12 @@ public class BlockLocks {
     long word = ~bits.get(i) >> subIndex; // skip all the bits to the right of
                                           // index
     if (word != 0) {
-      return (i << 6) + subIndex + BitUtil.ntz(word);
+      return (i << 6) + subIndex + Long.numberOfTrailingZeros(word);
     }
     while (++i < wlen) {
       word = ~bits.get(i);
       if (word != 0) {
-        return (i << 6) + BitUtil.ntz(word);
+        return (i << 6) + Long.numberOfTrailingZeros(word);
       }
     }
     return -1;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
index 98a66c3..38e70cb 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
@@ -18,12 +18,39 @@ package org.apache.blur.store.blockcache;
  */
 public interface Cache {
 
+  /**
+   * Remove a file from the cache.
+   * 
+   * @param name cache file name
+   */
   void delete(String name);
 
-  void update(String name, long blockId, byte[] buffer);
-
+  /**
+   * Update the content of the specified cache file. Creates cache entry
+   * if necessary.
+   * 
+   */
+  void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length);
+  
+  /**
+   * Fetch the specified cache file content.
+   *
+   * @return true if cached content found, otherwise return false
+   */
   boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock);
 
+  /**
+   * Number of entries in the cache.
+   */
   long size();
 
+  /**
+   * Expert: Rename the specified file in the cache. Allows a file to be moved
+   * without invalidating the cache.
+   * 
+   * @param source original name
+   * @param dest final name
+   */
+  void renameCacheFile(String source, String dest);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
new file mode 100644
index 0000000..8a5170e
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/CachedIndexOutput.java
@@ -0,0 +1,88 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.ReusedBufferedIndexOutput;
+import org.apache.lucene.store.IndexOutput;
+
/*
 * Cache the blocks as they are written. The cache file name is the name of
 * the file until the file is closed, at which point the cache is updated
 * to include the last modified date (which is unknown until that point).
 */
public class CachedIndexOutput extends ReusedBufferedIndexOutput {
  private final BlockDirectory _directory;
  private final IndexOutput _dest;     // underlying output every byte is written through to
  private final int _blockSize;
  private final String _name;
  private final String _location;      // cache key used while the file is still open
  private final Cache _cache;

  /**
   * Creates a write-through output: bytes go to {@code dest} and are mirrored
   * into {@code cache} block by block as they are flushed.
   */
  public CachedIndexOutput(BlockDirectory directory, IndexOutput dest, int blockSize, String name, Cache cache, int bufferSize) {
    super(bufferSize);
    _directory = directory;
    _dest = dest;
    _blockSize = blockSize;
    _name = name;
    _location = _directory.getFileCacheLocation(name);
    _cache = cache;
  }

  @Override
  public void flushInternal() throws IOException {
    _dest.flush();
  }

  @Override
  public void closeInternal() throws IOException {
    _dest.close();
    // The file is now complete; move the cache entry from the temporary
    // open-file location to its final cache name.
    _cache.renameCacheFile(_location, _directory.getFileCacheName(_name));
  }

  @Override
  public void seekInternal(long pos) throws IOException {
    throw new IOException("Seek not supported");
  }

  /**
   * Writes as much of b[offset..offset+length) as fits in the block that
   * contains {@code position}, mirroring the same bytes into the cache.
   *
   * @return the number of bytes actually written (never crosses a block boundary)
   */
  private int writeBlock(long position, byte[] b, int offset, int length) throws IOException {
    // clamp the write so it stays within the current cache block
    long blockId = BlockDirectory.getBlock(position);
    int blockOffset = (int) BlockDirectory.getPosition(position);
    int lengthToWriteInBlock = Math.min(length, _blockSize - blockOffset);

    // write the file and copy into the cache
    _dest.writeBytes(b, offset, lengthToWriteInBlock);
    _cache.update(_location, blockId, blockOffset, b, offset, lengthToWriteInBlock);

    return lengthToWriteInBlock;
  }

  @Override
  public void writeInternal(byte[] b, int offset, int length) throws IOException {
    long position = getBufferStart();
    while (length > 0) {
      int len = writeBlock(position, b, offset, length);
      position += len;
      length -= len;
      offset += len;
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java b/src/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java
new file mode 100644
index 0000000..2701726
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/buffer/BufferStore.java
@@ -0,0 +1,126 @@
+package org.apache.blur.store.buffer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.metrics.MetricsConstants.INTERNAL_BUFFERS;
+import static org.apache.blur.metrics.MetricsConstants.LOST;
+import static org.apache.blur.metrics.MetricsConstants.LUCENE;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.OTHER_SIZES_ALLOCATED;
+import static org.apache.blur.metrics.MetricsConstants._1K_SIZE_ALLOCATED;
+import static org.apache.blur.metrics.MetricsConstants._8K_SIZE_ALLOCATED;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+public class BufferStore {
+
+  private static final Log LOG = LogFactory.getLog(BufferStore.class);
+
+  private static BlockingQueue<byte[]> _1024;
+  private static BlockingQueue<byte[]> _8192;
+
+  private static Meter _lost;
+  private static Meter _8K;
+  private static Meter _1K;
+  private static Meter _other;
+  private volatile static boolean setup = false;
+
+  public static void init(int _1KSize, int _8KSize) {
+    if (!setup) {
+      _lost = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, LOST, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      _8K = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, _1K_SIZE_ALLOCATED, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      _1K = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, _8K_SIZE_ALLOCATED, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      _other = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, LUCENE, OTHER_SIZES_ALLOCATED, INTERNAL_BUFFERS), INTERNAL_BUFFERS, TimeUnit.SECONDS);
+      LOG.info("Initializing the 1024 buffers with [{0}] buffers.", _1KSize);
+      _1024 = setupBuffers(1024, _1KSize, _1K);
+      LOG.info("Initializing the 8192 buffers with [{0}] buffers.", _8KSize);
+      _8192 = setupBuffers(8192, _8KSize, _8K);
+      setup = true;
+    }
+  }
+
+  private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count, Meter meter) {
+    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
+    for (int i = 0; i < count; i++) {
+      meter.mark();
+      queue.add(new byte[bufferSize]);
+    }
+    return queue;
+  }
+
+  public static byte[] takeBuffer(int bufferSize) {
+    switch (bufferSize) {
+    case 1024:
+      return newBuffer1024(_1024.poll());
+    case 8192:
+      return newBuffer8192(_8192.poll());
+    default:
+      return newBuffer(bufferSize);
+    }
+  }
+
+  public static void putBuffer(byte[] buffer) {
+    if (buffer == null) {
+      return;
+    }
+    int bufferSize = buffer.length;
+    switch (bufferSize) {
+    case 1024:
+      checkReturn(_1024.offer(buffer));
+      return;
+    case 8192:
+      checkReturn(_8192.offer(buffer));
+      return;
+    }
+  }
+
+  private static void checkReturn(boolean offer) {
+    if (!offer) {
+      _lost.mark();
+    }
+  }
+
+  private static byte[] newBuffer1024(byte[] buf) {
+    if (buf != null) {
+      return buf;
+    }
+    _1K.mark();
+    return new byte[1024];
+  }
+
+  private static byte[] newBuffer8192(byte[] buf) {
+    if (buf != null) {
+      return buf;
+    }
+    _8K.mark();
+    return new byte[8192];
+  }
+
+  private static byte[] newBuffer(int size) {
+    _other.mark();
+    return new byte[size];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java b/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java
new file mode 100644
index 0000000..23c09d3
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexInput.java
@@ -0,0 +1,361 @@
+package org.apache.blur.store.buffer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/** Base implementation class for buffered {@link IndexInput}. */
+public abstract class ReusedBufferedIndexInput extends IndexInput {
+
+  // START Changed code from normal BufferedIndexInput
+  @Override
+  public final void close() throws IOException {
+    closeInternal();
+    BufferStore.putBuffer(buffer);
+    buffer = null;
+  }
+
+  protected abstract void closeInternal() throws IOException;
+
+  private void refill() throws IOException {
+    long start = bufferStart + bufferPosition;
+    long end = start + bufferSize;
+    if (end > length()) // don't read past EOF
+      end = length();
+    int newLength = (int) (end - start);
+    if (newLength <= 0)
+      throw new EOFException("read past EOF: " + this);
+
+    if (buffer == null) {
+      buffer = BufferStore.takeBuffer(bufferSize);
+      seekInternal(bufferStart);
+    }
+    readInternal(buffer, 0, newLength);
+    bufferLength = newLength;
+    bufferStart = start;
+    bufferPosition = 0;
+  }
+  
+  // The normal read buffer size defaults to 1024, but
+  // increasing this during merging seems to yield
+  // performance gains. However we don't want to increase
+  // it too much because there are quite a few
+  // BufferedIndexInputs created during merging. See
+  // LUCENE-888 for details.
+  /**
+   * A buffer size for merges set to 8192, 4k in Lucene
+   */
+  public static final int MERGE_BUFFER_SIZE = 8192;
+  
+  //END Changed code from normal BufferedIndexInput
+
+  /** Default buffer size set to 1024 */
+  public static final int BUFFER_SIZE = 1024;
+
+
+
+  private int bufferSize = BUFFER_SIZE;
+
+  protected byte[] buffer;
+
+  private long bufferStart = 0; // position in file of buffer
+  private int bufferLength = 0; // end of valid bytes
+  private int bufferPosition = 0; // next byte to read
+
+  @Override
+  public final byte readByte() throws IOException {
+    if (bufferPosition >= bufferLength)
+      refill();
+    return buffer[bufferPosition++];
+  }
+
+  public ReusedBufferedIndexInput(String resourceDesc) {
+    this(resourceDesc, BUFFER_SIZE);
+  }
+
+  public ReusedBufferedIndexInput(String resourceDesc, IOContext context) {
+    this(resourceDesc, bufferSize(context));
+  }
+
+  /** Inits BufferedIndexInput with a specific bufferSize */
+  public ReusedBufferedIndexInput(String resourceDesc, int bufferSize) {
+    super(resourceDesc);
+    checkBufferSize(bufferSize);
+    this.bufferSize = bufferSize;
+  }
+
+  /** Returns buffer size. @see #setBufferSize */
+  public final int getBufferSize() {
+    return bufferSize;
+  }
+
+  private void checkBufferSize(int bufferSize) {
+    if (bufferSize <= 0)
+      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
+  }
+
+  @Override
+  public final void readBytes(byte[] b, int offset, int len) throws IOException {
+    readBytes(b, offset, len, true);
+  }
+
+  @Override
+  public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+
+    if (len <= (bufferLength - bufferPosition)) {
+      // the buffer contains enough data to satisfy this request
+      if (len > 0) // to allow b to be null if len is 0...
+        System.arraycopy(buffer, bufferPosition, b, offset, len);
+      bufferPosition += len;
+    } else {
+      // the buffer does not have enough data. First serve all we've got.
+      int available = bufferLength - bufferPosition;
+      if (available > 0) {
+        System.arraycopy(buffer, bufferPosition, b, offset, available);
+        offset += available;
+        len -= available;
+        bufferPosition += available;
+      }
+      // and now, read the remaining 'len' bytes:
+      if (useBuffer && len < bufferSize) {
+        // If the amount left to read is small enough, and
+        // we are allowed to use our buffer, do it in the usual
+        // buffered way: fill the buffer and copy from it:
+        refill();
+        if (bufferLength < len) {
+          // Throw an exception when refill() could not read len bytes:
+          System.arraycopy(buffer, 0, b, offset, bufferLength);
+          throw new EOFException("read past EOF: " + this);
+        } else {
+          System.arraycopy(buffer, 0, b, offset, len);
+          bufferPosition = len;
+        }
+      } else {
+        // The amount left to read is larger than the buffer
+        // or we've been asked to not use our buffer -
+        // there's no performance reason not to read it all
+        // at once. Note that unlike the previous code of
+        // this function, there is no need to do a seek
+        // here, because there's no need to reread what we
+        // had in the buffer.
+        long after = bufferStart + bufferPosition + len;
+        if (after > length())
+          throw new EOFException("read past EOF: " + this);
+        readInternal(b, offset, len);
+        bufferStart = after;
+        bufferPosition = 0;
+        bufferLength = 0; // trigger refill() on read
+      }
+    }
+  }
+
+  @Override
+  public final short readShort() throws IOException {
+    if (2 <= (bufferLength - bufferPosition)) {
+      return (short) (((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF));
+    } else {
+      return super.readShort();
+    }
+  }
+
+  @Override
+  public final int readInt() throws IOException {
+    if (4 <= (bufferLength - bufferPosition)) {
+      return ((buffer[bufferPosition++] & 0xFF) << 24) | ((buffer[bufferPosition++] & 0xFF) << 16) | ((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF);
+    } else {
+      return super.readInt();
+    }
+  }
+
+  @Override
+  public final long readLong() throws IOException {
+    if (8 <= (bufferLength - bufferPosition)) {
+      final int i1 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
+          | (buffer[bufferPosition++] & 0xff);
+      final int i2 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
+          | (buffer[bufferPosition++] & 0xff);
+      return (((long) i1) << 32) | (i2 & 0xFFFFFFFFL);
+    } else {
+      return super.readLong();
+    }
+  }
+
+  @Override
+  public final int readVInt() throws IOException {
+    if (5 <= (bufferLength - bufferPosition)) {
+      byte b = buffer[bufferPosition++];
+      if (b >= 0)
+        return b;
+      int i = b & 0x7F;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7F) << 7;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7F) << 14;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7F) << 21;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors:
+      i |= (b & 0x0F) << 28;
+      if ((b & 0xF0) == 0)
+        return i;
+      throw new IOException("Invalid vInt detected (too many bits)");
+    } else {
+      return super.readVInt();
+    }
+  }
+
+  @Override
+  public final long readVLong() throws IOException {
+    if (9 <= bufferLength - bufferPosition) {
+      byte b = buffer[bufferPosition++];
+      if (b >= 0)
+        return b;
+      long i = b & 0x7FL;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 7;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 14;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 21;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 28;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 35;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 42;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 49;
+      if (b >= 0)
+        return i;
+      b = buffer[bufferPosition++];
+      i |= (b & 0x7FL) << 56;
+      if (b >= 0)
+        return i;
+      throw new IOException("Invalid vLong detected (negative values disallowed)");
+    } else {
+      return super.readVLong();
+    }
+  }
+
+  /**
+   * Expert: implements buffer refill. Reads bytes from the current position in
+   * the input.
+   * 
+   * @param b
+   *          the array to read bytes into
+   * @param offset
+   *          the offset in the array to start storing bytes
+   * @param length
+   *          the number of bytes to read
+   */
+  protected abstract void readInternal(byte[] b, int offset, int length) throws IOException;
+
+  @Override
+  public final long getFilePointer() {
+    return bufferStart + bufferPosition;
+  }
+
+  @Override
+  public final void seek(long pos) throws IOException {
+    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
+      bufferPosition = (int) (pos - bufferStart); // seek within buffer
+    else {
+      bufferStart = pos;
+      bufferPosition = 0;
+      bufferLength = 0; // trigger refill() on read()
+      seekInternal(pos);
+    }
+  }
+
+  /**
+   * Expert: implements seek. Sets current position in this file, where the next
+   * {@link #readInternal(byte[],int,int)} will occur.
+   * 
+   * @see #readInternal(byte[],int,int)
+   */
+  protected abstract void seekInternal(long pos) throws IOException;
+
+  @Override
+  public ReusedBufferedIndexInput clone() {
+    ReusedBufferedIndexInput clone = (ReusedBufferedIndexInput) super.clone();
+
+    clone.buffer = null;
+    clone.bufferLength = 0;
+    clone.bufferPosition = 0;
+    clone.bufferStart = getFilePointer();
+
+    return clone;
+  }
+
+  /**
+   * Flushes the in-memory bufer to the given output, copying at most
+   * <code>numBytes</code>.
+   * <p>
+   * <b>NOTE:</b> this method does not refill the buffer, however it does
+   * advance the buffer position.
+   * 
+   * @return the number of bytes actually flushed from the in-memory buffer.
+   */
+  protected final int flushBuffer(IndexOutput out, long numBytes) throws IOException {
+    int toCopy = bufferLength - bufferPosition;
+    if (toCopy > numBytes) {
+      toCopy = (int) numBytes;
+    }
+    if (toCopy > 0) {
+      out.writeBytes(buffer, bufferPosition, toCopy);
+      bufferPosition += toCopy;
+    }
+    return toCopy;
+  }
+
+  /**
+   * Returns default buffer sizes for the given {@link IOContext}
+   */
+  public static int bufferSize(IOContext context) {
+    switch (context.context) {
+    case MERGE:
+      return MERGE_BUFFER_SIZE;
+    default:
+      return BUFFER_SIZE;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java b/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
new file mode 100644
index 0000000..c4576d5
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/buffer/ReusedBufferedIndexOutput.java
@@ -0,0 +1,189 @@
+package org.apache.blur.store.buffer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexOutput;
+
+public abstract class ReusedBufferedIndexOutput extends IndexOutput {
+
+  public static final int BUFFER_SIZE = 1024;
+
+  private int bufferSize = BUFFER_SIZE;
+  
+  protected byte[] buffer;
+
+  /** position in the file of buffer */
+  private long bufferStart = 0;
+  /** end of valid bytes */
+  private int bufferLength = 0;
+  /** next byte to write */
+  private int bufferPosition = 0;
+  /** total length of the file */
+  private long _fileLength = 0;
+  
+  public ReusedBufferedIndexOutput() {
+    this(BUFFER_SIZE);
+  }
+
+  public ReusedBufferedIndexOutput(int bufferSize) {
+    checkBufferSize(bufferSize);
+    this.bufferSize = bufferSize;
+    buffer = BufferStore.takeBuffer(this.bufferSize);
+  }
+
+  protected long getBufferStart() {
+    return bufferStart;
+  }
+
+  private void checkBufferSize(int bufferSize) {
+    if (bufferSize <= 0)
+      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
+  }
+
+  /** Write the buffered bytes to cache */
+  private void flushBufferToCache() throws IOException {
+    writeInternal(buffer, 0, bufferLength);
+
+    bufferStart += bufferLength;
+    bufferLength = 0;
+    bufferPosition = 0;
+  }
+
+  protected abstract void flushInternal() throws IOException;
+
+  @Override
+  public void flush() throws IOException {
+    flushBufferToCache();
+    flushInternal();
+  }
+
+  protected abstract void closeInternal() throws IOException;
+
+  @Override
+  public void close() throws IOException {
+    flushBufferToCache();
+    closeInternal();
+    BufferStore.putBuffer(buffer);
+    buffer = null;
+  }
+
+  @Override
+  public long getFilePointer() {
+    return bufferStart + bufferPosition;
+  }
+
+  protected abstract void seekInternal(long pos) throws IOException;
+
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos > _fileLength) {
+      _fileLength = pos;
+    }
+
+    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
+      bufferPosition = (int)(pos - bufferStart);  // seek within buffer
+    else {
+      flushBufferToCache();
+      bufferStart = pos;
+      bufferPosition = 0;
+      bufferLength = 0;
+      seekInternal(pos);
+    }
+  }
+
+  @Override
+  public long length() throws IOException {
+    return _fileLength;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    if (bufferPosition >= bufferSize) {
+      flushBufferToCache();
+    }
+    if (getFilePointer() >= _fileLength) {
+      _fileLength++;
+    }
+    buffer[bufferPosition++] = b;
+    if (bufferPosition > bufferLength) {
+      bufferLength = bufferPosition;
+    }
+  }
+
+  /** Expert: implements buffer flushing to cache. Writes bytes to the current
+   * position in the output.
+   * @param b the array of bytes to write
+   * @param offset the offset in the array of bytes to write
+   * @param length the number of bytes to write
+   */
+  protected abstract void writeInternal(byte[] b, int offset, int length) throws IOException;
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    if (getFilePointer() + length > _fileLength) {
+      _fileLength = getFilePointer() + length;
+    }
+    if(length <= bufferSize - bufferPosition){
+      // the buffer contains enough space to satisfy this request
+      if(length > 0) { // to allow b to be null if len is 0...
+        System.arraycopy(b, offset, buffer, bufferPosition, length);
+      }
+      bufferPosition += length;
+      if (bufferPosition > bufferLength) {
+        bufferLength = bufferPosition;
+      }
+    } else {
+      // the buffer does not have enough space. First buffer all we've got.
+      int available = bufferSize - bufferPosition;
+      if(available > 0){
+        System.arraycopy(b, offset, buffer, bufferPosition, available);
+        offset += available;
+        length -= available;
+        bufferPosition = bufferSize;
+        bufferLength = bufferSize;
+      }
+
+      flushBufferToCache();
+
+      // and now, write the remaining 'length' bytes:
+      if (length < bufferSize){
+        // If the amount left to write is small enough do it in the usual
+        // buffered way:
+        System.arraycopy(b, offset, buffer, 0, length);
+        bufferPosition = length;
+        bufferLength = length;
+      } else {
+        // The amount left to write is larger than the buffer
+        // there's no performance reason not to write it all
+        // at once.
+        writeInternal(b, offset, length);
+        bufferStart += length;
+        bufferPosition = 0;
+        bufferLength = 0;
+      }
+
+    }
+  }
+
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    throw new CloneNotSupportedException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
deleted file mode 100644
index 368d719..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
+++ /dev/null
@@ -1,813 +0,0 @@
-package org.apache.blur.store.compressed;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-
/**
 * A {@link Directory} wrapper that transparently compresses Lucene stored field
 * data files. Output to a ".fdt" file is redirected into a block-compressed
 * ".fdz" file, and input for a ".fdt" file is decompressed from the ".fdz" file
 * on the fly; all other files pass straight through to the wrapped directory.
 * Two on-disk layouts exist (V0 and V1); the trailing long of a compressed file
 * identifies the version: a negative value means abs(value) is the version,
 * otherwise the file is V0.
 */
public class CompressedFieldDataDirectory extends Directory {

  private static final Log LOG = LogFactory.getLog(CompressedFieldDataDirectory.class);

  // Lower bound for the compression/decompression scratch buffers.
  private static final int _MIN_BUFFER_SIZE = 100;
  // Extension of the compressed stored field data file.
  private static final String FDZ = ".fdz";
  // Extension of the Lucene stored field data file being shadowed.
  private static final String FDT = ".fdt";
  // Suffix of the scratch file that accumulates block offsets while writing.
  private static final String Z_TMP = ".tmp";

  // Default uncompressed block size in bytes.
  private static final int COMPRESSED_BUFFER_SIZE = 65536;

  public static CompressionCodec DEFAULT_COMPRESSION = new DefaultCodec();

  private CompressionCodec _compression = DEFAULT_COMPRESSION;
  private Directory _directory;
  private int _writingBlockSize;

  /** Returns the wrapped (uncompressed) directory. */
  public Directory getInnerDirectory() {
    return _directory;
  }

  public CompressedFieldDataDirectory(Directory dir) {
    this(dir, DEFAULT_COMPRESSION);
  }

  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression) {
    this(dir, compression, COMPRESSED_BUFFER_SIZE);
  }

  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression, int blockSize) {
    _directory = dir;
    if (compression == null) {
      _compression = DEFAULT_COMPRESSION;
    } else {
      _compression = compression;
    }
    _writingBlockSize = blockSize;
  }

  // Opens an already-compressed file with the reader matching its on-disk version.
  private IndexInput wrapInput(String name) throws IOException {
    IndexInput indexInput = _directory.openInput(name);
    int version = getVersion(indexInput);
    switch (version) {
    case 0:
      return new CompressedIndexInput_V0(name, indexInput, _compression);
    case 1:
      return new CompressedIndexInput_V1(name, indexInput, _compression);
    default:
      throw new RuntimeException("Unknown version [" + version + "]");
    }
  }

  // The last long of a V1+ file is the negated version number; V0 files end
  // with the (non-negative) original length instead, hence the sign test.
  private int getVersion(IndexInput indexInput) throws IOException {
    long length = indexInput.length();
    indexInput.seek(length - 8);
    long l = indexInput.readLong();
    if (l < 0) {
      return (int) Math.abs(l);
    } else {
      return 0;
    }
  }

  // NOTE(review): new files are still written in the V0 layout even though a V1
  // writer exists below — presumably deliberate for compatibility; confirm.
  private IndexOutput wrapOutput(String name) throws IOException {
    return new CompressedIndexOutput_V0(name, _directory, _compression, _writingBlockSize);
  }

  public IndexInput openInput(String name) throws IOException {
    if (compressedFileExists(name)) {
      return wrapInput(getCompressedName(name));
    }
    return _directory.openInput(name);
  }

  // bufferSize is ignored for compressed files; the block buffer is used instead.
  public IndexInput openInput(String name, int bufferSize) throws IOException {
    if (compressedFileExists(name)) {
      return wrapInput(getCompressedName(name));
    }
    return _directory.openInput(name, bufferSize);
  }

  // True when "name" is a ".fdt" file that has a compressed ".fdz" counterpart.
  private boolean compressedFileExists(String name) throws IOException {
    if (!name.endsWith(FDT)) {
      return false;
    }
    return _directory.fileExists(getCompressedName(name));
  }

  // foo.fdt -> foo.fdz
  private String getCompressedName(String name) {
    int index = name.lastIndexOf('.');
    return name.substring(0, index) + FDZ;
  }

  // foo.fdz -> foo.fdt
  private String getNormalName(String compressedName) {
    int index = compressedName.lastIndexOf('.');
    return compressedName.substring(0, index) + FDT;
  }

  public IndexOutput createOutput(String name) throws IOException {
    if (name.endsWith(FDT)) {
      return wrapOutput(getCompressedName(name));
    }
    return _directory.createOutput(name);
  }

  public void clearLock(String name) throws IOException {
    _directory.clearLock(name);
  }

  public void close() throws IOException {
    _directory.close();
  }

  public void deleteFile(String name) throws IOException {
    if (compressedFileExists(name)) {
      _directory.deleteFile(getCompressedName(name));
    } else {
      _directory.deleteFile(name);
    }
  }

  public boolean fileExists(String name) throws IOException {
    if (compressedFileExists(name)) {
      return true;
    }
    return _directory.fileExists(name);
  }

  // Reports the ORIGINAL (uncompressed) length for compressed files by probing
  // the trailer: V1 ends [origLength long][negative version long], while V0
  // ends with the non-negative origLength long itself.
  public long fileLength(String name) throws IOException {
    if (compressedFileExists(name)) {
      IndexInput input = _directory.openInput(getCompressedName(name));
      try {
        long length = input.length();
        input.seek(length - 8);
        long fileLength = input.readLong();
        if (fileLength < 0) {
          input.seek(length - 16);
          return input.readLong();
        } else {
          return fileLength;
        }
      } finally {
        input.close();
      }
    }
    return _directory.fileLength(name);
  }

  @SuppressWarnings("deprecation")
  public long fileModified(String name) throws IOException {
    if (compressedFileExists(name)) {
      return _directory.fileModified(getCompressedName(name));
    }
    return _directory.fileModified(name);
  }

  public String[] listAll() throws IOException {
    return fixNames(_directory.listAll());
  }

  // Rewrites ".fdz" entries as ".fdt" in place so callers never see the
  // compressed names in listings.
  private String[] fixNames(String[] listAll) {
    for (int i = 0; i < listAll.length; i++) {
      if (listAll[i].endsWith(FDZ)) {
        listAll[i] = getNormalName(listAll[i]);
      }
    }
    return listAll;
  }

  public void touchFile(String name) throws IOException {
    // do nothing
  }

  public LockFactory getLockFactory() {
    return _directory.getLockFactory();
  }

  public String getLockID() {
    return _directory.getLockID();
  }

  public Lock makeLock(String name) {
    return _directory.makeLock(name);
  }

  public void setLockFactory(LockFactory lockFactory) throws IOException {
    _directory.setLockFactory(lockFactory);
  }

  @SuppressWarnings("deprecation")
  public void sync(String name) throws IOException {
    if (compressedFileExists(name)) {
      _directory.sync(getCompressedName(name));
    } else {
      _directory.sync(name);
    }
  }

  public String toString() {
    return _directory.toString();
  }

  /**
   * V1 writer. On-disk layout: [compressed blocks][one long file offset per
   * block, copied from the ".tmp" spool file][blockCount int][blockSize int]
   * [origLength long][VERSION long]. Block offsets are spooled to the ".tmp"
   * file while writing and appended to the main file on close.
   */
  public static class CompressedIndexOutput_V1 extends IndexOutput {

    private static final long VERSION = -1L;
    private long _position = 0;          // logical (uncompressed) position
    private IndexOutput _output;         // main ".fdz" output
    private byte[] _buffer;              // one uncompressed block
    private int _bufferPosition = 0;
    private byte[] _compressedBuffer;    // scratch for compressed block bytes
    private IndexOutput _tmpOutput;      // spool of per-block file offsets
    private Directory _directory;
    private String _name;
    private int _blockCount;
    private Compressor _compressor;

    public CompressedIndexOutput_V1(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
      _compressor = codec.createCompressor();
      if (_compressor == null) {
        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
      }
      _directory = directory;
      _name = name;
      _output = directory.createOutput(name);
      _tmpOutput = directory.createOutput(name + Z_TMP);
      _buffer = new byte[blockSize];
      // Scratch is 2x block size (floored at _MIN_BUFFER_SIZE) so incompressible
      // blocks that expand still fit in a single compress() call.
      int dsize = blockSize * 2;
      if (dsize < _MIN_BUFFER_SIZE) {
        dsize = _MIN_BUFFER_SIZE;
      }
      _compressedBuffer = new byte[dsize];
    }

    @Override
    public void writeByte(byte b) throws IOException {
      _buffer[_bufferPosition] = b;
      _bufferPosition++;
      _position++;
      flushIfNeeded();
    }

    // Flushes only when a full block has accumulated.
    private void flushIfNeeded() throws IOException {
      if (_bufferPosition >= _buffer.length) {
        flushBuffer();
        _bufferPosition = 0;
      }
    }

    // Compresses the current block, appends it to the main output, and records
    // the block's starting file offset in the ".tmp" spool.
    private void flushBuffer() throws IOException {
      if (_bufferPosition > 0) {
        _compressor.reset();
        _compressor.setInput(_buffer, 0, _bufferPosition);
        _compressor.finish();

        long filePointer = _output.getFilePointer();

        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);

        _tmpOutput.writeLong(filePointer);
        _blockCount++;
        _output.writeBytes(_compressedBuffer, 0, length);
      }
    }

    // NOTE(review): byte-at-a-time copy; a bulk arraycopy into _buffer would be
    // faster, but behavior is identical.
    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException {
      int len = length + offset;
      for (int i = offset; i < len; i++) {
        writeByte(b[i]);
      }
    }

    // Flushes the last partial block, appends the spooled offset table, then the
    // [blockCount][blockSize][origLength][VERSION] trailer (24 bytes).
    @Override
    public void close() throws IOException {
      flushBuffer();
      _tmpOutput.close();
      IndexInput input = _directory.openInput(_name + Z_TMP);
      try {
        long len = input.length();
        long readCount = 0;
        while (readCount < len) {
          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
          input.readBytes(_buffer, 0, toRead);
          _output.writeBytes(_buffer, toRead);
          readCount += toRead;
        }
        _output.writeInt(_blockCount);
        _output.writeInt(_buffer.length);
        _output.writeLong(_position);
        _output.writeLong(VERSION);
      } finally {
        try {
          _output.close();
        } finally {
          input.close();
        }
      }
      _directory.deleteFile(_name + Z_TMP);
      _compressor.end();
    }

    // Reports the logical (uncompressed) position, not the physical one.
    @Override
    public long getFilePointer() {
      return _position;
    }

    @Override
    public long length() throws IOException {
      throw new RuntimeException("not supported");
    }

    // The stream is append-only; random-access writes are not supported.
    @Override
    public void seek(long pos) throws IOException {
      throw new RuntimeException("not supported");
    }

    @Override
    public void flush() throws IOException {

    }
  }

  /**
   * V0 writer. Like V1 but the block table stores [vLong offset, vInt length]
   * pairs and the trailer is [tableLength long][blockCount int][blockSize int]
   * [origLength long] — no version marker (origLength >= 0 identifies V0).
   */
  public static class CompressedIndexOutput_V0 extends IndexOutput {

    private long _position = 0;          // logical (uncompressed) position
    private IndexOutput _output;         // main ".fdz" output
    private byte[] _buffer;              // one uncompressed block
    private int _bufferPosition = 0;
    private byte[] _compressedBuffer;    // scratch for compressed block bytes
    private IndexOutput _tmpOutput;      // spool of per-block offset/length pairs
    private Directory _directory;
    private String _name;
    private int _blockCount;
    private Compressor _compressor;

    public CompressedIndexOutput_V0(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
      _compressor = codec.createCompressor();
      if (_compressor == null) {
        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
      }
      _directory = directory;
      _name = name;
      _output = directory.createOutput(name);
      _tmpOutput = directory.createOutput(name + Z_TMP);
      _buffer = new byte[blockSize];
      // See V1 writer: oversized scratch for incompressible blocks.
      int dsize = blockSize * 2;
      if (dsize < _MIN_BUFFER_SIZE) {
        dsize = _MIN_BUFFER_SIZE;
      }
      _compressedBuffer = new byte[dsize];
    }

    @Override
    public void writeByte(byte b) throws IOException {
      _buffer[_bufferPosition] = b;
      _bufferPosition++;
      _position++;
      flushIfNeeded();
    }

    private void flushIfNeeded() throws IOException {
      if (_bufferPosition >= _buffer.length) {
        flushBuffer();
        _bufferPosition = 0;
      }
    }

    // Compresses and appends the current block; spools its offset AND compressed
    // length (V1 only needs offsets because its blocks are located by deltas).
    private void flushBuffer() throws IOException {
      if (_bufferPosition > 0) {
        _compressor.reset();
        _compressor.setInput(_buffer, 0, _bufferPosition);
        _compressor.finish();

        long filePointer = _output.getFilePointer();

        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);

        _tmpOutput.writeVLong(filePointer);
        _tmpOutput.writeVInt(length);
        _blockCount++;
        _output.writeBytes(_compressedBuffer, 0, length);
      }
    }

    // NOTE(review): byte-at-a-time copy, same as the V1 writer.
    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException {
      int len = length + offset;
      for (int i = offset; i < len; i++) {
        writeByte(b[i]);
      }
    }

    // Flushes the last partial block, appends the spooled block table, then the
    // [tableLength][blockCount][blockSize][origLength] trailer (24 bytes).
    @Override
    public void close() throws IOException {
      flushBuffer();
      _tmpOutput.close();
      IndexInput input = _directory.openInput(_name + Z_TMP);
      try {
        long len = input.length();
        long readCount = 0;
        while (readCount < len) {
          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
          input.readBytes(_buffer, 0, toRead);
          _output.writeBytes(_buffer, toRead);
          readCount += toRead;
        }
        _output.writeLong(len);
        _output.writeInt(_blockCount);
        _output.writeInt(_buffer.length);
        _output.writeLong(_position);
      } finally {
        try {
          _output.close();
        } finally {
          input.close();
        }
      }
      _directory.deleteFile(_name + Z_TMP);
      _compressor.end();
    }

    @Override
    public long getFilePointer() {
      return _position;
    }

    @Override
    public long length() throws IOException {
      throw new RuntimeException("not supported");
    }

    @Override
    public void seek(long pos) throws IOException {
      throw new RuntimeException("not supported");
    }

    @Override
    public void flush() throws IOException {

    }
  }

  /**
   * V1 reader. The block-offset table is loaded by a background thread so that
   * open() returns quickly; readers needing an offset not yet loaded busy-wait
   * in getBlockPosition(). Block lengths are derived from consecutive offsets.
   */
  public static class CompressedIndexInput_V1 extends IndexInput {

    private static final long VERSION = -1l;

    // Trailer size: blockCount int + blockSize int + origLength long + version long.
    private static final int _SIZES_META_DATA = 24;

    // Offsets filled in asynchronously; -1 marks "not yet loaded".
    private final AtomicLongArray _blockPositions;
    private final long _realLength;
    private final long _origLength;
    private final int _blockSize;

    private IndexInput _indexInput;
    private long _pos;                 // logical (uncompressed) read position
    private boolean _isClone;
    private long _currentBlockId = -1; // block currently held in _blockBuffer
    private byte[] _blockBuffer;
    private byte[] _decompressionBuffer;
    private int _blockBufferLength;    // valid bytes in _blockBuffer
    private Decompressor _decompressor;
    private int _blockCount;
    private Thread _openerThread;
    private AtomicBoolean _errorInOpener = new AtomicBoolean(false);
    private String _name;

    public CompressedIndexInput_V1(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
      super(name);
      _name = name;
      long s = System.nanoTime();
      _decompressor = codec.createDecompressor();
      if (_decompressor == null) {
        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
      }
      _indexInput = indexInput;
      _realLength = _indexInput.length();

      // read meta data
      _indexInput.seek(_realLength - _SIZES_META_DATA); // 8 - 4 - 4 - 8
      _blockCount = _indexInput.readInt();
      _blockSize = _indexInput.readInt();
      _origLength = _indexInput.readLong();
      long version = _indexInput.readLong();
      if (version != VERSION) {
        throw new IOException("Version [" + version + "] mismatch!");
      }

      _blockPositions = new AtomicLongArray(_blockCount);
      for (int i = 0; i < _blockCount; i++) {
        _blockPositions.set(i, -1l);
      }
      // A clone is handed to the opener thread so its seeks cannot race with
      // this reader's own seeks on the shared underlying input.
      readBlockPositions((IndexInput) indexInput.clone(), name);
      setupBuffers(this);
      long e = System.nanoTime();
      double total = (e - s) / 1000000.0;
      LOG.debug("Took [" + total + " ms] to open file [" + name + "].");
    }

    // Starts a daemon-style worker that reads the per-block offset longs located
    // just before the trailer into _blockPositions; failures flip _errorInOpener.
    private void readBlockPositions(final IndexInput indexInput, final String name) throws IOException {
      _openerThread = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            long s = System.nanoTime();
            // NOTE(review): int*int arithmetic before widening to long — only an
            // issue beyond ~268M blocks, but worth confirming upstream limits.
            long metaDataLength = _blockCount * 8;
            indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
            for (int i = 0; i < _blockCount; i++) {
              _blockPositions.set(i, indexInput.readLong());
            }
            long e = System.nanoTime();
            double total = (e - s) / 1000000.0;
            LOG.debug("Took [{0} ms] to read block positions with blockCount of [{1}] in file [{2}].", total, _blockCount, name);
            indexInput.close();
          } catch (Exception e) {
            LOG.error("Error during the reading of block positions in file [{0}] ", e, name);
            _errorInOpener.set(true);
          }
        }
      });
      _openerThread.setName("Block-Position-Reader-" + name);
      _openerThread.start();
    }

    // Compressed length of a block = distance to the next block's offset, or to
    // the start of the offset table for the final block.
    private int getBlockLength(int blockId) throws IOException {
      int newBlockId = blockId + 1;
      if (newBlockId == _blockCount) {
        // last block
        return (int) (_realLength - _SIZES_META_DATA - getBlockPosition(blockId));
      } else {
        return (int) (getBlockPosition(newBlockId) - getBlockPosition(blockId));
      }
    }

    // Busy-waits (10 ms sleeps) until the opener thread has filled in the
    // requested offset; throws if the opener failed.
    public long getBlockPosition(int blockId) throws IOException {
      long position = _blockPositions.get(blockId);
      while (true) {
        if (position < 0) {
          try {
            Thread.sleep(10);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        } else {
          return position;
        }
        if (_errorInOpener.get()) {
          throw new IOException("Block positions for file [" + _name + "] can not be read.");
        }
        position = _blockPositions.get(blockId);
      }
    }

    private static void setupBuffers(CompressedIndexInput_V1 input) {
      input._blockBuffer = new byte[input._blockSize];
      int dsize = input._blockSize * 2;
      if (dsize < _MIN_BUFFER_SIZE) {
        dsize = _MIN_BUFFER_SIZE;
      }
      input._decompressionBuffer = new byte[dsize];
    }

    // Clones get their own underlying input and buffers but SHARE _decompressor
    // (which is why fetchBlock synchronizes on it).
    public Object clone() {
      CompressedIndexInput_V1 clone = (CompressedIndexInput_V1) super.clone();
      clone._isClone = true;
      clone._indexInput = (IndexInput) _indexInput.clone();
      setupBuffers(clone);
      return clone;
    }

    // Only the original ends the shared decompressor; clones just close their input.
    public void close() throws IOException {
      if (!_isClone) {
        _decompressor.end();
      }
      _indexInput.close();
    }

    public long getFilePointer() {
      return _pos;
    }

    // Logical (uncompressed) length.
    public long length() {
      return _origLength;
    }

    public byte readByte() throws IOException {
      int blockId = getBlockId();
      if (blockId != _currentBlockId) {
        fetchBlock(blockId);
      }
      int blockPosition = getBlockPosition();
      _pos++;
      return _blockBuffer[blockPosition];
    }

    // Copies out of the cached block, refetching block-by-block as needed.
    public void readBytes(byte[] b, int offset, int len) throws IOException {
      while (len > 0) {
        int blockId = getBlockId();
        if (blockId != _currentBlockId) {
          fetchBlock(blockId);
        }
        int blockPosition = getBlockPosition();
        int length = Math.min(_blockBufferLength - blockPosition, len);
        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
        _pos += length;
        len -= length;
        offset += length;
      }
    }

    private int getBlockPosition() {
      return (int) (_pos % _blockSize);
    }

    // Reads and decompresses one block into _blockBuffer.
    private void fetchBlock(int blockId) throws IOException {
      long position = getBlockPosition(blockId);
      int length = getBlockLength(blockId);
      _indexInput.seek(position);
      _indexInput.readBytes(_decompressionBuffer, 0, length);

      // The decompressor instance is shared with clones — serialize access.
      synchronized (_decompressor) {
        _decompressor.reset();
        _decompressor.setInput(_decompressionBuffer, 0, length);
        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
      }

      _currentBlockId = blockId;
    }

    private int getBlockId() {
      return (int) (_pos / _blockSize);
    }

    // Lazy seek: just records the logical position; the next read fetches the block.
    public void seek(long pos) throws IOException {
      _pos = pos;
    }
  }

  /**
   * V0 reader. Unlike V1 the whole block table ([vLong offset, vInt length]
   * pairs) is read synchronously in the constructor.
   */
  public static class CompressedIndexInput_V0 extends IndexInput {

    // Trailer size: tableLength long + blockCount int + blockSize int + origLength long.
    private static final int _SIZES_META_DATA = 24;

    private final int[] _blockLengths;
    private final long[] _blockPositions;
    private final long _realLength;
    private final long _origLength;
    private final int _blockSize;

    private IndexInput _indexInput;
    private long _pos;                 // logical (uncompressed) read position
    private boolean _isClone;
    private long _currentBlockId = -1; // block currently held in _blockBuffer
    private byte[] _blockBuffer;
    private byte[] _decompressionBuffer;
    private int _blockBufferLength;    // valid bytes in _blockBuffer
    private Decompressor _decompressor;

    public CompressedIndexInput_V0(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
      super(name);
      _decompressor = codec.createDecompressor();
      if (_decompressor == null) {
        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
      }
      long s1 = System.nanoTime();
      _indexInput = indexInput;
      _realLength = _indexInput.length();

      // read meta data
      _indexInput.seek(_realLength - _SIZES_META_DATA); // 8 - 4 - 4 - 8
      long metaDataLength = _indexInput.readLong();
      int blockCount = _indexInput.readInt();
      _blockSize = _indexInput.readInt();
      _origLength = _indexInput.readLong();
      long e1 = System.nanoTime();

      _blockLengths = new int[blockCount];
      _blockPositions = new long[blockCount];

      // Read the variable-length block table located just before the trailer.
      long s2 = System.nanoTime();
      _indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
      for (int i = 0; i < blockCount; i++) {
        _blockPositions[i] = _indexInput.readVLong();
        _blockLengths[i] = _indexInput.readVInt();
      }
      long e2 = System.nanoTime();

      setupBuffers(this);

      double total = (e2 - s1) / 1000000.0;
      double _1st = (e1 - s1) / 1000000.0;
      double _2nd = (e2 - s2) / 1000000.0;
      LOG.debug("Took [" + total + " ms] to open [" + _1st + "] [" + _2nd + " with blockCount of " + blockCount + "].");
    }

    private static void setupBuffers(CompressedIndexInput_V0 input) {
      input._blockBuffer = new byte[input._blockSize];
      int dsize = input._blockSize * 2;
      if (dsize < _MIN_BUFFER_SIZE) {
        dsize = _MIN_BUFFER_SIZE;
      }
      input._decompressionBuffer = new byte[dsize];
    }

    // Clones get their own input and buffers but SHARE _decompressor
    // (serialized in fetchBlock).
    public Object clone() {
      CompressedIndexInput_V0 clone = (CompressedIndexInput_V0) super.clone();
      clone._isClone = true;
      clone._indexInput = (IndexInput) _indexInput.clone();
      setupBuffers(clone);
      return clone;
    }

    // Only the original ends the shared decompressor; clones just close their input.
    public void close() throws IOException {
      if (!_isClone) {
        _decompressor.end();
      }
      _indexInput.close();
    }

    public long getFilePointer() {
      return _pos;
    }

    // Logical (uncompressed) length.
    public long length() {
      return _origLength;
    }

    public byte readByte() throws IOException {
      int blockId = getBlockId();
      if (blockId != _currentBlockId) {
        fetchBlock(blockId);
      }
      int blockPosition = getBlockPosition();
      _pos++;
      return _blockBuffer[blockPosition];
    }

    // Copies out of the cached block, refetching block-by-block as needed.
    public void readBytes(byte[] b, int offset, int len) throws IOException {
      while (len > 0) {
        int blockId = getBlockId();
        if (blockId != _currentBlockId) {
          fetchBlock(blockId);
        }
        int blockPosition = getBlockPosition();
        int length = Math.min(_blockBufferLength - blockPosition, len);
        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
        _pos += length;
        len -= length;
        offset += length;
      }
    }

    private int getBlockPosition() {
      return (int) (_pos % _blockSize);
    }

    // Reads and decompresses one block into _blockBuffer using the eager table.
    private void fetchBlock(int blockId) throws IOException {
      long position = _blockPositions[blockId];
      int length = _blockLengths[blockId];
      _indexInput.seek(position);
      _indexInput.readBytes(_decompressionBuffer, 0, length);

      // The decompressor instance is shared with clones — serialize access.
      synchronized (_decompressor) {
        _decompressor.reset();
        _decompressor.setInput(_decompressionBuffer, 0, length);
        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
      }

      _currentBlockId = blockId;
    }

    private int getBlockId() {
      return (int) (_pos / _blockSize);
    }

    // Lazy seek: just records the logical position; the next read fetches the block.
    public void seek(long pos) throws IOException {
      _pos = pos;
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/a4601422/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
new file mode 100644
index 0000000..9f3bf00
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/BlurLockFactory.java
@@ -0,0 +1,110 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+public class BlurLockFactory extends LockFactory {
+  
+  private static final Log LOG = LogFactory.getLog(BlurLockFactory.class);
+
+  private final Configuration _configuration;
+  private final FileSystem _fileSystem;
+  private final String _baseLockKey;
+  private byte[] _lockKey;
+  private final Path _dir;
+
+  public BlurLockFactory(Configuration configuration, Path dir, String host, int pid) throws IOException {
+    _configuration = configuration;
+    _dir = dir;
+    _fileSystem = _dir.getFileSystem(_configuration);
+    _baseLockKey = host + "/" + pid;
+  }
+
+  @Override
+  public Lock makeLock(String lockName) {
+    final Path lockPath = new Path(_dir, lockName);
+    return new Lock() {
+      private boolean _set;
+
+      @Override
+      public boolean obtain() throws IOException {
+        if (_set) {
+          throw new IOException("Lock for [" + _baseLockKey + "] can only be set once.");
+        }
+        try {
+          _lockKey = (_baseLockKey + "/" + System.currentTimeMillis()).getBytes();
+          FSDataOutputStream outputStream = _fileSystem.create(lockPath, true);
+          outputStream.write(_lockKey);
+          outputStream.close();
+        } finally {
+          _set = true;
+        }
+        return true;
+      }
+
+      @Override
+      public void release() throws IOException {
+        _fileSystem.delete(lockPath, false);
+      }
+
+      @Override
+      public boolean isLocked() throws IOException {
+        if (!_set) {
+          LOG.info("The lock has NOT been set.");
+          return false;
+        }
+        if (!_fileSystem.exists(lockPath)) {
+          LOG.info("The lock file has been removed.");
+          return false;
+        }
+        FileStatus fileStatus = _fileSystem.getFileStatus(lockPath);
+        long len = fileStatus.getLen();
+        if (len != _lockKey.length) {
+          LOG.info("The lock file length has changed.");
+          return false;
+        }
+        byte[] buf = new byte[_lockKey.length];
+        FSDataInputStream inputStream = _fileSystem.open(lockPath);
+        inputStream.readFully(buf);
+        inputStream.close();
+        if (Arrays.equals(_lockKey, buf)) {
+          return true;
+        }
+        LOG.info("The lock information has been changed.");
+        return false;
+      }
+    };
+  }
+
+  @Override
+  public void clearLock(String lockName) throws IOException {
+    _fileSystem.delete(new Path(_dir, lockName), false);
+  }
+}
\ No newline at end of file


Mime
View raw message