incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [27/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/compressed/CompressedFieldDataDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/compressed/CompressedFieldDataDirectory.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/compressed/CompressedFieldDataDirectory.java
deleted file mode 100644
index 634da0f..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/compressed/CompressedFieldDataDirectory.java
+++ /dev/null
@@ -1,813 +0,0 @@
-package com.nearinfinity.blur.store.compressed;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class CompressedFieldDataDirectory extends Directory {
-
-  private static final Log LOG = LogFactory.getLog(CompressedFieldDataDirectory.class);
-
-  private static final int _MIN_BUFFER_SIZE = 100;
-  private static final String FDZ = ".fdz";
-  private static final String FDT = ".fdt";
-  private static final String Z_TMP = ".tmp";
-
-  private static final int COMPRESSED_BUFFER_SIZE = 65536;
-
-  public static CompressionCodec DEFAULT_COMPRESSION = new DefaultCodec();
-
-  private CompressionCodec _compression = DEFAULT_COMPRESSION;
-  private Directory _directory;
-  private int _writingBlockSize;
-
-  public Directory getInnerDirectory() {
-    return _directory;
-  }
-
-  public CompressedFieldDataDirectory(Directory dir) {
-    this(dir, DEFAULT_COMPRESSION);
-  }
-
-  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression) {
-    this(dir, compression, COMPRESSED_BUFFER_SIZE);
-  }
-
-  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression, int blockSize) {
-    _directory = dir;
-    if (compression == null) {
-      _compression = DEFAULT_COMPRESSION;
-    } else {
-      _compression = compression;
-    }
-    _writingBlockSize = blockSize;
-  }
-
-  private IndexInput wrapInput(String name) throws IOException {
-    IndexInput indexInput = _directory.openInput(name);
-    int version = getVersion(indexInput);
-    switch (version) {
-    case 0:
-      return new CompressedIndexInput_V0(name, indexInput, _compression);
-    case 1:
-      return new CompressedIndexInput_V1(name, indexInput, _compression);
-    default:
-      throw new RuntimeException("Unknown version [" + version + "]");
-    }
-  }
-
-  private int getVersion(IndexInput indexInput) throws IOException {
-    long length = indexInput.length();
-    indexInput.seek(length - 8);
-    long l = indexInput.readLong();
-    if (l < 0) {
-      return (int) Math.abs(l);
-    } else {
-      return 0;
-    }
-  }
-
-  private IndexOutput wrapOutput(String name) throws IOException {
-    return new CompressedIndexOutput_V0(name, _directory, _compression, _writingBlockSize);
-  }
-
-  public IndexInput openInput(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      return wrapInput(getCompressedName(name));
-    }
-    return _directory.openInput(name);
-  }
-
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    if (compressedFileExists(name)) {
-      return wrapInput(getCompressedName(name));
-    }
-    return _directory.openInput(name, bufferSize);
-  }
-
-  private boolean compressedFileExists(String name) throws IOException {
-    if (!name.endsWith(FDT)) {
-      return false;
-    }
-    return _directory.fileExists(getCompressedName(name));
-  }
-
-  private String getCompressedName(String name) {
-    int index = name.lastIndexOf('.');
-    return name.substring(0, index) + FDZ;
-  }
-
-  private String getNormalName(String compressedName) {
-    int index = compressedName.lastIndexOf('.');
-    return compressedName.substring(0, index) + FDT;
-  }
-
-  public IndexOutput createOutput(String name) throws IOException {
-    if (name.endsWith(FDT)) {
-      return wrapOutput(getCompressedName(name));
-    }
-    return _directory.createOutput(name);
-  }
-
-  public void clearLock(String name) throws IOException {
-    _directory.clearLock(name);
-  }
-
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-  public void deleteFile(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      _directory.deleteFile(getCompressedName(name));
-    } else {
-      _directory.deleteFile(name);
-    }
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      return true;
-    }
-    return _directory.fileExists(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      IndexInput input = _directory.openInput(getCompressedName(name));
-      try {
-        long length = input.length();
-        input.seek(length - 8);
-        long fileLength = input.readLong();
-        if (fileLength < 0) {
-          input.seek(length - 16);
-          return input.readLong();
-        } else {
-          return fileLength;
-        }
-      } finally {
-        input.close();
-      }
-    }
-    return _directory.fileLength(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      return _directory.fileModified(getCompressedName(name));
-    }
-    return _directory.fileModified(name);
-  }
-
-  public String[] listAll() throws IOException {
-    return fixNames(_directory.listAll());
-  }
-
-  private String[] fixNames(String[] listAll) {
-    for (int i = 0; i < listAll.length; i++) {
-      if (listAll[i].endsWith(FDZ)) {
-        listAll[i] = getNormalName(listAll[i]);
-      }
-    }
-    return listAll;
-  }
-
-  public void touchFile(String name) throws IOException {
-    // do nothing
-  }
-
-  public LockFactory getLockFactory() {
-    return _directory.getLockFactory();
-  }
-
-  public String getLockID() {
-    return _directory.getLockID();
-  }
-
-  public Lock makeLock(String name) {
-    return _directory.makeLock(name);
-  }
-
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    _directory.setLockFactory(lockFactory);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      _directory.sync(getCompressedName(name));
-    } else {
-      _directory.sync(name);
-    }
-  }
-
-  public String toString() {
-    return _directory.toString();
-  }
-
-  public static class CompressedIndexOutput_V1 extends IndexOutput {
-
-    private static final long VERSION = -1L;
-    private long _position = 0;
-    private IndexOutput _output;
-    private byte[] _buffer;
-    private int _bufferPosition = 0;
-    private byte[] _compressedBuffer;
-    private IndexOutput _tmpOutput;
-    private Directory _directory;
-    private String _name;
-    private int _blockCount;
-    private Compressor _compressor;
-
-    public CompressedIndexOutput_V1(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
-      _compressor = codec.createCompressor();
-      if (_compressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
-      }
-      _directory = directory;
-      _name = name;
-      _output = directory.createOutput(name);
-      _tmpOutput = directory.createOutput(name + Z_TMP);
-      _buffer = new byte[blockSize];
-      int dsize = blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      _compressedBuffer = new byte[dsize];
-    }
-
-    @Override
-    public void writeByte(byte b) throws IOException {
-      _buffer[_bufferPosition] = b;
-      _bufferPosition++;
-      _position++;
-      flushIfNeeded();
-    }
-
-    private void flushIfNeeded() throws IOException {
-      if (_bufferPosition >= _buffer.length) {
-        flushBuffer();
-        _bufferPosition = 0;
-      }
-    }
-
-    private void flushBuffer() throws IOException {
-      if (_bufferPosition > 0) {
-        _compressor.reset();
-        _compressor.setInput(_buffer, 0, _bufferPosition);
-        _compressor.finish();
-
-        long filePointer = _output.getFilePointer();
-
-        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);
-
-        _tmpOutput.writeLong(filePointer);
-        _blockCount++;
-        _output.writeBytes(_compressedBuffer, 0, length);
-      }
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      int len = length + offset;
-      for (int i = offset; i < len; i++) {
-        writeByte(b[i]);
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      flushBuffer();
-      _tmpOutput.close();
-      IndexInput input = _directory.openInput(_name + Z_TMP);
-      try {
-        long len = input.length();
-        long readCount = 0;
-        while (readCount < len) {
-          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
-          input.readBytes(_buffer, 0, toRead);
-          _output.writeBytes(_buffer, toRead);
-          readCount += toRead;
-        }
-        _output.writeInt(_blockCount);
-        _output.writeInt(_buffer.length);
-        _output.writeLong(_position);
-        _output.writeLong(VERSION);
-      } finally {
-        try {
-          _output.close();
-        } finally {
-          input.close();
-        }
-      }
-      _directory.deleteFile(_name + Z_TMP);
-      _compressor.end();
-    }
-
-    @Override
-    public long getFilePointer() {
-      return _position;
-    }
-
-    @Override
-    public long length() throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void flush() throws IOException {
-
-    }
-  }
-
-  public static class CompressedIndexOutput_V0 extends IndexOutput {
-
-    private long _position = 0;
-    private IndexOutput _output;
-    private byte[] _buffer;
-    private int _bufferPosition = 0;
-    private byte[] _compressedBuffer;
-    private IndexOutput _tmpOutput;
-    private Directory _directory;
-    private String _name;
-    private int _blockCount;
-    private Compressor _compressor;
-
-    public CompressedIndexOutput_V0(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
-      _compressor = codec.createCompressor();
-      if (_compressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
-      }
-      _directory = directory;
-      _name = name;
-      _output = directory.createOutput(name);
-      _tmpOutput = directory.createOutput(name + Z_TMP);
-      _buffer = new byte[blockSize];
-      int dsize = blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      _compressedBuffer = new byte[dsize];
-    }
-
-    @Override
-    public void writeByte(byte b) throws IOException {
-      _buffer[_bufferPosition] = b;
-      _bufferPosition++;
-      _position++;
-      flushIfNeeded();
-    }
-
-    private void flushIfNeeded() throws IOException {
-      if (_bufferPosition >= _buffer.length) {
-        flushBuffer();
-        _bufferPosition = 0;
-      }
-    }
-
-    private void flushBuffer() throws IOException {
-      if (_bufferPosition > 0) {
-        _compressor.reset();
-        _compressor.setInput(_buffer, 0, _bufferPosition);
-        _compressor.finish();
-
-        long filePointer = _output.getFilePointer();
-
-        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);
-
-        _tmpOutput.writeVLong(filePointer);
-        _tmpOutput.writeVInt(length);
-        _blockCount++;
-        _output.writeBytes(_compressedBuffer, 0, length);
-      }
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      int len = length + offset;
-      for (int i = offset; i < len; i++) {
-        writeByte(b[i]);
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      flushBuffer();
-      _tmpOutput.close();
-      IndexInput input = _directory.openInput(_name + Z_TMP);
-      try {
-        long len = input.length();
-        long readCount = 0;
-        while (readCount < len) {
-          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
-          input.readBytes(_buffer, 0, toRead);
-          _output.writeBytes(_buffer, toRead);
-          readCount += toRead;
-        }
-        _output.writeLong(len);
-        _output.writeInt(_blockCount);
-        _output.writeInt(_buffer.length);
-        _output.writeLong(_position);
-      } finally {
-        try {
-          _output.close();
-        } finally {
-          input.close();
-        }
-      }
-      _directory.deleteFile(_name + Z_TMP);
-      _compressor.end();
-    }
-
-    @Override
-    public long getFilePointer() {
-      return _position;
-    }
-
-    @Override
-    public long length() throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void flush() throws IOException {
-
-    }
-  }
-
-  public static class CompressedIndexInput_V1 extends IndexInput {
-
-    private static final long VERSION = -1l;
-
-    private static final int _SIZES_META_DATA = 24;
-
-    private final AtomicLongArray _blockPositions;
-    private final long _realLength;
-    private final long _origLength;
-    private final int _blockSize;
-
-    private IndexInput _indexInput;
-    private long _pos;
-    private boolean _isClone;
-    private long _currentBlockId = -1;
-    private byte[] _blockBuffer;
-    private byte[] _decompressionBuffer;
-    private int _blockBufferLength;
-    private Decompressor _decompressor;
-    private int _blockCount;
-    private Thread _openerThread;
-    private AtomicBoolean _errorInOpener = new AtomicBoolean(false);
-    private String _name;
-
-    public CompressedIndexInput_V1(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
-      super(name);
-      _name = name;
-      long s = System.nanoTime();
-      _decompressor = codec.createDecompressor();
-      if (_decompressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
-      }
-      _indexInput = indexInput;
-      _realLength = _indexInput.length();
-
-      // read meta data
-      _indexInput.seek(_realLength - _SIZES_META_DATA); // 8 - 4 - 4 - 8
-      _blockCount = _indexInput.readInt();
-      _blockSize = _indexInput.readInt();
-      _origLength = _indexInput.readLong();
-      long version = _indexInput.readLong();
-      if (version != VERSION) {
-        throw new IOException("Version [" + version + "] mismatch!");
-      }
-
-      _blockPositions = new AtomicLongArray(_blockCount);
-      for (int i = 0; i < _blockCount; i++) {
-        _blockPositions.set(i, -1l);
-      }
-      readBlockPositions((IndexInput) indexInput.clone(), name);
-      setupBuffers(this);
-      long e = System.nanoTime();
-      double total = (e - s) / 1000000.0;
-      LOG.debug("Took [" + total + " ms] to open file [" + name + "].");
-    }
-
-    private void readBlockPositions(final IndexInput indexInput, final String name) throws IOException {
-      _openerThread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            long s = System.nanoTime();
-            long metaDataLength = _blockCount * 8;
-            indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
-            for (int i = 0; i < _blockCount; i++) {
-              _blockPositions.set(i, indexInput.readLong());
-            }
-            long e = System.nanoTime();
-            double total = (e - s) / 1000000.0;
-            LOG.debug("Took [{0} ms] to read block positions with blockCount of [{1}] in file [{2}].", total, _blockCount, name);
-            indexInput.close();
-          } catch (Exception e) {
-            LOG.error("Error during the reading of block positions in file [{0}] ", e, name);
-            _errorInOpener.set(true);
-          }
-        }
-      });
-      _openerThread.setName("Block-Position-Reader-" + name);
-      _openerThread.start();
-    }
-
-    private int getBlockLength(int blockId) throws IOException {
-      int newBlockId = blockId + 1;
-      if (newBlockId == _blockCount) {
-        // last block
-        return (int) (_realLength - _SIZES_META_DATA - getBlockPosition(blockId));
-      } else {
-        return (int) (getBlockPosition(newBlockId) - getBlockPosition(blockId));
-      }
-    }
-
-    public long getBlockPosition(int blockId) throws IOException {
-      long position = _blockPositions.get(blockId);
-      while (true) {
-        if (position < 0) {
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-          }
-        } else {
-          return position;
-        }
-        if (_errorInOpener.get()) {
-          throw new IOException("Block positions for file [" + _name + "] can not be read.");
-        }
-        position = _blockPositions.get(blockId);
-      }
-    }
-
-    private static void setupBuffers(CompressedIndexInput_V1 input) {
-      input._blockBuffer = new byte[input._blockSize];
-      int dsize = input._blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      input._decompressionBuffer = new byte[dsize];
-    }
-
-    public Object clone() {
-      CompressedIndexInput_V1 clone = (CompressedIndexInput_V1) super.clone();
-      clone._isClone = true;
-      clone._indexInput = (IndexInput) _indexInput.clone();
-      setupBuffers(clone);
-      return clone;
-    }
-
-    public void close() throws IOException {
-      if (!_isClone) {
-        _decompressor.end();
-      }
-      _indexInput.close();
-    }
-
-    public long getFilePointer() {
-      return _pos;
-    }
-
-    public long length() {
-      return _origLength;
-    }
-
-    public byte readByte() throws IOException {
-      int blockId = getBlockId();
-      if (blockId != _currentBlockId) {
-        fetchBlock(blockId);
-      }
-      int blockPosition = getBlockPosition();
-      _pos++;
-      return _blockBuffer[blockPosition];
-    }
-
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      while (len > 0) {
-        int blockId = getBlockId();
-        if (blockId != _currentBlockId) {
-          fetchBlock(blockId);
-        }
-        int blockPosition = getBlockPosition();
-        int length = Math.min(_blockBufferLength - blockPosition, len);
-        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
-        _pos += length;
-        len -= length;
-        offset += length;
-      }
-    }
-
-    private int getBlockPosition() {
-      return (int) (_pos % _blockSize);
-    }
-
-    private void fetchBlock(int blockId) throws IOException {
-      long position = getBlockPosition(blockId);
-      int length = getBlockLength(blockId);
-      _indexInput.seek(position);
-      _indexInput.readBytes(_decompressionBuffer, 0, length);
-
-      synchronized (_decompressor) {
-        _decompressor.reset();
-        _decompressor.setInput(_decompressionBuffer, 0, length);
-        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
-      }
-
-      _currentBlockId = blockId;
-    }
-
-    private int getBlockId() {
-      return (int) (_pos / _blockSize);
-    }
-
-    public void seek(long pos) throws IOException {
-      _pos = pos;
-    }
-  }
-
-  public static class CompressedIndexInput_V0 extends IndexInput {
-
-    private static final int _SIZES_META_DATA = 24;
-
-    private final int[] _blockLengths;
-    private final long[] _blockPositions;
-    private final long _realLength;
-    private final long _origLength;
-    private final int _blockSize;
-
-    private IndexInput _indexInput;
-    private long _pos;
-    private boolean _isClone;
-    private long _currentBlockId = -1;
-    private byte[] _blockBuffer;
-    private byte[] _decompressionBuffer;
-    private int _blockBufferLength;
-    private Decompressor _decompressor;
-
-    public CompressedIndexInput_V0(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
-      super(name);
-      _decompressor = codec.createDecompressor();
-      if (_decompressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
-      }
-      long s1 = System.nanoTime();
-      _indexInput = indexInput;
-      _realLength = _indexInput.length();
-
-      // read meta data
-      _indexInput.seek(_realLength - _SIZES_META_DATA); // 8 - 4 - 4 - 8
-      long metaDataLength = _indexInput.readLong();
-      int blockCount = _indexInput.readInt();
-      _blockSize = _indexInput.readInt();
-      _origLength = _indexInput.readLong();
-      long e1 = System.nanoTime();
-
-      _blockLengths = new int[blockCount];
-      _blockPositions = new long[blockCount];
-
-      long s2 = System.nanoTime();
-      _indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
-      for (int i = 0; i < blockCount; i++) {
-        _blockPositions[i] = _indexInput.readVLong();
-        _blockLengths[i] = _indexInput.readVInt();
-      }
-      long e2 = System.nanoTime();
-
-      setupBuffers(this);
-
-      double total = (e2 - s1) / 1000000.0;
-      double _1st = (e1 - s1) / 1000000.0;
-      double _2nd = (e2 - s2) / 1000000.0;
-      LOG.debug("Took [" + total + " ms] to open [" + _1st + "] [" + _2nd + " with blockCount of " + blockCount + "].");
-    }
-
-    private static void setupBuffers(CompressedIndexInput_V0 input) {
-      input._blockBuffer = new byte[input._blockSize];
-      int dsize = input._blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      input._decompressionBuffer = new byte[dsize];
-    }
-
-    public Object clone() {
-      CompressedIndexInput_V0 clone = (CompressedIndexInput_V0) super.clone();
-      clone._isClone = true;
-      clone._indexInput = (IndexInput) _indexInput.clone();
-      setupBuffers(clone);
-      return clone;
-    }
-
-    public void close() throws IOException {
-      if (!_isClone) {
-        _decompressor.end();
-      }
-      _indexInput.close();
-    }
-
-    public long getFilePointer() {
-      return _pos;
-    }
-
-    public long length() {
-      return _origLength;
-    }
-
-    public byte readByte() throws IOException {
-      int blockId = getBlockId();
-      if (blockId != _currentBlockId) {
-        fetchBlock(blockId);
-      }
-      int blockPosition = getBlockPosition();
-      _pos++;
-      return _blockBuffer[blockPosition];
-    }
-
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      while (len > 0) {
-        int blockId = getBlockId();
-        if (blockId != _currentBlockId) {
-          fetchBlock(blockId);
-        }
-        int blockPosition = getBlockPosition();
-        int length = Math.min(_blockBufferLength - blockPosition, len);
-        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
-        _pos += length;
-        len -= length;
-        offset += length;
-      }
-    }
-
-    private int getBlockPosition() {
-      return (int) (_pos % _blockSize);
-    }
-
-    private void fetchBlock(int blockId) throws IOException {
-      long position = _blockPositions[blockId];
-      int length = _blockLengths[blockId];
-      _indexInput.seek(position);
-      _indexInput.readBytes(_decompressionBuffer, 0, length);
-
-      synchronized (_decompressor) {
-        _decompressor.reset();
-        _decompressor.setInput(_decompressionBuffer, 0, length);
-        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
-      }
-
-      _currentBlockId = blockId;
-    }
-
-    private int getBlockId() {
-      return (int) (_pos / _blockSize);
-    }
-
-    public void seek(long pos) throws IOException {
-      _pos = pos;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ChangeFileExt.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ChangeFileExt.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ChangeFileExt.java
deleted file mode 100644
index 4050107..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ChangeFileExt.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class ChangeFileExt {
-
-  public static void main(String[] args) throws IOException {
-    Path p = new Path(args[0]);
-    FileSystem fileSystem = FileSystem.get(p.toUri(), new Configuration());
-    FileStatus[] listStatus = fileSystem.listStatus(p);
-    for (FileStatus fileStatus : listStatus) {
-      Path path = fileStatus.getPath();
-      fileSystem.rename(path, new Path(path.toString() + ".lf"));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ConvertDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ConvertDirectory.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ConvertDirectory.java
deleted file mode 100644
index 2773a6a..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/ConvertDirectory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.store.LockObtainFailedException;
-
-public class ConvertDirectory {
-
-  public static void main(String[] args) throws CorruptIndexException, LockObtainFailedException, IOException {
-    Path path = new Path(args[0]);
-    convert(path);
-  }
-
-  public static void convert(Path path) throws IOException {
-    FileSystem fileSystem = FileSystem.get(path.toUri(), new Configuration());
-    if (!fileSystem.exists(path)) {
-      System.out.println(path + " does not exists.");
-      return;
-    }
-    FileStatus fileStatus = fileSystem.getFileStatus(path);
-    if (fileStatus.isDir()) {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus status : listStatus) {
-        convert(status.getPath());
-      }
-    } else {
-      System.out.println("Converting file [" + path + "]");
-      HdfsMetaBlock block = new HdfsMetaBlock();
-      block.realPosition = 0;
-      block.logicalPosition = 0;
-      block.length = fileStatus.getLen();
-      FSDataOutputStream outputStream = fileSystem.append(path);
-      block.write(outputStream);
-      outputStream.writeInt(1);
-      outputStream.writeLong(fileStatus.getLen());
-      outputStream.writeInt(HdfsFileWriter.VERSION);
-      outputStream.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/CopyFromHdfsLocal.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/CopyFromHdfsLocal.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/CopyFromHdfsLocal.java
deleted file mode 100644
index c413d46..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/CopyFromHdfsLocal.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-
-import com.nearinfinity.blur.store.compressed.CompressedFieldDataDirectory;
-
-public class CopyFromHdfsLocal {
-
-  public static void main(String[] args) throws IOException {
-    Path path = new Path(args[0]);
-    HdfsDirectory src = new HdfsDirectory(path);
-
-    for (String name : src.listAll()) {
-      System.out.println(name);
-    }
-
-    CompressedFieldDataDirectory compressedDirectory = new CompressedFieldDataDirectory(src, new DefaultCodec(), 32768);
-    Directory dest = FSDirectory.open(new File(args[1]));
-
-    for (String name : compressedDirectory.listAll()) {
-      compressedDirectory.copy(dest, name, name);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsDirectory.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsDirectory.java
deleted file mode 100644
index 5d7bfae..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsDirectory.java
+++ /dev/null
@@ -1,377 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-
-import com.nearinfinity.blur.store.CustomBufferedIndexInput;
-
/**
 * Lucene {@link Directory} implementation backed by HDFS.
 *
 * Files are stored in the "layered" format (suffix {@code .lf}) written by
 * {@link HdfsFileWriter} and read back by {@link HdfsFileReader}; a file that
 * already exists under its plain name is used as-is (see getRealName).
 * Writes to {@code segments.gen} are silently discarded via NULL_WRITER.
 */
public class HdfsDirectory extends Directory {

  /** Default buffer size for HDFS stream opens and layered reads. */
  public static final int BUFFER_SIZE = 8192;

  // Suffix marking files stored in the layered (meta-block) format.
  private static final String LF_EXT = ".lf";
  protected static final String SEGMENTS_GEN = "segments.gen";
  // Sink for segments.gen writes; its content is never persisted.
  protected static final IndexOutput NULL_WRITER = new NullIndexOutput();
  protected Path _hdfsDirPath;
  // Holds the current FileSystem; swapped atomically by reopenFileSystem().
  protected AtomicReference<FileSystem> _fileSystemRef = new AtomicReference<FileSystem>();
  protected Configuration _configuration;

  /**
   * Opens (and creates if missing) the given HDFS directory.
   *
   * @param hdfsDirPath directory in HDFS holding the index files
   * @throws IOException if the FileSystem cannot be obtained
   */
  public HdfsDirectory(Path hdfsDirPath) throws IOException {
    _hdfsDirPath = hdfsDirPath;
    _configuration = new Configuration();
    reopenFileSystem();
    try {
      if (!getFileSystem().exists(hdfsDirPath)) {
        getFileSystem().mkdirs(hdfsDirPath);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** No-op: the shared FileSystem is managed by reopenFileSystem(), not closed here. */
  @Override
  public void close() throws IOException {

  }

  /**
   * Creates a layered output for {@code name}; segments.gen writes are
   * swallowed by NULL_WRITER.
   */
  @Override
  public IndexOutput createOutput(String name) throws IOException {
    if (SEGMENTS_GEN.equals(name)) {
      return NULL_WRITER;
    }
    name = getRealName(name);
    HdfsFileWriter writer = new HdfsFileWriter(getFileSystem(), new Path(_hdfsDirPath, name));
    return new HdfsLayeredIndexOutput(writer);
  }

  // Maps a logical name to the on-disk name: an existing plain file wins,
  // otherwise the layered (.lf) name is used.
  private String getRealName(String name) throws IOException {
    if (getFileSystem().exists(new Path(_hdfsDirPath, name))) {
      return name;
    }
    return name + LF_EXT;
  }

  // Strips the .lf suffix from every entry, in place, and returns the array.
  private String[] getNormalNames(List<String> files) {
    int size = files.size();
    for (int i = 0; i < size; i++) {
      String str = files.get(i);
      files.set(i, toNormalName(str));
    }
    return files.toArray(new String[] {});
  }

  // 3 == LF_EXT.length()
  private String toNormalName(String name) {
    if (name.endsWith(LF_EXT)) {
      return name.substring(0, name.length() - 3);
    }
    return name;
  }

  @Override
  public IndexInput openInput(String name) throws IOException {
    return openInput(name, BUFFER_SIZE);
  }

  /**
   * Opens {@code name} as a layered input when it carries the .lf suffix,
   * otherwise as a plain positional-read input.
   *
   * NOTE(review): the {@code bufferSize} parameter is ignored — BUFFER_SIZE is
   * passed to both readers below. Confirm whether that is intentional.
   */
  @Override
  public IndexInput openInput(String name, int bufferSize) throws IOException {
    name = getRealName(name);
    if (isLayeredFile(name)) {
      HdfsFileReader reader = new HdfsFileReader(getFileSystem(), new Path(_hdfsDirPath, name), BUFFER_SIZE);
      return new HdfsLayeredIndexInput(name, reader, BUFFER_SIZE);
    } else {
      return new HdfsNormalIndexInput(name, getFileSystem(), new Path(_hdfsDirPath, name), BUFFER_SIZE);
    }
  }

  private boolean isLayeredFile(String name) {
    if (name.endsWith(LF_EXT)) {
      return true;
    }
    return false;
  }

  @Override
  public void deleteFile(String name) throws IOException {
    name = getRealName(name);
    if (!fileExists(name)) {
      throw new FileNotFoundException(name);
    }
    getFileSystem().delete(new Path(_hdfsDirPath, name), false);
  }

  @Override
  public boolean fileExists(String name) throws IOException {
    name = getRealName(name);
    return getFileSystem().exists(new Path(_hdfsDirPath, name));
  }

  /**
   * Returns the logical length recorded in the file's footer.
   *
   * NOTE(review): HdfsFileReader.getLength reads the trailing footer, so this
   * assumes every file — layered or converted — carries one; a plain file
   * without a footer fails the version check. Confirm against ConvertDirectory.
   */
  @Override
  public long fileLength(String name) throws IOException {
    name = getRealName(name);
    if (!fileExists(name)) {
      throw new FileNotFoundException(name);
    }
    return HdfsFileReader.getLength(getFileSystem(), new Path(_hdfsDirPath, name));
  }

  @Override
  public long fileModified(String name) throws IOException {
    name = getRealName(name);
    if (!fileExists(name)) {
      throw new FileNotFoundException(name);
    }
    FileStatus fileStatus = getFileSystem().getFileStatus(new Path(_hdfsDirPath, name));
    return fileStatus.getModificationTime();
  }

  /** Lists regular files (directories skipped) with the .lf suffix stripped. */
  @Override
  public String[] listAll() throws IOException {
    FileStatus[] listStatus = getFileSystem().listStatus(_hdfsDirPath);
    List<String> files = new ArrayList<String>();
    if (listStatus == null) {
      return new String[] {};
    }
    for (FileStatus status : listStatus) {
      if (!status.isDir()) {
        files.add(status.getPath().getName());
      }
    }
    return getNormalNames(files);
  }

  @Override
  public void touchFile(String name) throws IOException {
    // do nothing
  }

  public Path getHdfsDirPath() {
    return _hdfsDirPath;
  }

  public FileSystem getFileSystem() {
    return _fileSystemRef.get();
  }

  /**
   * Replaces the cached FileSystem with a freshly obtained one and closes the
   * old instance.
   *
   * NOTE(review): streams opened from the old FileSystem become unusable once
   * it is closed — confirm no readers are live when this is called.
   */
  protected void reopenFileSystem() throws IOException {
    FileSystem fileSystem = FileSystem.get(_hdfsDirPath.toUri(), _configuration);
    FileSystem oldFs = _fileSystemRef.get();
    _fileSystemRef.set(fileSystem);
    if (oldFs != null) {
      oldFs.close();
    }
  }

  /** IndexInput over a layered (.lf) file, delegating to HdfsFileReader. */
  static class HdfsLayeredIndexInput extends CustomBufferedIndexInput {

    private HdfsFileReader _reader;
    private long _length;
    private boolean isClone; // clones get their own reader clone but must not close the stream

    public HdfsLayeredIndexInput(String name, HdfsFileReader reader, int bufferSize) {
      super(name, bufferSize);
      _reader = reader;
      _length = _reader.length();
    }

    @Override
    protected void closeInternal() throws IOException {
      if (!isClone) {
        _reader.close();
      }
    }

    @Override
    public long length() {
      return _length;
    }

    @Override
    public Object clone() {
      HdfsLayeredIndexInput input = (HdfsLayeredIndexInput) super.clone();
      input.isClone = true;
      // Each clone carries its own reader so seek positions do not interfere.
      input._reader = (HdfsFileReader) _reader.clone();
      return input;
    }

    @Override
    protected void readInternal(byte[] b, int offset, int length) throws IOException {
      long position = getFilePointer();
      _reader.seek(position);
      _reader.readBytes(b, offset, length);
    }

    @Override
    protected void seekInternal(long pos) throws IOException {
      // do nothing — readInternal seeks the reader explicitly each time
    }
  }

  /** IndexInput over a plain HDFS file using positional reads. */
  static class HdfsNormalIndexInput extends CustomBufferedIndexInput {

    private final FSDataInputStream _inputStream;
    private final long _length;
    private boolean _clone = false; // clones share _inputStream and must not close it

    public HdfsNormalIndexInput(String name, FileSystem fileSystem, Path path, int bufferSize) throws IOException {
      super(name);
      FileStatus fileStatus = fileSystem.getFileStatus(path);
      _length = fileStatus.getLen();
      _inputStream = fileSystem.open(path, bufferSize);
    }

    @Override
    protected void readInternal(byte[] b, int offset, int length) throws IOException {
      // NOTE(review): positional read() may return fewer than `length` bytes
      // and the return value is ignored; DirectIOHdfsIndexInput below uses
      // readFully for this. TODO confirm whether a short read is possible here.
      _inputStream.read(getFilePointer(), b, offset, length);
    }

    @Override
    protected void seekInternal(long pos) throws IOException {
      // positional reads make an explicit seek unnecessary
    }

    @Override
    protected void closeInternal() throws IOException {
      if (!_clone) {
        _inputStream.close();
      }
    }

    @Override
    public long length() {
      return _length;
    }

    @Override
    public Object clone() {
      HdfsNormalIndexInput clone = (HdfsNormalIndexInput) super.clone();
      clone._clone = true;
      return clone;
    }
  }

  /** IndexOutput that forwards everything to an HdfsFileWriter. */
  static class HdfsLayeredIndexOutput extends IndexOutput {

    private HdfsFileWriter _writer;

    public HdfsLayeredIndexOutput(HdfsFileWriter writer) {
      _writer = writer;
    }

    @Override
    public void close() throws IOException {
      _writer.close();
    }

    @Override
    public void flush() throws IOException {
      // no-op: HdfsFileWriter flushes its footer on close
    }

    @Override
    public long getFilePointer() {
      return _writer.getPosition();
    }

    @Override
    public long length() throws IOException {
      return _writer.length();
    }

    @Override
    public void seek(long pos) throws IOException {
      _writer.seek(pos);
    }

    @Override
    public void writeByte(byte b) throws IOException {
      _writer.writeByte(b);
    }

    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException {
      _writer.writeBytes(b, offset, length);
    }
  }

  /**
   * IndexInput over a shared FSDataInputStream; reads are synchronized on the
   * stream because clones share it.
   */
  static class DirectIOHdfsIndexInput extends CustomBufferedIndexInput {

    private long _length;
    private FSDataInputStream _inputStream;
    private boolean isClone;

    public DirectIOHdfsIndexInput(String name, FSDataInputStream inputStream, long length) throws IOException {
      super(name);
      if (inputStream instanceof DFSDataInputStream) {
        // This is needed because if the file was in progress of being written
        // but
        // was not closed the
        // length of the file is 0. This will fetch the synced length of the
        // file.
        _length = ((DFSDataInputStream) inputStream).getVisibleLength();
      } else {
        _length = length;
      }
      _inputStream = inputStream;
    }

    @Override
    public long length() {
      return _length;
    }

    @Override
    protected void closeInternal() throws IOException {
      if (!isClone) {
        _inputStream.close();
      }
    }

    @Override
    protected void seekInternal(long pos) throws IOException {
      // positional reads make an explicit seek unnecessary
    }

    @Override
    protected void readInternal(byte[] b, int offset, int length) throws IOException {
      // Lock the shared stream: clones read through the same instance.
      synchronized (_inputStream) {
        _inputStream.readFully(getFilePointer(), b, offset, length);
      }
    }

    @Override
    public Object clone() {
      DirectIOHdfsIndexInput clone = (DirectIOHdfsIndexInput) super.clone();
      clone.isClone = true;
      return clone;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileReader.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileReader.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileReader.java
deleted file mode 100644
index 9b21cc1..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileReader.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.DataInput;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class HdfsFileReader extends DataInput {
-
-  private static final Log LOG = LogFactory.getLog(HdfsFileReader.class);
-
-  private static final int VERSION = -1;
-
-  private final long _length;
-  private final long _hdfsLength;
-  private final List<HdfsMetaBlock> _metaBlocks;
-  private FSDataInputStream _inputStream;
-  private long _logicalPos;
-  private long _boundary;
-  private long _realPos;
-  private boolean isClone;
-
-  public HdfsFileReader(FileSystem fileSystem, Path path, int bufferSize) throws IOException {
-    if (!fileSystem.exists(path)) {
-      throw new FileNotFoundException(path.toString());
-    }
-    FileStatus fileStatus = fileSystem.getFileStatus(path);
-    _hdfsLength = fileStatus.getLen();
-    _inputStream = fileSystem.open(path, bufferSize);
-
-    // read meta blocks
-    _inputStream.seek(_hdfsLength - 16);
-    int numberOfBlocks = _inputStream.readInt();
-    _length = _inputStream.readLong();
-    int version = _inputStream.readInt();
-    if (version != VERSION) {
-      throw new RuntimeException("Version of file [" + version + "] does not match reader [" + VERSION + "]");
-    }
-    _inputStream.seek(_hdfsLength - 16 - (numberOfBlocks * 24)); // 3 longs per
-                                                                 // block
-    _metaBlocks = new ArrayList<HdfsMetaBlock>(numberOfBlocks);
-    for (int i = 0; i < numberOfBlocks; i++) {
-      HdfsMetaBlock hdfsMetaBlock = new HdfsMetaBlock();
-      hdfsMetaBlock.readFields(_inputStream);
-      _metaBlocks.add(hdfsMetaBlock);
-    }
-    seek(0);
-  }
-
-  public HdfsFileReader(FileSystem fileSystem, Path path) throws IOException {
-    this(fileSystem, path, HdfsDirectory.BUFFER_SIZE);
-  }
-
-  public long getPosition() {
-    return _logicalPos;
-  }
-
-  public long length() {
-    return _length;
-  }
-
-  public void seek(long pos) throws IOException {
-    if (_logicalPos == pos) {
-      return;
-    }
-    _logicalPos = pos;
-    seekInternal();
-  }
-
-  public void close() throws IOException {
-    if (!isClone) {
-      _inputStream.close();
-    }
-  }
-
-  /**
-   * This method should never be used!
-   */
-  @Override
-  public byte readByte() throws IOException {
-    LOG.warn("Should not be used!");
-    byte[] buf = new byte[1];
-    readBytes(buf, 0, 1);
-    return buf[0];
-  }
-
-  @Override
-  public void readBytes(byte[] b, int offset, int len) throws IOException {
-    checkBoundary();
-    // might need to read in multiple stages
-    while (len > 0) {
-      if (_logicalPos >= _boundary) {
-        seekInternal();
-      }
-      int lengthToRead = (int) Math.min(_boundary - _logicalPos, len);
-      _inputStream.read(_realPos, b, offset, lengthToRead);
-      offset += lengthToRead;
-      _logicalPos += lengthToRead;
-      _realPos += lengthToRead;
-      len -= lengthToRead;
-    }
-  }
-
-  private void checkBoundary() throws IOException {
-    if (_boundary == -1l) {
-      throw new IOException("eof");
-    }
-  }
-
-  private void seekInternal() throws IOException {
-    HdfsMetaBlock block = null;
-    for (HdfsMetaBlock b : _metaBlocks) {
-      if (b.containsDataAt(_logicalPos)) {
-        block = b;
-      }
-    }
-    if (block == null) {
-      _boundary = -1l;
-      _realPos = -1l;
-    } else {
-      _realPos = block.getRealPosition(_logicalPos);
-      _boundary = getBoundary(block);
-    }
-  }
-
-  private long getBoundary(HdfsMetaBlock block) {
-    _boundary = block.logicalPosition + block.length;
-    for (HdfsMetaBlock b : _metaBlocks) {
-      if (b.logicalPosition > block.logicalPosition && b.logicalPosition < _boundary && b.logicalPosition >= _logicalPos) {
-        _boundary = b.logicalPosition;
-      }
-    }
-    return _boundary;
-  }
-
-  public static long getLength(FileSystem fileSystem, Path path) throws IOException {
-    FSDataInputStream inputStream = null;
-    try {
-      FileStatus fileStatus = fileSystem.getFileStatus(path);
-      inputStream = fileSystem.open(path);
-      long hdfsLength = fileStatus.getLen();
-      inputStream.seek(hdfsLength - 12);
-      long length = inputStream.readLong();
-      int version = inputStream.readInt();
-      if (version != VERSION) {
-        throw new RuntimeException("Version of file [" + version + "] does not match reader [" + VERSION + "]");
-      }
-      return length;
-    } finally {
-      if (inputStream != null) {
-        inputStream.close();
-      }
-    }
-  }
-
-  @Override
-  public Object clone() {
-    HdfsFileReader reader = (HdfsFileReader) super.clone();
-    reader.isClone = true;
-    return reader;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileWriter.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileWriter.java
deleted file mode 100644
index 3664ff1..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsFileWriter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.DataOutput;
-
-public class HdfsFileWriter extends DataOutput {
-
-  public static final int VERSION = -1;
-
-  private FSDataOutputStream _outputStream;
-  private HdfsMetaBlock _block;
-  private List<HdfsMetaBlock> _blocks = new ArrayList<HdfsMetaBlock>();
-  private long _length;
-  private long _currentPosition;
-
-  public HdfsFileWriter(FileSystem fileSystem, Path path) throws IOException {
-    _outputStream = fileSystem.create(path);
-    seek(0);
-  }
-
-  public long length() {
-    return _length;
-  }
-
-  public void seek(long pos) throws IOException {
-    if (_block != null) {
-      _blocks.add(_block);
-    }
-    _block = new HdfsMetaBlock();
-    _block.realPosition = _outputStream.getPos();
-    _block.logicalPosition = pos;
-    _currentPosition = pos;
-  }
-
-  public void close() throws IOException {
-    if (_block != null) {
-      _blocks.add(_block);
-    }
-    flushMetaBlocks();
-    _outputStream.close();
-  }
-
-  private void flushMetaBlocks() throws IOException {
-    for (HdfsMetaBlock block : _blocks) {
-      block.write(_outputStream);
-    }
-    _outputStream.writeInt(_blocks.size());
-    _outputStream.writeLong(length());
-    _outputStream.writeInt(VERSION);
-  }
-
-  @Override
-  public void writeByte(byte b) throws IOException {
-    _outputStream.write(b & 0xFF);
-    _block.length++;
-    _currentPosition++;
-    updateLength();
-  }
-
-  @Override
-  public void writeBytes(byte[] b, int offset, int length) throws IOException {
-    _outputStream.write(b, offset, length);
-    _block.length += length;
-    _currentPosition += length;
-    updateLength();
-  }
-
-  private void updateLength() {
-    if (_currentPosition > _length) {
-      _length = _currentPosition;
-    }
-  }
-
-  public long getPosition() {
-    return _currentPosition;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsMetaBlock.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsMetaBlock.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsMetaBlock.java
deleted file mode 100644
index 4905ed4..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/HdfsMetaBlock.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-class HdfsMetaBlock implements Writable {
-  long logicalPosition;
-  long realPosition;
-  long length;
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    logicalPosition = in.readLong();
-    realPosition = in.readLong();
-    length = in.readLong();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(logicalPosition);
-    out.writeLong(realPosition);
-    out.writeLong(length);
-  }
-
-  boolean containsDataAt(long logicalPos) {
-    if (logicalPos >= logicalPosition && logicalPos < logicalPosition + length) {
-      return true;
-    }
-    return false;
-  }
-
-  long getRealPosition(long logicalPos) {
-    long offset = logicalPos - logicalPosition;
-    long pos = realPosition + offset;
-    return pos;
-  }
-
-  @Override
-  public String toString() {
-    return "HdfsMetaBlock [length=" + length + ", logicalPosition=" + logicalPosition + ", realPosition=" + realPosition + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/NullIndexOutput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/NullIndexOutput.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/NullIndexOutput.java
deleted file mode 100644
index 9ad6147..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/hdfs/NullIndexOutput.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.nearinfinity.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.lucene.store.IndexOutput;
-
-public class NullIndexOutput extends IndexOutput {
-
-  private long _pos;
-  private long _length;
-
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public void flush() throws IOException {
-
-  }
-
-  @Override
-  public long getFilePointer() {
-    return _pos;
-  }
-
-  @Override
-  public long length() throws IOException {
-    return _length;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    _pos = pos;
-  }
-
-  @Override
-  public void writeByte(byte b) throws IOException {
-    _pos++;
-  }
-
-  @Override
-  public void writeBytes(byte[] b, int offset, int length) throws IOException {
-    _pos += length;
-    updateLength();
-  }
-
-  private void updateLength() {
-    if (_pos > _length) {
-      _length = _pos;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/com/nearinfinity/blur/store/lock/BlurLockFactory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/com/nearinfinity/blur/store/lock/BlurLockFactory.java b/src/blur-store/src/main/java/com/nearinfinity/blur/store/lock/BlurLockFactory.java
deleted file mode 100644
index d2a3141..0000000
--- a/src/blur-store/src/main/java/com/nearinfinity/blur/store/lock/BlurLockFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package com.nearinfinity.blur.store.lock;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
/**
 * Lucene {@link LockFactory} that represents a lock as an HDFS file whose
 * content is a key of the form {@code host/pid/timestamp}. Ownership is
 * verified by reading the file back and comparing it to the locally stored
 * key bytes.
 */
public class BlurLockFactory extends LockFactory {

  private final Configuration _configuration;
  private final FileSystem _fileSystem;
  // host + "/" + pid; the timestamp is appended per obtain() call.
  private final String _baseLockKey;
  // Exact bytes written to the lock file by the last obtain(); compared in isLocked().
  private byte[] _lockKey;
  private final Path _dir;

  public BlurLockFactory(Configuration configuration, Path dir, String host, int pid) throws IOException {
    _configuration = configuration;
    _dir = dir;
    _fileSystem = _dir.getFileSystem(_configuration);
    _baseLockKey = host + "/" + pid;
  }

  @Override
  public Lock makeLock(String lockName) {
    final Path lockPath = new Path(_dir, lockName);
    return new Lock() {
      private boolean _set;

      /**
       * Writes the lock file (overwriting any existing one) and records the
       * key for later verification. Always returns true when it returns.
       *
       * NOTE(review): _set is flipped in the finally block, so even a failed
       * write marks this Lock as used and a retry throws — confirm this
       * one-shot behavior is intended.
       */
      @Override
      public boolean obtain() throws IOException {
        if (_set) {
          throw new IOException("Lock for [" + _baseLockKey + "] can only be set once.");
        }
        try {
          // NOTE(review): getBytes() uses the platform default charset here
          // and when isLocked() compares — consistent within one JVM, but
          // confirm cross-host behavior if charsets differ.
          _lockKey = (_baseLockKey + "/" + System.currentTimeMillis()).getBytes();
          FSDataOutputStream outputStream = _fileSystem.create(lockPath, true);
          outputStream.write(_lockKey);
          outputStream.close();
        } finally {
          _set = true;
        }
        return true;
      }

      @Override
      public void release() throws IOException {
        _fileSystem.delete(lockPath, false);
      }

      /**
       * The lock is held only if the file still exists and its content equals
       * the key this instance wrote (length checked first as a cheap filter).
       */
      @Override
      public boolean isLocked() throws IOException {
        if (!_set) {
          return false;
        }
        if (!_fileSystem.exists(lockPath)) {
          return false;
        }
        FileStatus fileStatus = _fileSystem.getFileStatus(lockPath);
        long len = fileStatus.getLen();
        if (len != _lockKey.length) {
          return false;
        }
        byte[] buf = new byte[_lockKey.length];
        FSDataInputStream inputStream = _fileSystem.open(lockPath);
        inputStream.readFully(buf);
        inputStream.close();
        if (Arrays.equals(_lockKey, buf)) {
          return true;
        }
        return false;
      }
    };
  }

  @Override
  public void clearLock(String lockName) throws IOException {
    _fileSystem.delete(new Path(_dir, lockName), false);
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java b/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
new file mode 100644
index 0000000..2842514
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/index/IndexWriter.java
@@ -0,0 +1,63 @@
+package org.apache.blur.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+
+public class IndexWriter extends org.apache.lucene.index.IndexWriter {
+
+  private Lock internalLock;
+
+  public IndexWriter(Directory d, IndexWriterConfig conf) throws CorruptIndexException, LockObtainFailedException, IOException {
+    super(d, conf);
+    try {
+      internalLock = getInternalLock();
+    } catch (Exception e) {
+      throw new RuntimeException("Could not get the write lock instance.", e);
+    }
+  }
+
+  private Lock getInternalLock() throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
+    Field field = org.apache.lucene.index.IndexWriter.class.getDeclaredField("writeLock");
+    field.setAccessible(true);
+    return (Lock) field.get(this);
+  }
+
+  @Override
+  protected void doAfterFlush() throws IOException {
+    super.doAfterFlush();
+    if (!internalLock.isLocked()) {
+      throw new IOException("Lock [" + internalLock + "] no longer has write lock.");
+    }
+  }
+
+  @Override
+  protected void doBeforeFlush() throws IOException {
+    super.doBeforeFlush();
+    if (!internalLock.isLocked()) {
+      throw new IOException("Lock [" + internalLock + "] no longer has write lock.");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/lucene/EscapeRewrite.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/EscapeRewrite.java b/src/blur-store/src/main/java/org/apache/blur/lucene/EscapeRewrite.java
new file mode 100644
index 0000000..b1e0c7b
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/EscapeRewrite.java
@@ -0,0 +1,334 @@
+package org.apache.blur.lucene;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TermEnum;
+import org.apache.lucene.index.TermFreqVector;
+import org.apache.lucene.index.TermPositions;
+import org.apache.lucene.index.TermVectorMapper;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+
/**
 * Experiment showing how to "escape" (abort) an expensive Query rewrite —
 * e.g. a WildcardQuery expanding many terms — by wrapping an IndexReader so
 * that its TermEnum stops producing terms once a shared flag is cleared.
 * Not wired into the main codebase; {@link #main} is a manual driver.
 */
@SuppressWarnings("deprecation")
public class EscapeRewrite {

  // Manual driver: opens a local index (hard-coded developer path), arms a
  // 5-second escape timer, then times rewrite() and search() of a wildcard
  // query against the wrapped reader.
  public static void main(String[] args) throws CorruptIndexException, IOException {
    Directory directory = FSDirectory.open(new File("/Users/amccurry/Documents/workspace/low-lat-lucene-rt/index"));
    AtomicBoolean running = new AtomicBoolean(true);
    IndexReader indexReader = IndexReader.open(directory);
    // IndexReader reader = indexReader;
    IndexReader reader = wrap(indexReader, running);
    Query query = new WildcardQuery(new Term("id", "*0*"));
    // Query query = new TermQuery(new Term("id","0"));
    escapeIn(running, TimeUnit.SECONDS.toMillis(5));
    IndexSearcher searcher = new IndexSearcher(reader);
    long s1 = System.nanoTime();
    Query rewrite = searcher.rewrite(query);
    long e1 = System.nanoTime();

    long s2 = System.nanoTime();
    TopDocs topDocs = searcher.search(rewrite, 10);
    long e2 = System.nanoTime();

    // Millis spent in rewrite, then millis spent in search plus hit count.
    System.out.println((e1 - s1) / 1000000.0 + " " + rewrite);
    System.out.println((e2 - s2) / 1000000.0 + " " + topDocs.totalHits);
  }

  // Starts a background thread that clears the running flag after the given
  // delay, causing any wrapped TermEnum to stop early.
  private static void escapeIn(final AtomicBoolean running, final long millis) {
    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        running.set(false);
      }
    }).start();
  }

  /** Wraps a reader so term enumeration aborts once {@code running} becomes false. */
  public static IndexReader wrap(IndexReader reader, AtomicBoolean running) {
    return new IndexReaderRewriteEscape(reader, running);
  }

  /**
   * Pure delegating IndexReader: every call is forwarded to the wrapped
   * reader, except {@link #terms()} / {@link #terms(Term)} and
   * {@link #getSequentialSubReaders()}, which return escape-aware wrappers.
   */
  public static class IndexReaderRewriteEscape extends IndexReader {
    private IndexReader reader;
    private AtomicBoolean running;

    public IndexReaderRewriteEscape(IndexReader reader, AtomicBoolean running) {
      this.reader = reader;
      this.running = running;
    }

    public String toString() {
      return reader.toString();
    }

    public IndexReader reopen() throws CorruptIndexException, IOException {
      return reader.reopen();
    }

    public IndexReader reopen(boolean openReadOnly) throws CorruptIndexException, IOException {
      return reader.reopen(openReadOnly);
    }

    public IndexReader reopen(IndexCommit commit) throws CorruptIndexException, IOException {
      return reader.reopen(commit);
    }

    public IndexReader reopen(IndexWriter writer, boolean applyAllDeletes) throws CorruptIndexException, IOException {
      return reader.reopen(writer, applyAllDeletes);
    }

    public Directory directory() {
      return reader.directory();
    }

    public long getVersion() {
      return reader.getVersion();
    }

    public boolean isOptimized() {
      return reader.isOptimized();
    }

    public TermFreqVector[] getTermFreqVectors(int docNumber) throws IOException {
      return reader.getTermFreqVectors(docNumber);
    }

    public TermFreqVector getTermFreqVector(int docNumber, String field) throws IOException {
      return reader.getTermFreqVector(docNumber, field);
    }

    public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
      reader.getTermFreqVector(docNumber, field, mapper);
    }

    public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
      reader.getTermFreqVector(docNumber, mapper);
    }

    public int numDocs() {
      return reader.numDocs();
    }

    public int maxDoc() {
      return reader.maxDoc();
    }

    public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
      return reader.document(n, fieldSelector);
    }

    public boolean hasDeletions() {
      return reader.hasDeletions();
    }

    public boolean hasNorms(String field) throws IOException {
      return reader.hasNorms(field);
    }

    public int docFreq(Term t) throws IOException {
      return reader.docFreq(t);
    }

    public boolean equals(Object arg0) {
      return reader.equals(arg0);
    }

    public Map<String, String> getCommitUserData() {
      return reader.getCommitUserData();
    }

    public FieldInfos getFieldInfos() {
      return reader.getFieldInfos();
    }

    public IndexCommit getIndexCommit() throws IOException {
      return reader.getIndexCommit();
    }

    public Object getCoreCacheKey() {
      return reader.getCoreCacheKey();
    }

    public Object getDeletesCacheKey() {
      return reader.getDeletesCacheKey();
    }

    public long getUniqueTermCount() throws IOException {
      return reader.getUniqueTermCount();
    }

    public int getTermInfosIndexDivisor() {
      return reader.getTermInfosIndexDivisor();
    }

    public int hashCode() {
      return reader.hashCode();
    }

    public boolean isCurrent() throws CorruptIndexException, IOException {
      return reader.isCurrent();
    }

    public boolean isDeleted(int n) {
      return reader.isDeleted(n);
    }

    public byte[] norms(String field) throws IOException {
      return reader.norms(field);
    }

    public void norms(String field, byte[] bytes, int offset) throws IOException {
      reader.norms(field, bytes, offset);
    }

    public TermDocs termDocs(Term term) throws IOException {
      return reader.termDocs(term);
    }

    public TermDocs termDocs() throws IOException {
      return reader.termDocs();
    }

    public TermPositions termPositions() throws IOException {
      return reader.termPositions();
    }

    // Shallow-clones the wrapper, then deep-clones the delegate so the two
    // wrappers do not share a reader instance.
    public Object clone() {
      IndexReaderRewriteEscape clone = (IndexReaderRewriteEscape) super.clone();
      clone.reader = (IndexReader) reader.clone();
      return clone;
    }

    public IndexReader clone(boolean openReadOnly) throws CorruptIndexException, IOException {
      IndexReaderRewriteEscape clone = (IndexReaderRewriteEscape) super.clone();
      clone.reader = reader.clone(openReadOnly);
      return clone;
    }

    // Sub-readers are wrapped too, so per-segment enumeration also honors the flag.
    public IndexReader[] getSequentialSubReaders() {
      return wrap(reader.getSequentialSubReaders(), running);
    }

    public TermEnum terms() throws IOException {
      return wrap(reader.terms(), running);
    }

    public TermEnum terms(Term t) throws IOException {
      return wrap(reader.terms(t), running);
    }

    // The protected do* hooks route to the delegate's public mutators.
    @Override
    protected void doSetNorm(int doc, String field, byte value) throws CorruptIndexException, IOException {
      reader.setNorm(doc, field, value);
    }

    @Override
    protected void doDelete(int docNum) throws CorruptIndexException, IOException {
      reader.deleteDocument(docNum);
    }

    @Override
    protected void doUndeleteAll() throws CorruptIndexException, IOException {
      reader.undeleteAll();
    }

    @Override
    protected void doCommit(Map<String, String> commitUserData) throws IOException {
      reader.commit(commitUserData);
    }

    @Override
    protected void doClose() throws IOException {
      reader.close();
    }
  }

  /**
   * Wraps a TermEnum so that enumeration ends early (next() returns false)
   * once {@code running} is false; the flag is polled every 10000 terms to
   * keep the check cheap.
   */
  public static TermEnum wrap(final TermEnum terms, final AtomicBoolean running) {
    return new TermEnum() {

      private int count = 0;
      private boolean quit = false;

      @Override
      public Term term() {
        Term term = terms.term();
        // NOTE(review): debug output — prints every term visited.
        System.out.println(term);
        return term;
      }

      @Override
      public boolean next() throws IOException {
        // Once the escape has triggered, the enum stays exhausted forever.
        if (quit) {
          return false;
        }
        // Poll the shared flag only every 10000 calls.
        if (count >= 10000) {
          if (!running.get()) {
            quit = true;
          }
          count = 0;
        }
        count++;
        return terms.next();
      }

      @Override
      public int docFreq() {
        return terms.docFreq();
      }

      @Override
      public void close() throws IOException {
        terms.close();
      }
    };
  }

  /** Wraps each sub-reader; propagates a null array unchanged (atomic reader case). */
  public static IndexReader[] wrap(IndexReader[] sequentialSubReaders, AtomicBoolean running) {
    if (sequentialSubReaders == null) {
      return null;
    }
    IndexReader[] result = new IndexReader[sequentialSubReaders.length];
    for (int i = 0; i < sequentialSubReaders.length; i++) {
      result[i] = wrap(sequentialSubReaders[i], running);
    }
    return result;
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneConstant.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneConstant.java b/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneConstant.java
new file mode 100644
index 0000000..b4df0c4
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/LuceneConstant.java
@@ -0,0 +1,25 @@
+package org.apache.blur.lucene;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.lucene.util.Version;
+
+public class LuceneConstant {
+
+  public static final Version LUCENE_VERSION = Version.LUCENE_36;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-store/src/main/java/org/apache/blur/store/BufferStore.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/BufferStore.java b/src/blur-store/src/main/java/org/apache/blur/store/BufferStore.java
new file mode 100644
index 0000000..dbd9fcd
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/store/BufferStore.java
@@ -0,0 +1,113 @@
+package org.apache.blur.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.metrics.BlurMetrics;
+
+
+public class BufferStore {
+
+  private static final Log LOG = LogFactory.getLog(BufferStore.class);
+
+  private static BlockingQueue<byte[]> _1024 = setupBuffers(1024, 1);
+  private static BlockingQueue<byte[]> _8192 = setupBuffers(8192, 1);
+  public static AtomicLong _blurShardBuffercacheLost = new AtomicLong();
+  public static AtomicLong _blurShardBuffercacheAllocate1024 = new AtomicLong();
+  public static AtomicLong _blurShardBuffercacheAllocate8192 = new AtomicLong();
+  public static AtomicLong _blurShardBuffercacheAllocateOther = new AtomicLong();
+
+  public static void init(BlurConfiguration configuration, BlurMetrics metrics) {
+    int _1024Size = configuration.getInt("blur.shard.buffercache.1024", 8192);
+    int _8192Size = configuration.getInt("blur.shard.buffercache.8192", 8192);
+    LOG.info("Initializing the 1024 buffers with [{0}] buffers.", _1024Size);
+    _1024 = setupBuffers(1024, _1024Size);
+    LOG.info("Initializing the 8192 buffers with [{0}] buffers.", _8192Size);
+    _8192 = setupBuffers(8192, _8192Size);
+    _blurShardBuffercacheLost = metrics.blurShardBuffercacheLost;
+    _blurShardBuffercacheAllocate1024 = metrics.blurShardBuffercacheAllocate1024;
+    _blurShardBuffercacheAllocate8192 = metrics.blurShardBuffercacheAllocate8192;
+    _blurShardBuffercacheAllocateOther = metrics.blurShardBuffercacheAllocateOther;
+  }
+
+  private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count) {
+    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
+    for (int i = 0; i < count; i++) {
+      queue.add(new byte[bufferSize]);
+    }
+    return queue;
+  }
+
+  public static byte[] takeBuffer(int bufferSize) {
+    switch (bufferSize) {
+    case 1024:
+      return newBuffer1024(_1024.poll());
+    case 8192:
+      return newBuffer8192(_8192.poll());
+    default:
+      return newBuffer(bufferSize);
+    }
+  }
+
+  public static void putBuffer(byte[] buffer) {
+    if (buffer == null) {
+      return;
+    }
+    int bufferSize = buffer.length;
+    switch (bufferSize) {
+    case 1024:
+      checkReturn(_1024.offer(buffer));
+      return;
+    case 8192:
+      checkReturn(_8192.offer(buffer));
+      return;
+    }
+  }
+
+  private static void checkReturn(boolean offer) {
+    if (!offer) {
+      _blurShardBuffercacheLost.incrementAndGet();
+    }
+  }
+
+  private static byte[] newBuffer1024(byte[] buf) {
+    if (buf != null) {
+      return buf;
+    }
+    _blurShardBuffercacheAllocate1024.incrementAndGet();
+    return new byte[1024];
+  }
+
+  private static byte[] newBuffer8192(byte[] buf) {
+    if (buf != null) {
+      return buf;
+    }
+    _blurShardBuffercacheAllocate8192.incrementAndGet();
+    return new byte[8192];
+  }
+
+  private static byte[] newBuffer(int size) {
+    _blurShardBuffercacheAllocateOther.incrementAndGet();
+    return new byte[size];
+  }
+}


Mime
View raw message