incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [28/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java b/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java
new file mode 100644
index 0000000..b206ec9
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/CustomBufferedIndexInput.java
@@ -0,0 +1,277 @@
+package org.apache.blur.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+public abstract class CustomBufferedIndexInput extends IndexInput {
+
+  public static final int BUFFER_SIZE = 1024;
+
+  private int bufferSize = BUFFER_SIZE;
+
+  protected byte[] buffer;
+
+  private long bufferStart = 0; // position in file of buffer
+  private int bufferLength = 0; // end of valid bytes
+  private int bufferPosition = 0; // next byte to read
+
+  @Override
+  public byte readByte() throws IOException {
+    if (bufferPosition >= bufferLength)
+      refill();
+    return buffer[bufferPosition++];
+  }
+
+  public CustomBufferedIndexInput(String resourceDesc) {
+    this(resourceDesc, BUFFER_SIZE);
+  }
+
+  public CustomBufferedIndexInput(String resourceDesc, int bufferSize) {
+    super(resourceDesc);
+    checkBufferSize(bufferSize);
+    this.bufferSize = bufferSize;
+  }
+
+  private void checkBufferSize(int bufferSize) {
+    if (bufferSize <= 0)
+      throw new IllegalArgumentException("bufferSize must be greater than 0 (got " + bufferSize + ")");
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    readBytes(b, offset, len, true);
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+
+    if (len <= (bufferLength - bufferPosition)) {
+      // the buffer contains enough data to satisfy this request
+      if (len > 0) // to allow b to be null if len is 0...
+        System.arraycopy(buffer, bufferPosition, b, offset, len);
+      bufferPosition += len;
+    } else {
+      // the buffer does not have enough data. First serve all we've got.
+      int available = bufferLength - bufferPosition;
+      if (available > 0) {
+        System.arraycopy(buffer, bufferPosition, b, offset, available);
+        offset += available;
+        len -= available;
+        bufferPosition += available;
+      }
+      // and now, read the remaining 'len' bytes:
+      if (useBuffer && len < bufferSize) {
+        // If the amount left to read is small enough, and
+        // we are allowed to use our buffer, do it in the usual
+        // buffered way: fill the buffer and copy from it:
+        refill();
+        if (bufferLength < len) {
+          // Throw an exception when refill() could not read len bytes:
+          System.arraycopy(buffer, 0, b, offset, bufferLength);
+          throw new IOException("read past EOF");
+        } else {
+          System.arraycopy(buffer, 0, b, offset, len);
+          bufferPosition = len;
+        }
+      } else {
+        // The amount left to read is larger than the buffer
+        // or we've been asked to not use our buffer -
+        // there's no performance reason not to read it all
+        // at once. Note that unlike the previous code of
+        // this function, there is no need to do a seek
+        // here, because there's no need to reread what we
+        // had in the buffer.
+        long after = bufferStart + bufferPosition + len;
+        if (after > length())
+          throw new IOException("read past EOF");
+        readInternal(b, offset, len);
+        bufferStart = after;
+        bufferPosition = 0;
+        bufferLength = 0; // trigger refill() on read
+      }
+    }
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    if (4 <= (bufferLength - bufferPosition)) {
+      return ((buffer[bufferPosition++] & 0xFF) << 24) | ((buffer[bufferPosition++] & 0xFF) << 16) | ((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF);
+    } else {
+      return super.readInt();
+    }
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    if (8 <= (bufferLength - bufferPosition)) {
+      final int i1 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
+          | (buffer[bufferPosition++] & 0xff);
+      final int i2 = ((buffer[bufferPosition++] & 0xff) << 24) | ((buffer[bufferPosition++] & 0xff) << 16) | ((buffer[bufferPosition++] & 0xff) << 8)
+          | (buffer[bufferPosition++] & 0xff);
+      return (((long) i1) << 32) | (i2 & 0xFFFFFFFFL);
+    } else {
+      return super.readLong();
+    }
+  }
+
+  @Override
+  public int readVInt() throws IOException {
+    if (5 <= (bufferLength - bufferPosition)) {
+      byte b = buffer[bufferPosition++];
+      int i = b & 0x7F;
+      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+        b = buffer[bufferPosition++];
+        i |= (b & 0x7F) << shift;
+      }
+      return i;
+    } else {
+      return super.readVInt();
+    }
+  }
+
+  @Override
+  public long readVLong() throws IOException {
+    if (9 <= bufferLength - bufferPosition) {
+      byte b = buffer[bufferPosition++];
+      long i = b & 0x7F;
+      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+        b = buffer[bufferPosition++];
+        i |= (b & 0x7FL) << shift;
+      }
+      return i;
+    } else {
+      return super.readVLong();
+    }
+  }
+
+  private void refill() throws IOException {
+    long start = bufferStart + bufferPosition;
+    long end = start + bufferSize;
+    if (end > length()) // don't read past EOF
+      end = length();
+    int newLength = (int) (end - start);
+    if (newLength <= 0)
+      throw new IOException("read past EOF");
+
+    if (buffer == null) {
+      buffer = BufferStore.takeBuffer(bufferSize);
+      seekInternal(bufferStart);
+    }
+    readInternal(buffer, 0, newLength);
+    bufferLength = newLength;
+    bufferStart = start;
+    bufferPosition = 0;
+  }
+
+  @Override
+  public final void close() throws IOException {
+    closeInternal();
+    BufferStore.putBuffer(buffer);
+    buffer = null;
+  }
+
+  protected abstract void closeInternal() throws IOException;
+
+  /**
+   * Expert: implements buffer refill. Reads bytes from the current position in
+   * the input.
+   * 
+   * @param b
+   *          the array to read bytes into
+   * @param offset
+   *          the offset in the array to start storing bytes
+   * @param length
+   *          the number of bytes to read
+   */
+  protected abstract void readInternal(byte[] b, int offset, int length) throws IOException;
+
+  @Override
+  public long getFilePointer() {
+    return bufferStart + bufferPosition;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos >= bufferStart && pos < (bufferStart + bufferLength))
+      bufferPosition = (int) (pos - bufferStart); // seek within buffer
+    else {
+      bufferStart = pos;
+      bufferPosition = 0;
+      bufferLength = 0; // trigger refill() on read()
+      seekInternal(pos);
+    }
+  }
+
+  /**
+   * Expert: implements seek. Sets current position in this file, where the next
+   * {@link #readInternal(byte[],int,int)} will occur.
+   * 
+   * @see #readInternal(byte[],int,int)
+   */
+  protected abstract void seekInternal(long pos) throws IOException;
+
+  @Override
+  public Object clone() {
+    CustomBufferedIndexInput clone = (CustomBufferedIndexInput) super.clone();
+
+    clone.buffer = null;
+    clone.bufferLength = 0;
+    clone.bufferPosition = 0;
+    clone.bufferStart = getFilePointer();
+
+    return clone;
+  }
+
+  /**
+   * Flushes the in-memory buffer to the given output, copying at most
+   * <code>numBytes</code>.
+   * <p>
+   * <b>NOTE:</b> this method does not refill the buffer, however it does
+   * advance the buffer position.
+   * 
+   * @return the number of bytes actually flushed from the in-memory buffer.
+   */
+  protected int flushBuffer(IndexOutput out, long numBytes) throws IOException {
+    int toCopy = bufferLength - bufferPosition;
+    if (toCopy > numBytes) {
+      toCopy = (int) numBytes;
+    }
+    if (toCopy > 0) {
+      out.writeBytes(buffer, bufferPosition, toCopy);
+      bufferPosition += toCopy;
+    }
+    return toCopy;
+  }
+
+  @Override
+  public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+    assert numBytes >= 0 : "numBytes=" + numBytes;
+
+    while (numBytes > 0) {
+      if (bufferLength == bufferPosition) {
+        refill();
+      }
+      numBytes -= flushBuffer(out, numBytes);
+    }
+  }
+
+}
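
[Editor's note] CustomBufferedIndexInput leaves readInternal/seekInternal/closeInternal (plus length()) to subclasses. A minimal, hypothetical subclass backed by an in-memory byte array might look like the sketch below; ByteArrayIndexInput is illustrative only and not part of this commit:

    import java.io.IOException;

    import org.apache.blur.store.CustomBufferedIndexInput;

    public class ByteArrayIndexInput extends CustomBufferedIndexInput {

      private final byte[] data;
      private long position; // absolute position, advanced by sequential readInternal calls

      public ByteArrayIndexInput(String name, byte[] data) {
        super(name);
        this.data = data;
      }

      @Override
      protected void readInternal(byte[] b, int offset, int length) throws IOException {
        if (position + length > data.length) {
          throw new IOException("read past EOF");
        }
        System.arraycopy(data, (int) position, b, offset, length);
        position += length; // readInternal reads from the current position, so advance it
      }

      @Override
      protected void seekInternal(long pos) {
        position = pos; // the buffered superclass only calls this when its buffer is invalid
      }

      @Override
      protected void closeInternal() {
        // nothing to release for an in-memory source
      }

      @Override
      public long length() {
        return data.length;
      }
    }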

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
new file mode 100644
index 0000000..02c9dbf
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCache.java
@@ -0,0 +1,190 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.metrics.BlurMetrics;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.EvictionListener;
+
+public class BlockCache {
+
+  public static final int _128M = 134217728;
+  public static final int _32K = 32768;
+  private final ConcurrentMap<BlockCacheKey, BlockCacheLocation> _cache;
+  private final ByteBuffer[] _slabs;
+  private final BlockLocks[] _locks;
+  private final AtomicInteger[] _lockCounters;
+  private final int _blockSize;
+  private final int _numberOfBlocksPerSlab;
+  private final int _maxEntries;
+  private final BlurMetrics _metrics;
+
+  public BlockCache(BlurMetrics metrics, boolean directAllocation, long totalMemory) {
+    this(metrics, directAllocation, totalMemory, _128M);
+  }
+
+  public BlockCache(BlurMetrics metrics, boolean directAllocation, long totalMemory, int slabSize) {
+    this(metrics, directAllocation, totalMemory, slabSize, _32K);
+  }
+
+  public BlockCache(BlurMetrics metrics, boolean directAllocation, long totalMemory, int slabSize, int blockSize) {
+    _metrics = metrics;
+    _numberOfBlocksPerSlab = slabSize / blockSize;
+    int numberOfSlabs = (int) (totalMemory / slabSize);
+
+    _slabs = new ByteBuffer[numberOfSlabs];
+    _locks = new BlockLocks[numberOfSlabs];
+    _lockCounters = new AtomicInteger[numberOfSlabs];
+    _maxEntries = (_numberOfBlocksPerSlab * numberOfSlabs) - 1;
+    for (int i = 0; i < numberOfSlabs; i++) {
+      if (directAllocation) {
+        _slabs[i] = ByteBuffer.allocateDirect(_numberOfBlocksPerSlab * blockSize);
+      } else {
+        _slabs[i] = ByteBuffer.allocate(_numberOfBlocksPerSlab * blockSize);
+      }
+      _locks[i] = new BlockLocks(_numberOfBlocksPerSlab);
+      _lockCounters[i] = new AtomicInteger();
+    }
+
+    EvictionListener<BlockCacheKey, BlockCacheLocation> listener = new EvictionListener<BlockCacheKey, BlockCacheLocation>() {
+      @Override
+      public void onEviction(BlockCacheKey key, BlockCacheLocation location) {
+        releaseLocation(location);
+      }
+    };
+    _cache = new ConcurrentLinkedHashMap.Builder<BlockCacheKey, BlockCacheLocation>().maximumWeightedCapacity(_maxEntries).listener(listener).build();
+    _blockSize = blockSize;
+  }
+
+  private void releaseLocation(BlockCacheLocation location) {
+    if (location == null) {
+      return;
+    }
+    int slabId = location.getSlabId();
+    int block = location.getBlock();
+    location.setRemoved(true);
+    _locks[slabId].clear(block);
+    _lockCounters[slabId].decrementAndGet();
+    _metrics.blockCacheEviction.incrementAndGet();
+    _metrics.blockCacheSize.decrementAndGet();
+  }
+
+  public boolean store(BlockCacheKey blockCacheKey, byte[] data) {
+    checkLength(data);
+    BlockCacheLocation location = _cache.get(blockCacheKey);
+    boolean newLocation = false;
+    if (location == null) {
+      newLocation = true;
+      location = new BlockCacheLocation();
+      if (!findEmptyLocation(location)) {
+        return false;
+      }
+    }
+    if (location.isRemoved()) {
+      return false;
+    }
+    int slabId = location.getSlabId();
+    int offset = location.getBlock() * _blockSize;
+    ByteBuffer slab = getSlab(slabId);
+    slab.position(offset);
+    slab.put(data, 0, _blockSize);
+    if (newLocation) {
+      releaseLocation(_cache.put(blockCacheKey.clone(), location));
+      _metrics.blockCacheSize.incrementAndGet();
+    }
+    return true;
+  }
+
+  public boolean fetch(BlockCacheKey blockCacheKey, byte[] buffer, int blockOffset, int off, int length) {
+    BlockCacheLocation location = _cache.get(blockCacheKey);
+    if (location == null) {
+      return false;
+    }
+    if (location.isRemoved()) {
+      return false;
+    }
+    int slabId = location.getSlabId();
+    int offset = location.getBlock() * _blockSize;
+    location.touch();
+    ByteBuffer slab = getSlab(slabId);
+    slab.position(offset + blockOffset);
+    slab.get(buffer, off, length);
+    return true;
+  }
+
+  public boolean fetch(BlockCacheKey blockCacheKey, byte[] buffer) {
+    checkLength(buffer);
+    return fetch(blockCacheKey, buffer, 0, 0, _blockSize);
+  }
+
+  private boolean findEmptyLocation(BlockCacheLocation location) {
+    // Make a bounded number of passes over the slabs, trying to find a
+    // free location for the block before giving up.
+    for (int j = 0; j < 10; j++) {
+      OUTER: for (int slabId = 0; slabId < _slabs.length; slabId++) {
+        AtomicInteger bitSetCounter = _lockCounters[slabId];
+        BlockLocks bitSet = _locks[slabId];
+        if (bitSetCounter.get() == _numberOfBlocksPerSlab) {
+          // this slab's bit set is full; move on to the next slab
+          continue OUTER;
+        }
+        // this scan must spin: if a lock attempt fails, keep scanning this
+        // slab rather than skipping the rest of it
+        int bit = bitSet.nextClearBit(0);
+        INNER: while (bit != -1) {
+          if (bit >= _numberOfBlocksPerSlab) {
+            // bit set is full
+            continue OUTER;
+          }
+          if (!bitSet.set(bit)) {
+            // lock was not obtained
+            // this restarts at 0 because another block could have been unlocked
+            // while this was executing
+            bit = bitSet.nextClearBit(0);
+            continue INNER;
+          } else {
+            // lock obtained
+            location.setSlabId(slabId);
+            location.setBlock(bit);
+            bitSetCounter.incrementAndGet();
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  private void checkLength(byte[] buffer) {
+    if (buffer.length != _blockSize) {
+      throw new RuntimeException("Buffer wrong size, expecting [" + _blockSize + "] got [" + buffer.length + "]");
+    }
+  }
+
+  private ByteBuffer getSlab(int slabId) {
+    return _slabs[slabId].duplicate();
+  }
+
+  public int getSize() {
+    return _cache.size();
+  }
+}
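
[Editor's note] As a usage illustration (not from this commit), a store/fetch round trip through BlockCache looks roughly like the sketch below. The BlurMetrics instance is assumed to come from elsewhere, and buffers must be exactly one block (32 KB by default) long:

    void cacheRoundTrip(BlurMetrics metrics) {
      // 128 MB of heap-backed cache: one default 128 MB slab of 4096 32 KB blocks.
      BlockCache cache = new BlockCache(metrics, false, BlockCache._128M);

      BlockCacheKey key = new BlockCacheKey();
      key.setFile(1);    // integer id the caller assigns to a file name
      key.setBlock(42L); // block index within that file

      byte[] block = new byte[BlockCache._32K];
      boolean stored = cache.store(key, block); // false if no free block slot was found

      byte[] out = new byte[BlockCache._32K];
      boolean hit = cache.fetch(key, out);      // false on a miss or a concurrently evicted entry
    }

Note that store() inserts key.clone() into the map, so a caller can safely reuse one mutable key object across calls.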

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java
new file mode 100644
index 0000000..cab828d
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheKey.java
@@ -0,0 +1,73 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class BlockCacheKey implements Cloneable {
+
+  private long _block;
+  private int _file;
+
+  public long getBlock() {
+    return _block;
+  }
+
+  public int getFile() {
+    return _file;
+  }
+
+  public void setBlock(long block) {
+    _block = block;
+  }
+
+  public void setFile(int file) {
+    _file = file;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (_block ^ (_block >>> 32));
+    result = prime * result + _file;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlockCacheKey other = (BlockCacheKey) obj;
+    if (_block != other._block)
+      return false;
+    if (_file != other._file)
+      return false;
+    return true;
+  }
+
+  @Override
+  public BlockCacheKey clone() {
+    try {
+      return (BlockCacheKey) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
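
[Editor's note] BlockCacheKey is a mutable value object used as a map key, which is why BlockCache.store clones it before insertion. A small illustrative check of its value semantics (not part of this commit):

    BlockCacheKey a = new BlockCacheKey();
    a.setFile(7);
    a.setBlock(3L);

    BlockCacheKey b = a.clone();          // independent copy, safe to put in the map
    assert a.equals(b);                   // equality is by (file, block) value
    assert a.hashCode() == b.hashCode();  // consistent with equals, as a map key requires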

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java
new file mode 100644
index 0000000..61a54b0
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockCacheLocation.java
@@ -0,0 +1,66 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BlockCacheLocation {
+
+  private int _block;
+  private int _slabId;
+  private long _lastAccess = System.currentTimeMillis();
+  private long _accesses;
+  private AtomicBoolean _removed = new AtomicBoolean(false);
+
+  public void setBlock(int block) {
+    _block = block;
+  }
+
+  public void setSlabId(int slabId) {
+    _slabId = slabId;
+  }
+
+  public int getBlock() {
+    return _block;
+  }
+
+  public int getSlabId() {
+    return _slabId;
+  }
+
+  public void touch() {
+    _lastAccess = System.currentTimeMillis();
+    _accesses++;
+  }
+
+  public long getLastAccess() {
+    return _lastAccess;
+  }
+
+  public long getNumberOfAccesses() {
+    return _accesses;
+  }
+
+  public boolean isRemoved() {
+    return _removed.get();
+  }
+
+  public void setRemoved(boolean removed) {
+    _removed.set(removed);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
new file mode 100644
index 0000000..fed1039
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectory.java
@@ -0,0 +1,282 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.blur.store.BufferStore;
+import org.apache.blur.store.CustomBufferedIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+
+public class BlockDirectory extends Directory {
+
+  public static final long BLOCK_SHIFT = 13; // 2^13 = 8,192 bytes per block
+  public static final long BLOCK_MOD = 0x1FFF;
+  public static final int BLOCK_SIZE = 1 << BLOCK_SHIFT;
+
+  public static long getBlock(long pos) {
+    return pos >>> BLOCK_SHIFT;
+  }
+
+  public static long getPosition(long pos) {
+    return pos & BLOCK_MOD;
+  }
+
+  public static long getRealPosition(long block, long positionInBlock) {
+    return (block << BLOCK_SHIFT) + positionInBlock;
+  }
+
+  public static Cache NO_CACHE = new Cache() {
+
+    @Override
+    public void update(String name, long blockId, byte[] buffer) {
+
+    }
+
+    @Override
+    public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+      return false;
+    }
+
+    @Override
+    public void delete(String name) {
+
+    }
+
+    @Override
+    public long size() {
+      return 0;
+    }
+  };
+
+  private Directory _directory;
+  private int _blockSize;
+  private String _dirName;
+  private Cache _cache;
+  private Set<String> _blockCacheFileTypes;
+
+  public BlockDirectory(String dirName, Directory directory) throws IOException {
+    this(dirName, directory, NO_CACHE);
+  }
+
+  public BlockDirectory(String dirName, Directory directory, Cache cache) throws IOException {
+    this(dirName, directory, cache, null);
+  }
+
+  public BlockDirectory(String dirName, Directory directory, Cache cache, Set<String> blockCacheFileTypes) throws IOException {
+    _dirName = dirName;
+    _directory = directory;
+    _blockSize = BLOCK_SIZE;
+    _cache = cache;
+    if (blockCacheFileTypes == null || blockCacheFileTypes.isEmpty()) {
+      _blockCacheFileTypes = null;
+    } else {
+      _blockCacheFileTypes = blockCacheFileTypes;
+    }
+    setLockFactory(directory.getLockFactory());
+  }
+
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    final IndexInput source = _directory.openInput(name, _blockSize);
+    if (_blockCacheFileTypes == null || isCachableFile(name)) {
+      return new CachedIndexInput(source, _blockSize, name, getFileCacheName(name), _cache, bufferSize);
+    }
+    return source;
+  }
+
+  private boolean isCachableFile(String name) {
+    for (String ext : _blockCacheFileTypes) {
+      if (name.endsWith(ext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public IndexInput openInput(final String name) throws IOException {
+    return openInput(name, _blockSize);
+  }
+
+  static class CachedIndexInput extends CustomBufferedIndexInput {
+
+    private IndexInput _source;
+    private int _blockSize;
+    private long _fileLength;
+    private String _cacheName;
+    private Cache _cache;
+
+    public CachedIndexInput(IndexInput source, int blockSize, String name, String cacheName, Cache cache, int bufferSize) {
+      super(name, bufferSize);
+      _source = source;
+      _blockSize = blockSize;
+      _fileLength = source.length();
+      _cacheName = cacheName;
+      _cache = cache;
+    }
+
+    @Override
+    public Object clone() {
+      CachedIndexInput clone = (CachedIndexInput) super.clone();
+      clone._source = (IndexInput) _source.clone();
+      return clone;
+    }
+
+    @Override
+    public long length() {
+      return _source.length();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int off, int len) throws IOException {
+      long position = getFilePointer();
+      while (len > 0) {
+        int length = fetchBlock(position, b, off, len);
+        position += length;
+        len -= length;
+        off += length;
+      }
+    }
+
+    private int fetchBlock(long position, byte[] b, int off, int len) throws IOException {
+      // read whole block into cache and then provide needed data
+      long blockId = getBlock(position);
+      int blockOffset = (int) getPosition(position);
+      int lengthToReadInBlock = Math.min(len, _blockSize - blockOffset);
+      if (checkCache(blockId, blockOffset, b, off, lengthToReadInBlock)) {
+        return lengthToReadInBlock;
+      } else {
+        readIntoCacheAndResult(blockId, blockOffset, b, off, lengthToReadInBlock);
+      }
+      return lengthToReadInBlock;
+    }
+
+    private void readIntoCacheAndResult(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) throws IOException {
+      long position = getRealPosition(blockId, 0);
+      int length = (int) Math.min(_blockSize, _fileLength - position);
+      _source.seek(position);
+
+      byte[] buf = BufferStore.takeBuffer(_blockSize);
+      _source.readBytes(buf, 0, length);
+      System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
+      _cache.update(_cacheName, blockId, buf);
+      BufferStore.putBuffer(buf);
+    }
+
+    private boolean checkCache(long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+      return _cache.fetch(_cacheName, blockId, blockOffset, b, off, lengthToReadInBlock);
+    }
+
+    @Override
+    protected void closeInternal() throws IOException {
+      _source.close();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    String[] files = listAll();
+    for (String file : files) {
+      _cache.delete(getFileCacheName(file));
+    }
+    _directory.close();
+  }
+
+  private String getFileCacheName(String name) throws IOException {
+    return _dirName + "/" + name + ":" + fileModified(name);
+  }
+
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  public void copy(Directory to, String src, String dest) throws IOException {
+    _directory.copy(to, src, dest);
+  }
+
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @SuppressWarnings("deprecation")
+  public void sync(String name) throws IOException {
+    _directory.sync(name);
+  }
+
+  public String toString() {
+    return _directory.toString();
+  }
+
+  public IndexOutput createOutput(String name) throws IOException {
+    return _directory.createOutput(name);
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _cache.delete(getFileCacheName(name));
+    _directory.deleteFile(name);
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @SuppressWarnings("deprecation")
+  public long fileModified(String name) throws IOException {
+    return _directory.fileModified(name);
+  }
+
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @SuppressWarnings("deprecation")
+  public void touchFile(String name) throws IOException {
+    _directory.touchFile(name);
+  }
+
+}
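
[Editor's note] The three static helpers in BlockDirectory split an absolute file position into a block id and an offset within that block. A worked example with the fixed 8,192-byte block size (illustrative, not from this commit):

    long pos = 20000L;
    long blockId = BlockDirectory.getBlock(pos);                   // 20000 >>> 13 = 2
    long inBlock = BlockDirectory.getPosition(pos);                // 20000 & 0x1FFF = 3616
    long back = BlockDirectory.getRealPosition(blockId, inBlock);  // (2 << 13) + 3616 = 20000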

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
new file mode 100644
index 0000000..becd4ba
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockDirectoryCache.java
@@ -0,0 +1,77 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.metrics.BlurMetrics;
+
+
+public class BlockDirectoryCache implements Cache {
+  private BlockCache _blockCache;
+  private AtomicInteger _counter = new AtomicInteger();
+  private Map<String, Integer> _names = new ConcurrentHashMap<String, Integer>();
+  private BlurMetrics _blurMetrics;
+
+  public BlockDirectoryCache(BlockCache blockCache, BlurMetrics blurMetrics) {
+    _blockCache = blockCache;
+    _blurMetrics = blurMetrics;
+  }
+
+  @Override
+  public void delete(String name) {
+    _names.remove(name);
+  }
+
+  @Override
+  public void update(String name, long blockId, byte[] buffer) {
+    Integer file = _names.get(name);
+    if (file == null) {
+      file = _counter.incrementAndGet();
+      _names.put(name, file);
+    }
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(blockId);
+    blockCacheKey.setFile(file);
+    _blockCache.store(blockCacheKey, buffer);
+  }
+
+  @Override
+  public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
+    Integer file = _names.get(name);
+    if (file == null) {
+      return false;
+    }
+    BlockCacheKey blockCacheKey = new BlockCacheKey();
+    blockCacheKey.setBlock(blockId);
+    blockCacheKey.setFile(file);
+    boolean fetch = _blockCache.fetch(blockCacheKey, b, blockOffset, off, lengthToReadInBlock);
+    if (fetch) {
+      _blurMetrics.blockCacheHit.incrementAndGet();
+    } else {
+      _blurMetrics.blockCacheMiss.incrementAndGet();
+    }
+    return fetch;
+  }
+
+  @Override
+  public long size() {
+    return _blockCache.getSize();
+  }
+}
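
[Editor's note] BlockDirectoryCache is the adapter that lets a BlockDirectory use the slab-based BlockCache through the Cache interface. A hypothetical wiring sketch follows; the metrics instance, the directory path, and the name "shard-0" are assumptions, not from this commit:

    Directory openCachedDirectory(BlurMetrics metrics, File path) throws IOException {
      // direct (off-heap) byte buffers for the slabs
      BlockCache blockCache = new BlockCache(metrics, true, BlockCache._128M);
      Cache cache = new BlockDirectoryCache(blockCache, metrics);
      // FSDirectory.open is the standard Lucene helper for a filesystem directory
      return new BlockDirectory("shard-0", FSDirectory.open(path), cache);
    }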

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
new file mode 100644
index 0000000..502d786
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/BlockLocks.java
@@ -0,0 +1,96 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.OpenBitSet;
+
+public class BlockLocks {
+
+  private AtomicLongArray bits;
+  private int wlen;
+
+  public BlockLocks(long numBits) {
+    int length = OpenBitSet.bits2words(numBits);
+    bits = new AtomicLongArray(length);
+    wlen = length;
+  }
+
+  /**
+   * Find the next clear bit in the bit set.
+   * 
+   * @param index
+   *          the index to start the search from (inclusive).
+   * @return the index of the next clear bit, or -1 if all remaining bits are set.
+   */
+  public int nextClearBit(int index) {
+    int i = index >> 6;
+    if (i >= wlen)
+      return -1;
+    int subIndex = index & 0x3f; // index within the word
+    long word = ~bits.get(i) >> subIndex; // skip all the bits to the right of
+                                          // index
+    if (word != 0) {
+      return (i << 6) + subIndex + BitUtil.ntz(word);
+    }
+    while (++i < wlen) {
+      word = ~bits.get(i);
+      if (word != 0) {
+        return (i << 6) + BitUtil.ntz(word);
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Thread safe set operation that will set the bit if and only if the bit was
+   * not previously set.
+   * 
+   * @param index
+   *          the index position to set.
+   * @return returns true if the bit was set and false if it was already set.
+   */
+  public boolean set(int index) {
+    int wordNum = index >> 6; // div 64
+    int bit = index & 0x3f; // mod 64
+    long bitmask = 1L << bit;
+    long word, oword;
+    do {
+      word = bits.get(wordNum);
+      // if set another thread stole the lock
+      if ((word & bitmask) != 0) {
+        return false;
+      }
+      oword = word;
+      word |= bitmask;
+    } while (!bits.compareAndSet(wordNum, oword, word));
+    return true;
+  }
+
+  public void clear(int index) {
+    int wordNum = index >> 6;
+    int bit = index & 0x03f;
+    long bitmask = 1L << bit;
+    long word, oword;
+    do {
+      word = bits.get(wordNum);
+      oword = word;
+      word &= ~bitmask;
+    } while (!bits.compareAndSet(wordNum, oword, word));
+  }
+}
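
[Editor's note] BlockLocks behaves like a lock-free bit set where set() doubles as a try-lock. A short illustrative sequence (not part of this commit):

    BlockLocks locks = new BlockLocks(128);

    int free = locks.nextClearBit(0); // 0 on a fresh instance
    boolean won = locks.set(free);    // true: this thread now owns that block
    boolean again = locks.set(free);  // false: the bit is already set
    locks.clear(free);                // release the block for reuse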

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
new file mode 100644
index 0000000..98a66c3
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/blockcache/Cache.java
@@ -0,0 +1,29 @@
+package org.apache.blur.store.blockcache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public interface Cache {
+
+  void delete(String name);
+
+  void update(String name, long blockId, byte[] buffer);
+
+  boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock);
+
+  long size();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
new file mode 100644
index 0000000..368d719
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
@@ -0,0 +1,813 @@
+package org.apache.blur.store.compressed;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+
+public class CompressedFieldDataDirectory extends Directory {
+
+  private static final Log LOG = LogFactory.getLog(CompressedFieldDataDirectory.class);
+
+  private static final int _MIN_BUFFER_SIZE = 100;
+  private static final String FDZ = ".fdz";
+  private static final String FDT = ".fdt";
+  private static final String Z_TMP = ".tmp";
+
+  private static final int COMPRESSED_BUFFER_SIZE = 65536;
+
+  public static CompressionCodec DEFAULT_COMPRESSION = new DefaultCodec();
+
+  private CompressionCodec _compression = DEFAULT_COMPRESSION;
+  private Directory _directory;
+  private int _writingBlockSize;
+
+  public Directory getInnerDirectory() {
+    return _directory;
+  }
+
+  public CompressedFieldDataDirectory(Directory dir) {
+    this(dir, DEFAULT_COMPRESSION);
+  }
+
+  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression) {
+    this(dir, compression, COMPRESSED_BUFFER_SIZE);
+  }
+
+  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression, int blockSize) {
+    _directory = dir;
+    if (compression == null) {
+      _compression = DEFAULT_COMPRESSION;
+    } else {
+      _compression = compression;
+    }
+    _writingBlockSize = blockSize;
+  }
+
+  private IndexInput wrapInput(String name) throws IOException {
+    IndexInput indexInput = _directory.openInput(name);
+    int version = getVersion(indexInput);
+    switch (version) {
+    case 0:
+      return new CompressedIndexInput_V0(name, indexInput, _compression);
+    case 1:
+      return new CompressedIndexInput_V1(name, indexInput, _compression);
+    default:
+      throw new RuntimeException("Unknown version [" + version + "]");
+    }
+  }
+
+  private int getVersion(IndexInput indexInput) throws IOException {
+    long length = indexInput.length();
+    indexInput.seek(length - 8);
+    long l = indexInput.readLong();
+    if (l < 0) {
+      return (int) Math.abs(l);
+    } else {
+      return 0;
+    }
+  }
+
+  private IndexOutput wrapOutput(String name) throws IOException {
+    return new CompressedIndexOutput_V0(name, _directory, _compression, _writingBlockSize);
+  }
+
+  public IndexInput openInput(String name) throws IOException {
+    if (compressedFileExists(name)) {
+      return wrapInput(getCompressedName(name));
+    }
+    return _directory.openInput(name);
+  }
+
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    if (compressedFileExists(name)) {
+      return wrapInput(getCompressedName(name));
+    }
+    return _directory.openInput(name, bufferSize);
+  }
+
+  private boolean compressedFileExists(String name) throws IOException {
+    if (!name.endsWith(FDT)) {
+      return false;
+    }
+    return _directory.fileExists(getCompressedName(name));
+  }
+
+  private String getCompressedName(String name) {
+    int index = name.lastIndexOf('.');
+    return name.substring(0, index) + FDZ;
+  }
+
+  private String getNormalName(String compressedName) {
+    int index = compressedName.lastIndexOf('.');
+    return compressedName.substring(0, index) + FDT;
+  }
+
+  public IndexOutput createOutput(String name) throws IOException {
+    if (name.endsWith(FDT)) {
+      return wrapOutput(getCompressedName(name));
+    }
+    return _directory.createOutput(name);
+  }
+
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  public void deleteFile(String name) throws IOException {
+    if (compressedFileExists(name)) {
+      _directory.deleteFile(getCompressedName(name));
+    } else {
+      _directory.deleteFile(name);
+    }
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    if (compressedFileExists(name)) {
+      return true;
+    }
+    return _directory.fileExists(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    if (compressedFileExists(name)) {
+      IndexInput input = _directory.openInput(getCompressedName(name));
+      try {
+        long length = input.length();
+        input.seek(length - 8);
+        long fileLength = input.readLong();
+        if (fileLength < 0) {
+          input.seek(length - 16);
+          return input.readLong();
+        } else {
+          return fileLength;
+        }
+      } finally {
+        input.close();
+      }
+    }
+    return _directory.fileLength(name);
+  }
+
+  @SuppressWarnings("deprecation")
+  public long fileModified(String name) throws IOException {
+    if (compressedFileExists(name)) {
+      return _directory.fileModified(getCompressedName(name));
+    }
+    return _directory.fileModified(name);
+  }
+
+  public String[] listAll() throws IOException {
+    return fixNames(_directory.listAll());
+  }
+
+  private String[] fixNames(String[] listAll) {
+    for (int i = 0; i < listAll.length; i++) {
+      if (listAll[i].endsWith(FDZ)) {
+        listAll[i] = getNormalName(listAll[i]);
+      }
+    }
+    return listAll;
+  }
+
+  public void touchFile(String name) throws IOException {
+    // do nothing
+  }
+
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @SuppressWarnings("deprecation")
+  public void sync(String name) throws IOException {
+    if (compressedFileExists(name)) {
+      _directory.sync(getCompressedName(name));
+    } else {
+      _directory.sync(name);
+    }
+  }
+
+  public String toString() {
+    return _directory.toString();
+  }
+
+  public static class CompressedIndexOutput_V1 extends IndexOutput {
+
+    private static final long VERSION = -1L;
+    private long _position = 0;
+    private IndexOutput _output;
+    private byte[] _buffer;
+    private int _bufferPosition = 0;
+    private byte[] _compressedBuffer;
+    private IndexOutput _tmpOutput;
+    private Directory _directory;
+    private String _name;
+    private int _blockCount;
+    private Compressor _compressor;
+
+    public CompressedIndexOutput_V1(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
+      _compressor = codec.createCompressor();
+      if (_compressor == null) {
+        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
+      }
+      _directory = directory;
+      _name = name;
+      _output = directory.createOutput(name);
+      _tmpOutput = directory.createOutput(name + Z_TMP);
+      _buffer = new byte[blockSize];
+      int dsize = blockSize * 2;
+      if (dsize < _MIN_BUFFER_SIZE) {
+        dsize = _MIN_BUFFER_SIZE;
+      }
+      _compressedBuffer = new byte[dsize];
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _buffer[_bufferPosition] = b;
+      _bufferPosition++;
+      _position++;
+      flushIfNeeded();
+    }
+
+    private void flushIfNeeded() throws IOException {
+      if (_bufferPosition >= _buffer.length) {
+        flushBuffer();
+        _bufferPosition = 0;
+      }
+    }
+
+    private void flushBuffer() throws IOException {
+      if (_bufferPosition > 0) {
+        _compressor.reset();
+        _compressor.setInput(_buffer, 0, _bufferPosition);
+        _compressor.finish();
+
+        long filePointer = _output.getFilePointer();
+
+        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);
+
+        _tmpOutput.writeLong(filePointer);
+        _blockCount++;
+        _output.writeBytes(_compressedBuffer, 0, length);
+      }
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      int len = length + offset;
+      for (int i = offset; i < len; i++) {
+        writeByte(b[i]);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flushBuffer();
+      _tmpOutput.close();
+      IndexInput input = _directory.openInput(_name + Z_TMP);
+      try {
+        long len = input.length();
+        long readCount = 0;
+        while (readCount < len) {
+          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
+          input.readBytes(_buffer, 0, toRead);
+          _output.writeBytes(_buffer, toRead);
+          readCount += toRead;
+        }
+        _output.writeInt(_blockCount);
+        _output.writeInt(_buffer.length);
+        _output.writeLong(_position);
+        _output.writeLong(VERSION);
+      } finally {
+        try {
+          _output.close();
+        } finally {
+          input.close();
+        }
+      }
+      _directory.deleteFile(_name + Z_TMP);
+      _compressor.end();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _position;
+    }
+
+    @Override
+    public long length() throws IOException {
+      throw new RuntimeException("not supported");
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      throw new RuntimeException("not supported");
+    }
+
+    @Override
+    public void flush() throws IOException {
+
+    }
+  }
+
+  public static class CompressedIndexOutput_V0 extends IndexOutput {
+
+    private long _position = 0;
+    private IndexOutput _output;
+    private byte[] _buffer;
+    private int _bufferPosition = 0;
+    private byte[] _compressedBuffer;
+    private IndexOutput _tmpOutput;
+    private Directory _directory;
+    private String _name;
+    private int _blockCount;
+    private Compressor _compressor;
+
+    public CompressedIndexOutput_V0(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
+      _compressor = codec.createCompressor();
+      if (_compressor == null) {
+        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
+      }
+      _directory = directory;
+      _name = name;
+      _output = directory.createOutput(name);
+      _tmpOutput = directory.createOutput(name + Z_TMP);
+      _buffer = new byte[blockSize];
+      int dsize = blockSize * 2;
+      if (dsize < _MIN_BUFFER_SIZE) {
+        dsize = _MIN_BUFFER_SIZE;
+      }
+      _compressedBuffer = new byte[dsize];
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _buffer[_bufferPosition] = b;
+      _bufferPosition++;
+      _position++;
+      flushIfNeeded();
+    }
+
+    private void flushIfNeeded() throws IOException {
+      if (_bufferPosition >= _buffer.length) {
+        flushBuffer();
+        _bufferPosition = 0;
+      }
+    }
+
+    private void flushBuffer() throws IOException {
+      if (_bufferPosition > 0) {
+        _compressor.reset();
+        _compressor.setInput(_buffer, 0, _bufferPosition);
+        _compressor.finish();
+
+        long filePointer = _output.getFilePointer();
+
+        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);
+
+        _tmpOutput.writeVLong(filePointer);
+        _tmpOutput.writeVInt(length);
+        _blockCount++;
+        _output.writeBytes(_compressedBuffer, 0, length);
+      }
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      int len = length + offset;
+      for (int i = offset; i < len; i++) {
+        writeByte(b[i]);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flushBuffer();
+      _tmpOutput.close();
+      IndexInput input = _directory.openInput(_name + Z_TMP);
+      try {
+        long len = input.length();
+        long readCount = 0;
+        while (readCount < len) {
+          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
+          input.readBytes(_buffer, 0, toRead);
+          _output.writeBytes(_buffer, toRead);
+          readCount += toRead;
+        }
+        _output.writeLong(len);
+        _output.writeInt(_blockCount);
+        _output.writeInt(_buffer.length);
+        _output.writeLong(_position);
+      } finally {
+        try {
+          _output.close();
+        } finally {
+          input.close();
+        }
+      }
+      _directory.deleteFile(_name + Z_TMP);
+      _compressor.end();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _position;
+    }
+
+    @Override
+    public long length() throws IOException {
+      throw new RuntimeException("not supported");
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      throw new RuntimeException("not supported");
+    }
+
+    @Override
+    public void flush() throws IOException {
+
+    }
+  }
+
+  public static class CompressedIndexInput_V1 extends IndexInput {
+
+    private static final long VERSION = -1L;
+
+    private static final int _SIZES_META_DATA = 24;
+
+    private final AtomicLongArray _blockPositions;
+    private final long _realLength;
+    private final long _origLength;
+    private final int _blockSize;
+
+    private IndexInput _indexInput;
+    private long _pos;
+    private boolean _isClone;
+    private long _currentBlockId = -1;
+    private byte[] _blockBuffer;
+    private byte[] _decompressionBuffer;
+    private int _blockBufferLength;
+    private Decompressor _decompressor;
+    private int _blockCount;
+    private Thread _openerThread;
+    private AtomicBoolean _errorInOpener = new AtomicBoolean(false);
+    private String _name;
+
+    public CompressedIndexInput_V1(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
+      super(name);
+      _name = name;
+      long s = System.nanoTime();
+      _decompressor = codec.createDecompressor();
+      if (_decompressor == null) {
+        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
+      }
+      _indexInput = indexInput;
+      _realLength = _indexInput.length();
+
+      // read meta data
+      _indexInput.seek(_realLength - _SIZES_META_DATA); // blockCount (int) + blockSize (int) + origLength (long) + version (long) = 4 + 4 + 8 + 8
+      _blockCount = _indexInput.readInt();
+      _blockSize = _indexInput.readInt();
+      _origLength = _indexInput.readLong();
+      long version = _indexInput.readLong();
+      if (version != VERSION) {
+        throw new IOException("Version [" + version + "] mismatch!");
+      }
+
+      _blockPositions = new AtomicLongArray(_blockCount);
+      for (int i = 0; i < _blockCount; i++) {
+        _blockPositions.set(i, -1L);
+      }
+      readBlockPositions((IndexInput) indexInput.clone(), name);
+      setupBuffers(this);
+      long e = System.nanoTime();
+      double total = (e - s) / 1000000.0;
+      LOG.debug("Took [" + total + " ms] to open file [" + name + "].");
+    }
+
+    private void readBlockPositions(final IndexInput indexInput, final String name) throws IOException {
+      _openerThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            long s = System.nanoTime();
+            long metaDataLength = _blockCount * 8L; // long multiply, to avoid int overflow for large block counts
+            indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
+            for (int i = 0; i < _blockCount; i++) {
+              _blockPositions.set(i, indexInput.readLong());
+            }
+            long e = System.nanoTime();
+            double total = (e - s) / 1000000.0;
+            LOG.debug("Took [{0} ms] to read block positions with blockCount of [{1}] in file [{2}].", total, _blockCount, name);
+            indexInput.close();
+          } catch (Exception e) {
+            LOG.error("Error during the reading of block positions in file [{0}] ", e, name);
+            _errorInOpener.set(true);
+          }
+        }
+      });
+      _openerThread.setName("Block-Position-Reader-" + name);
+      _openerThread.start();
+    }
+
+    private int getBlockLength(int blockId) throws IOException {
+      int newBlockId = blockId + 1;
+      if (newBlockId == _blockCount) {
+        // the last block ends where the block-position table begins
+        return (int) (_realLength - _SIZES_META_DATA - (_blockCount * 8L) - getBlockPosition(blockId));
+      } else {
+        return (int) (getBlockPosition(newBlockId) - getBlockPosition(blockId));
+      }
+    }
+
+    public long getBlockPosition(int blockId) throws IOException {
+      long position = _blockPositions.get(blockId);
+      while (true) {
+        // positions are published asynchronously by the opener thread;
+        // spin with a short sleep until this block's position appears
+        if (position < 0) {
+          try {
+            Thread.sleep(10);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          return position;
+        }
+        if (_errorInOpener.get()) {
+          throw new IOException("Block positions for file [" + _name + "] cannot be read.");
+        }
+        position = _blockPositions.get(blockId);
+      }
+    }
+
+    private static void setupBuffers(CompressedIndexInput_V1 input) {
+      input._blockBuffer = new byte[input._blockSize];
+      int dsize = input._blockSize * 2;
+      if (dsize < _MIN_BUFFER_SIZE) {
+        dsize = _MIN_BUFFER_SIZE;
+      }
+      input._decompressionBuffer = new byte[dsize];
+    }
+
+    public Object clone() {
+      CompressedIndexInput_V1 clone = (CompressedIndexInput_V1) super.clone();
+      clone._isClone = true;
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      setupBuffers(clone);
+      return clone;
+    }
+
+    public void close() throws IOException {
+      if (!_isClone) {
+        _decompressor.end();
+      }
+      _indexInput.close();
+    }
+
+    public long getFilePointer() {
+      return _pos;
+    }
+
+    public long length() {
+      return _origLength;
+    }
+
+    public byte readByte() throws IOException {
+      int blockId = getBlockId();
+      if (blockId != _currentBlockId) {
+        fetchBlock(blockId);
+      }
+      int blockPosition = getBlockPosition();
+      _pos++;
+      return _blockBuffer[blockPosition];
+    }
+
+    public void readBytes(byte[] b, int offset, int len) throws IOException {
+      while (len > 0) {
+        int blockId = getBlockId();
+        if (blockId != _currentBlockId) {
+          fetchBlock(blockId);
+        }
+        int blockPosition = getBlockPosition();
+        int length = Math.min(_blockBufferLength - blockPosition, len);
+        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
+        _pos += length;
+        len -= length;
+        offset += length;
+      }
+    }
+
+    private int getBlockPosition() {
+      return (int) (_pos % _blockSize);
+    }
+
+    private void fetchBlock(int blockId) throws IOException {
+      long position = getBlockPosition(blockId);
+      int length = getBlockLength(blockId);
+      _indexInput.seek(position);
+      _indexInput.readBytes(_decompressionBuffer, 0, length);
+
+      // clones share this decompressor instance (see clone()), so access is serialized
+      synchronized (_decompressor) {
+        _decompressor.reset();
+        _decompressor.setInput(_decompressionBuffer, 0, length);
+        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
+      }
+
+      _currentBlockId = blockId;
+    }
+
+    private int getBlockId() {
+      return (int) (_pos / _blockSize);
+    }
+
+    public void seek(long pos) throws IOException {
+      _pos = pos;
+    }
+  }
+
+  public static class CompressedIndexInput_V0 extends IndexInput {
+
+    private static final int _SIZES_META_DATA = 24;
+
+    private final int[] _blockLengths;
+    private final long[] _blockPositions;
+    private final long _realLength;
+    private final long _origLength;
+    private final int _blockSize;
+
+    private IndexInput _indexInput;
+    private long _pos;
+    private boolean _isClone;
+    private long _currentBlockId = -1;
+    private byte[] _blockBuffer;
+    private byte[] _decompressionBuffer;
+    private int _blockBufferLength;
+    private Decompressor _decompressor;
+
+    public CompressedIndexInput_V0(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
+      super(name);
+      _decompressor = codec.createDecompressor();
+      if (_decompressor == null) {
+        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
+      }
+      long s1 = System.nanoTime();
+      _indexInput = indexInput;
+      _realLength = _indexInput.length();
+
+      // read meta data
+      _indexInput.seek(_realLength - _SIZES_META_DATA); // long + int + int + long = 8 + 4 + 4 + 8 bytes
+      long metaDataLength = _indexInput.readLong();
+      int blockCount = _indexInput.readInt();
+      _blockSize = _indexInput.readInt();
+      _origLength = _indexInput.readLong();
+      long e1 = System.nanoTime();
+
+      _blockLengths = new int[blockCount];
+      _blockPositions = new long[blockCount];
+
+      long s2 = System.nanoTime();
+      _indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
+      for (int i = 0; i < blockCount; i++) {
+        _blockPositions[i] = _indexInput.readVLong();
+        _blockLengths[i] = _indexInput.readVInt();
+      }
+      long e2 = System.nanoTime();
+
+      setupBuffers(this);
+
+      double total = (e2 - s1) / 1000000.0;
+      double metaDataMs = (e1 - s1) / 1000000.0;
+      double blockPositionsMs = (e2 - s2) / 1000000.0;
+      LOG.debug("Took [" + total + " ms] to open: meta data [" + metaDataMs + " ms], block positions [" + blockPositionsMs + " ms], blockCount [" + blockCount + "].");
+    }
+
+    private static void setupBuffers(CompressedIndexInput_V0 input) {
+      input._blockBuffer = new byte[input._blockSize];
+      int dsize = input._blockSize * 2;
+      if (dsize < _MIN_BUFFER_SIZE) {
+        dsize = _MIN_BUFFER_SIZE;
+      }
+      input._decompressionBuffer = new byte[dsize];
+    }
+
+    public Object clone() {
+      CompressedIndexInput_V0 clone = (CompressedIndexInput_V0) super.clone();
+      clone._isClone = true;
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      setupBuffers(clone);
+      return clone;
+    }
+
+    public void close() throws IOException {
+      if (!_isClone) {
+        _decompressor.end();
+      }
+      _indexInput.close();
+    }
+
+    public long getFilePointer() {
+      return _pos;
+    }
+
+    public long length() {
+      return _origLength;
+    }
+
+    public byte readByte() throws IOException {
+      int blockId = getBlockId();
+      if (blockId != _currentBlockId) {
+        fetchBlock(blockId);
+      }
+      int blockPosition = getBlockPosition();
+      _pos++;
+      return _blockBuffer[blockPosition];
+    }
+
+    public void readBytes(byte[] b, int offset, int len) throws IOException {
+      while (len > 0) {
+        int blockId = getBlockId();
+        if (blockId != _currentBlockId) {
+          fetchBlock(blockId);
+        }
+        int blockPosition = getBlockPosition();
+        int length = Math.min(_blockBufferLength - blockPosition, len);
+        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
+        _pos += length;
+        len -= length;
+        offset += length;
+      }
+    }
+
+    private int getBlockPosition() {
+      return (int) (_pos % _blockSize);
+    }
+
+    private void fetchBlock(int blockId) throws IOException {
+      long position = _blockPositions[blockId];
+      int length = _blockLengths[blockId];
+      _indexInput.seek(position);
+      _indexInput.readBytes(_decompressionBuffer, 0, length);
+
+      // clones share this decompressor instance (see clone()), so access is serialized
+      synchronized (_decompressor) {
+        _decompressor.reset();
+        _decompressor.setInput(_decompressionBuffer, 0, length);
+        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
+      }
+
+      _currentBlockId = blockId;
+    }
+
+    private int getBlockId() {
+      return (int) (_pos / _blockSize);
+    }
+
+    public void seek(long pos) throws IOException {
+      _pos = pos;
+    }
+  }
+}
\ No newline at end of file
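
For reference, both readers above locate data with the same arithmetic over the
uncompressed position: the block id selects which compressed block to fetch and
decompress, and the remainder is the offset inside that block. A minimal,
self-contained sketch of that addressing (the block size and position are
illustrative values, not read from a real trailer):

  public final class BlockMath {
    public static void main(String[] args) {
      int blockSize = 32768; // example value; the real one comes from the file trailer
      long pos = 100000L;    // an arbitrary uncompressed position

      int blockId = (int) (pos / blockSize);       // which compressed block to fetch
      int blockPosition = (int) (pos % blockSize); // offset inside the decompressed block

      System.out.println("blockId=" + blockId + " blockPosition=" + blockPosition);
      // prints: blockId=3 blockPosition=1696
    }
  }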

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java
new file mode 100644
index 0000000..52f7e02
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java
@@ -0,0 +1,38 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class ChangeFileExt {
+
+  public static void main(String[] args) throws IOException {
+    Path p = new Path(args[0]);
+    FileSystem fileSystem = FileSystem.get(p.toUri(), new Configuration());
+    FileStatus[] listStatus = fileSystem.listStatus(p);
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      fileSystem.rename(path, new Path(path.toString() + ".lf"));
+    }
+  }
+
+}
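
ChangeFileExt is a one-shot migration tool: it renames every file directly
under the given path by appending the ".lf" extension that HdfsDirectory uses
to mark layered files. A hedged invocation sketch (the HDFS URI below is
hypothetical):

  import java.io.IOException;
  import org.apache.blur.store.hdfs.ChangeFileExt;

  public class ChangeFileExtExample {
    public static void main(String[] args) throws IOException {
      // Hypothetical URI; point this at a real shard directory.
      ChangeFileExt.main(new String[] { "hdfs://namenode:8020/blur/tables/table1/shard-0" });
      // before: .../shard-0/_0.fdt    after: .../shard-0/_0.fdt.lf
    }
  }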

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java
new file mode 100644
index 0000000..2763ba4
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java
@@ -0,0 +1,62 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.LockObtainFailedException;
+
+public class ConvertDirectory {
+
+  public static void main(String[] args) throws CorruptIndexException, LockObtainFailedException, IOException {
+    Path path = new Path(args[0]);
+    convert(path);
+  }
+
+  public static void convert(Path path) throws IOException {
+    FileSystem fileSystem = FileSystem.get(path.toUri(), new Configuration());
+    if (!fileSystem.exists(path)) {
+      System.out.println(path + " does not exist.");
+      return;
+    }
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    if (fileStatus.isDir()) {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus status : listStatus) {
+        convert(status.getPath());
+      }
+    } else {
+      System.out.println("Converting file [" + path + "]");
+      HdfsMetaBlock block = new HdfsMetaBlock();
+      block.realPosition = 0;
+      block.logicalPosition = 0;
+      block.length = fileStatus.getLen();
+      FSDataOutputStream outputStream = fileSystem.append(path);
+      block.write(outputStream);
+      outputStream.writeInt(1);
+      outputStream.writeLong(fileStatus.getLen());
+      outputStream.writeInt(HdfsFileWriter.VERSION);
+      outputStream.close();
+    }
+  }
+}
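
ConvertDirectory upgrades plain files in place: for each file it appends a
single HdfsMetaBlock spanning the whole file, followed by the block count (1),
the logical length, and the writer version, which appears to be the trailer the
layered readers expect. A hedged usage sketch (the path is hypothetical):

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;

  public class ConvertDirectoryExample {
    public static void main(String[] args) throws IOException {
      // Hypothetical path; convert() recurses into subdirectories and
      // appends the layered-file trailer to every plain file it finds.
      ConvertDirectory.convert(new Path("hdfs://namenode:8020/blur/tables/table1"));
    }
  }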

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java
new file mode 100644
index 0000000..1144126
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java
@@ -0,0 +1,48 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+
+public class CopyFromHdfsLocal {
+
+  public static void main(String[] args) throws IOException {
+    Path path = new Path(args[0]);
+    HdfsDirectory src = new HdfsDirectory(path);
+
+    for (String name : src.listAll()) {
+      System.out.println(name);
+    }
+
+    CompressedFieldDataDirectory compressedDirectory = new CompressedFieldDataDirectory(src, new DefaultCodec(), 32768);
+    Directory dest = FSDirectory.open(new File(args[1]));
+
+    for (String name : compressedDirectory.listAll()) {
+      compressedDirectory.copy(dest, name, name);
+    }
+
+  }
+
+}
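
Because CompressedFieldDataDirectory wraps any Lucene Directory, the same
pattern works in the opposite direction; the sketch below pushes a local index
up to HDFS under the same codec and block size (all paths are hypothetical):

  import java.io.File;
  import java.io.IOException;
  import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
  import org.apache.blur.store.hdfs.HdfsDirectory;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.compress.DefaultCodec;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;

  public class CopyFromLocalToHdfs {
    public static void main(String[] args) throws IOException {
      Directory src = FSDirectory.open(new File("/tmp/local-index"));
      HdfsDirectory rawDest = new HdfsDirectory(new Path("hdfs://namenode:8020/blur/index"));
      Directory dest = new CompressedFieldDataDirectory(rawDest, new DefaultCodec(), 32768);
      for (String name : src.listAll()) {
        src.copy(dest, name, name); // Directory.copy, as used above
      }
    }
  }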

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
new file mode 100644
index 0000000..e3bc7ca
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -0,0 +1,377 @@
+package org.apache.blur.store.hdfs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.store.CustomBufferedIndexInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+public class HdfsDirectory extends Directory {
+
+  public static final int BUFFER_SIZE = 8192;
+
+  private static final String LF_EXT = ".lf";
+  protected static final String SEGMENTS_GEN = "segments.gen";
+  protected static final IndexOutput NULL_WRITER = new NullIndexOutput();
+  protected Path _hdfsDirPath;
+  protected AtomicReference<FileSystem> _fileSystemRef = new AtomicReference<FileSystem>();
+  protected Configuration _configuration;
+
+  public HdfsDirectory(Path hdfsDirPath) throws IOException {
+    _hdfsDirPath = hdfsDirPath;
+    _configuration = new Configuration();
+    reopenFileSystem();
+    try {
+      if (!getFileSystem().exists(hdfsDirPath)) {
+        getFileSystem().mkdirs(hdfsDirPath);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // nothing to close here; the FileSystem is managed through _fileSystemRef
+    // and old instances are closed in reopenFileSystem()
+  }
+
+  @Override
+  public IndexOutput createOutput(String name) throws IOException {
+    if (SEGMENTS_GEN.equals(name)) {
+      // Lucene rewrites segments.gen in place, which HDFS does not support;
+      // discard it (readers can fall back to a directory listing)
+      return NULL_WRITER;
+    }
+    name = getRealName(name);
+    HdfsFileWriter writer = new HdfsFileWriter(getFileSystem(), new Path(_hdfsDirPath, name));
+    return new HdfsLayeredIndexOutput(writer);
+  }
+
+  private String getRealName(String name) throws IOException {
+    if (getFileSystem().exists(new Path(_hdfsDirPath, name))) {
+      return name;
+    }
+    return name + LF_EXT;
+  }
+
+  private String[] getNormalNames(List<String> files) {
+    int size = files.size();
+    for (int i = 0; i < size; i++) {
+      String str = files.get(i);
+      files.set(i, toNormalName(str));
+    }
+    return files.toArray(new String[] {});
+  }
+
+  private String toNormalName(String name) {
+    if (name.endsWith(LF_EXT)) {
+      return name.substring(0, name.length() - LF_EXT.length());
+    }
+    return name;
+  }
+
+  @Override
+  public IndexInput openInput(String name) throws IOException {
+    return openInput(name, BUFFER_SIZE);
+  }
+
+  @Override
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    name = getRealName(name);
+    if (isLayeredFile(name)) {
+      HdfsFileReader reader = new HdfsFileReader(getFileSystem(), new Path(_hdfsDirPath, name), BUFFER_SIZE);
+      return new HdfsLayeredIndexInput(name, reader, BUFFER_SIZE);
+    } else {
+      return new HdfsNormalIndexInput(name, getFileSystem(), new Path(_hdfsDirPath, name), BUFFER_SIZE);
+    }
+  }
+
+  private boolean isLayeredFile(String name) {
+    return name.endsWith(LF_EXT);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    name = getRealName(name);
+    if (!fileExists(name)) {
+      throw new FileNotFoundException(name);
+    }
+    getFileSystem().delete(new Path(_hdfsDirPath, name), false);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    name = getRealName(name);
+    return getFileSystem().exists(new Path(_hdfsDirPath, name));
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    name = getRealName(name);
+    if (!fileExists(name)) {
+      throw new FileNotFoundException(name);
+    }
+    return HdfsFileReader.getLength(getFileSystem(), new Path(_hdfsDirPath, name));
+  }
+
+  @Override
+  public long fileModified(String name) throws IOException {
+    name = getRealName(name);
+    if (!fileExists(name)) {
+      throw new FileNotFoundException(name);
+    }
+    FileStatus fileStatus = getFileSystem().getFileStatus(new Path(_hdfsDirPath, name));
+    return fileStatus.getModificationTime();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    FileStatus[] listStatus = getFileSystem().listStatus(_hdfsDirPath);
+    List<String> files = new ArrayList<String>();
+    if (listStatus == null) {
+      return new String[] {};
+    }
+    for (FileStatus status : listStatus) {
+      if (!status.isDir()) {
+        files.add(status.getPath().getName());
+      }
+    }
+    return getNormalNames(files);
+  }
+
+  @Override
+  public void touchFile(String name) throws IOException {
+    // do nothing
+  }
+
+  public Path getHdfsDirPath() {
+    return _hdfsDirPath;
+  }
+
+  public FileSystem getFileSystem() {
+    return _fileSystemRef.get();
+  }
+
+  protected void reopenFileSystem() throws IOException {
+    FileSystem fileSystem = FileSystem.get(_hdfsDirPath.toUri(), _configuration);
+    FileSystem oldFs = _fileSystemRef.get();
+    _fileSystemRef.set(fileSystem);
+    if (oldFs != null) {
+      oldFs.close();
+    }
+  }
+
+  static class HdfsLayeredIndexInput extends CustomBufferedIndexInput {
+
+    private HdfsFileReader _reader;
+    private long _length;
+    private boolean isClone;
+
+    public HdfsLayeredIndexInput(String name, HdfsFileReader reader, int bufferSize) {
+      super(name, bufferSize);
+      _reader = reader;
+      _length = _reader.length();
+    }
+
+    @Override
+    protected void closeInternal() throws IOException {
+      if (!isClone) {
+        _reader.close();
+      }
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public Object clone() {
+      HdfsLayeredIndexInput input = (HdfsLayeredIndexInput) super.clone();
+      input.isClone = true;
+      input._reader = (HdfsFileReader) _reader.clone();
+      return input;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      long position = getFilePointer();
+      _reader.seek(position);
+      _reader.readBytes(b, offset, length);
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // do nothing
+    }
+  }
+
+  static class HdfsNormalIndexInput extends CustomBufferedIndexInput {
+
+    private final FSDataInputStream _inputStream;
+    private final long _length;
+    private boolean _clone = false;
+
+    public HdfsNormalIndexInput(String name, FileSystem fileSystem, Path path, int bufferSize) throws IOException {
+      super(name, bufferSize);
+      FileStatus fileStatus = fileSystem.getFileStatus(path);
+      _length = fileStatus.getLen();
+      _inputStream = fileSystem.open(path, bufferSize);
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      // readFully: a plain positional read may return fewer bytes than requested
+      _inputStream.readFully(getFilePointer(), b, offset, length);
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // do nothing; readInternal uses positional reads based on getFilePointer()
+    }
+
+    @Override
+    protected void closeInternal() throws IOException {
+      if (!_clone) {
+        _inputStream.close();
+      }
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public Object clone() {
+      HdfsNormalIndexInput clone = (HdfsNormalIndexInput) super.clone();
+      clone._clone = true;
+      return clone;
+    }
+  }
+
+  static class HdfsLayeredIndexOutput extends IndexOutput {
+
+    private HdfsFileWriter _writer;
+
+    public HdfsLayeredIndexOutput(HdfsFileWriter writer) {
+      _writer = writer;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _writer.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _writer.getPosition();
+    }
+
+    @Override
+    public long length() throws IOException {
+      return _writer.length();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      _writer.seek(pos);
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _writer.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _writer.writeBytes(b, offset, length);
+    }
+  }
+
+  static class DirectIOHdfsIndexInput extends CustomBufferedIndexInput {
+
+    private long _length;
+    private FSDataInputStream _inputStream;
+    private boolean isClone;
+
+    public DirectIOHdfsIndexInput(String name, FSDataInputStream inputStream, long length) throws IOException {
+      super(name);
+      if (inputStream instanceof DFSDataInputStream) {
+        // If the file was still being written and not yet closed, its
+        // reported length is 0; getVisibleLength() returns the synced length.
+        _length = ((DFSDataInputStream) inputStream).getVisibleLength();
+      } else {
+        _length = length;
+      }
+      _inputStream = inputStream;
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    protected void closeInternal() throws IOException {
+      if (!isClone) {
+        _inputStream.close();
+      }
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // do nothing; readInternal uses positional reads based on getFilePointer()
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      synchronized (_inputStream) {
+        _inputStream.readFully(getFilePointer(), b, offset, length);
+      }
+    }
+
+    @Override
+    public Object clone() {
+      DirectIOHdfsIndexInput clone = (DirectIOHdfsIndexInput) super.clone();
+      clone.isClone = true;
+      return clone;
+    }
+  }
+}
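
A minimal read-path sketch over HdfsDirectory, assuming the Lucene 3.x API
generation these overrides target (the path is hypothetical); layered ".lf"
names are resolved transparently by getRealName:

  import org.apache.blur.store.hdfs.HdfsDirectory;
  import org.apache.hadoop.fs.Path;
  import org.apache.lucene.index.IndexReader;

  public class HdfsDirectoryReadExample {
    public static void main(String[] args) throws Exception {
      HdfsDirectory directory = new HdfsDirectory(new Path("hdfs://namenode:8020/blur/tables/table1/shard-0"));
      IndexReader reader = IndexReader.open(directory);
      try {
        System.out.println("maxDoc=" + reader.maxDoc());
      } finally {
        reader.close();
        directory.close();
      }
    }
  }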

