incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [5/8] Blur store and pom files have been updated, everything else is still broken.
Date Tue, 16 Oct 2012 00:57:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
deleted file mode 100644
index 368d719..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/compressed/CompressedFieldDataDirectory.java
+++ /dev/null
@@ -1,813 +0,0 @@
-package org.apache.blur.store.compressed;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-
-public class CompressedFieldDataDirectory extends Directory {
-
-  private static final Log LOG = LogFactory.getLog(CompressedFieldDataDirectory.class);
-
-  private static final int _MIN_BUFFER_SIZE = 100;
-  private static final String FDZ = ".fdz";
-  private static final String FDT = ".fdt";
-  private static final String Z_TMP = ".tmp";
-
-  private static final int COMPRESSED_BUFFER_SIZE = 65536;
-
-  public static CompressionCodec DEFAULT_COMPRESSION = new DefaultCodec();
-
-  private CompressionCodec _compression = DEFAULT_COMPRESSION;
-  private Directory _directory;
-  private int _writingBlockSize;
-
-  public Directory getInnerDirectory() {
-    return _directory;
-  }
-
-  public CompressedFieldDataDirectory(Directory dir) {
-    this(dir, DEFAULT_COMPRESSION);
-  }
-
-  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression) {
-    this(dir, compression, COMPRESSED_BUFFER_SIZE);
-  }
-
-  public CompressedFieldDataDirectory(Directory dir, CompressionCodec compression, int blockSize) {
-    _directory = dir;
-    if (compression == null) {
-      _compression = DEFAULT_COMPRESSION;
-    } else {
-      _compression = compression;
-    }
-    _writingBlockSize = blockSize;
-  }
-
-  private IndexInput wrapInput(String name) throws IOException {
-    IndexInput indexInput = _directory.openInput(name);
-    int version = getVersion(indexInput);
-    switch (version) {
-    case 0:
-      return new CompressedIndexInput_V0(name, indexInput, _compression);
-    case 1:
-      return new CompressedIndexInput_V1(name, indexInput, _compression);
-    default:
-      throw new RuntimeException("Unknown version [" + version + "]");
-    }
-  }
-
-  private int getVersion(IndexInput indexInput) throws IOException {
-    long length = indexInput.length();
-    indexInput.seek(length - 8);
-    long l = indexInput.readLong();
-    if (l < 0) {
-      return (int) Math.abs(l);
-    } else {
-      return 0;
-    }
-  }
-
-  private IndexOutput wrapOutput(String name) throws IOException {
-    return new CompressedIndexOutput_V0(name, _directory, _compression, _writingBlockSize);
-  }
-
-  public IndexInput openInput(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      return wrapInput(getCompressedName(name));
-    }
-    return _directory.openInput(name);
-  }
-
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    if (compressedFileExists(name)) {
-      return wrapInput(getCompressedName(name));
-    }
-    return _directory.openInput(name, bufferSize);
-  }
-
-  private boolean compressedFileExists(String name) throws IOException {
-    if (!name.endsWith(FDT)) {
-      return false;
-    }
-    return _directory.fileExists(getCompressedName(name));
-  }
-
-  private String getCompressedName(String name) {
-    int index = name.lastIndexOf('.');
-    return name.substring(0, index) + FDZ;
-  }
-
-  private String getNormalName(String compressedName) {
-    int index = compressedName.lastIndexOf('.');
-    return compressedName.substring(0, index) + FDT;
-  }
-
-  public IndexOutput createOutput(String name) throws IOException {
-    if (name.endsWith(FDT)) {
-      return wrapOutput(getCompressedName(name));
-    }
-    return _directory.createOutput(name);
-  }
-
-  public void clearLock(String name) throws IOException {
-    _directory.clearLock(name);
-  }
-
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-  public void deleteFile(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      _directory.deleteFile(getCompressedName(name));
-    } else {
-      _directory.deleteFile(name);
-    }
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      return true;
-    }
-    return _directory.fileExists(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      IndexInput input = _directory.openInput(getCompressedName(name));
-      try {
-        long length = input.length();
-        input.seek(length - 8);
-        long fileLength = input.readLong();
-        if (fileLength < 0) {
-          input.seek(length - 16);
-          return input.readLong();
-        } else {
-          return fileLength;
-        }
-      } finally {
-        input.close();
-      }
-    }
-    return _directory.fileLength(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      return _directory.fileModified(getCompressedName(name));
-    }
-    return _directory.fileModified(name);
-  }
-
-  public String[] listAll() throws IOException {
-    return fixNames(_directory.listAll());
-  }
-
-  private String[] fixNames(String[] listAll) {
-    for (int i = 0; i < listAll.length; i++) {
-      if (listAll[i].endsWith(FDZ)) {
-        listAll[i] = getNormalName(listAll[i]);
-      }
-    }
-    return listAll;
-  }
-
-  public void touchFile(String name) throws IOException {
-    // do nothing
-  }
-
-  public LockFactory getLockFactory() {
-    return _directory.getLockFactory();
-  }
-
-  public String getLockID() {
-    return _directory.getLockID();
-  }
-
-  public Lock makeLock(String name) {
-    return _directory.makeLock(name);
-  }
-
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    _directory.setLockFactory(lockFactory);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    if (compressedFileExists(name)) {
-      _directory.sync(getCompressedName(name));
-    } else {
-      _directory.sync(name);
-    }
-  }
-
-  public String toString() {
-    return _directory.toString();
-  }
-
-  public static class CompressedIndexOutput_V1 extends IndexOutput {
-
-    private static final long VERSION = -1L;
-    private long _position = 0;
-    private IndexOutput _output;
-    private byte[] _buffer;
-    private int _bufferPosition = 0;
-    private byte[] _compressedBuffer;
-    private IndexOutput _tmpOutput;
-    private Directory _directory;
-    private String _name;
-    private int _blockCount;
-    private Compressor _compressor;
-
-    public CompressedIndexOutput_V1(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
-      _compressor = codec.createCompressor();
-      if (_compressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
-      }
-      _directory = directory;
-      _name = name;
-      _output = directory.createOutput(name);
-      _tmpOutput = directory.createOutput(name + Z_TMP);
-      _buffer = new byte[blockSize];
-      int dsize = blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      _compressedBuffer = new byte[dsize];
-    }
-
-    @Override
-    public void writeByte(byte b) throws IOException {
-      _buffer[_bufferPosition] = b;
-      _bufferPosition++;
-      _position++;
-      flushIfNeeded();
-    }
-
-    private void flushIfNeeded() throws IOException {
-      if (_bufferPosition >= _buffer.length) {
-        flushBuffer();
-        _bufferPosition = 0;
-      }
-    }
-
-    private void flushBuffer() throws IOException {
-      if (_bufferPosition > 0) {
-        _compressor.reset();
-        _compressor.setInput(_buffer, 0, _bufferPosition);
-        _compressor.finish();
-
-        long filePointer = _output.getFilePointer();
-
-        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);
-
-        _tmpOutput.writeLong(filePointer);
-        _blockCount++;
-        _output.writeBytes(_compressedBuffer, 0, length);
-      }
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      int len = length + offset;
-      for (int i = offset; i < len; i++) {
-        writeByte(b[i]);
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      flushBuffer();
-      _tmpOutput.close();
-      IndexInput input = _directory.openInput(_name + Z_TMP);
-      try {
-        long len = input.length();
-        long readCount = 0;
-        while (readCount < len) {
-          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
-          input.readBytes(_buffer, 0, toRead);
-          _output.writeBytes(_buffer, toRead);
-          readCount += toRead;
-        }
-        _output.writeInt(_blockCount);
-        _output.writeInt(_buffer.length);
-        _output.writeLong(_position);
-        _output.writeLong(VERSION);
-      } finally {
-        try {
-          _output.close();
-        } finally {
-          input.close();
-        }
-      }
-      _directory.deleteFile(_name + Z_TMP);
-      _compressor.end();
-    }
-
-    @Override
-    public long getFilePointer() {
-      return _position;
-    }
-
-    @Override
-    public long length() throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void flush() throws IOException {
-
-    }
-  }
-
-  public static class CompressedIndexOutput_V0 extends IndexOutput {
-
-    private long _position = 0;
-    private IndexOutput _output;
-    private byte[] _buffer;
-    private int _bufferPosition = 0;
-    private byte[] _compressedBuffer;
-    private IndexOutput _tmpOutput;
-    private Directory _directory;
-    private String _name;
-    private int _blockCount;
-    private Compressor _compressor;
-
-    public CompressedIndexOutput_V0(String name, Directory directory, CompressionCodec codec, int blockSize) throws IOException {
-      _compressor = codec.createCompressor();
-      if (_compressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support compressor on this platform.");
-      }
-      _directory = directory;
-      _name = name;
-      _output = directory.createOutput(name);
-      _tmpOutput = directory.createOutput(name + Z_TMP);
-      _buffer = new byte[blockSize];
-      int dsize = blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      _compressedBuffer = new byte[dsize];
-    }
-
-    @Override
-    public void writeByte(byte b) throws IOException {
-      _buffer[_bufferPosition] = b;
-      _bufferPosition++;
-      _position++;
-      flushIfNeeded();
-    }
-
-    private void flushIfNeeded() throws IOException {
-      if (_bufferPosition >= _buffer.length) {
-        flushBuffer();
-        _bufferPosition = 0;
-      }
-    }
-
-    private void flushBuffer() throws IOException {
-      if (_bufferPosition > 0) {
-        _compressor.reset();
-        _compressor.setInput(_buffer, 0, _bufferPosition);
-        _compressor.finish();
-
-        long filePointer = _output.getFilePointer();
-
-        int length = _compressor.compress(_compressedBuffer, 0, _compressedBuffer.length);
-
-        _tmpOutput.writeVLong(filePointer);
-        _tmpOutput.writeVInt(length);
-        _blockCount++;
-        _output.writeBytes(_compressedBuffer, 0, length);
-      }
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      int len = length + offset;
-      for (int i = offset; i < len; i++) {
-        writeByte(b[i]);
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      flushBuffer();
-      _tmpOutput.close();
-      IndexInput input = _directory.openInput(_name + Z_TMP);
-      try {
-        long len = input.length();
-        long readCount = 0;
-        while (readCount < len) {
-          int toRead = readCount + _buffer.length > len ? (int) (len - readCount) : _buffer.length;
-          input.readBytes(_buffer, 0, toRead);
-          _output.writeBytes(_buffer, toRead);
-          readCount += toRead;
-        }
-        _output.writeLong(len);
-        _output.writeInt(_blockCount);
-        _output.writeInt(_buffer.length);
-        _output.writeLong(_position);
-      } finally {
-        try {
-          _output.close();
-        } finally {
-          input.close();
-        }
-      }
-      _directory.deleteFile(_name + Z_TMP);
-      _compressor.end();
-    }
-
-    @Override
-    public long getFilePointer() {
-      return _position;
-    }
-
-    @Override
-    public long length() throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      throw new RuntimeException("not supported");
-    }
-
-    @Override
-    public void flush() throws IOException {
-
-    }
-  }
-
-  public static class CompressedIndexInput_V1 extends IndexInput {
-
-    private static final long VERSION = -1l;
-
-    private static final int _SIZES_META_DATA = 24;
-
-    private final AtomicLongArray _blockPositions;
-    private final long _realLength;
-    private final long _origLength;
-    private final int _blockSize;
-
-    private IndexInput _indexInput;
-    private long _pos;
-    private boolean _isClone;
-    private long _currentBlockId = -1;
-    private byte[] _blockBuffer;
-    private byte[] _decompressionBuffer;
-    private int _blockBufferLength;
-    private Decompressor _decompressor;
-    private int _blockCount;
-    private Thread _openerThread;
-    private AtomicBoolean _errorInOpener = new AtomicBoolean(false);
-    private String _name;
-
-    public CompressedIndexInput_V1(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
-      super(name);
-      _name = name;
-      long s = System.nanoTime();
-      _decompressor = codec.createDecompressor();
-      if (_decompressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
-      }
-      _indexInput = indexInput;
-      _realLength = _indexInput.length();
-
-      // read meta data
-      _indexInput.seek(_realLength - _SIZES_META_DATA); // 4 + 4 + 8 + 8
-      _blockCount = _indexInput.readInt();
-      _blockSize = _indexInput.readInt();
-      _origLength = _indexInput.readLong();
-      long version = _indexInput.readLong();
-      if (version != VERSION) {
-        throw new IOException("Version [" + version + "] mismatch!");
-      }
-
-      _blockPositions = new AtomicLongArray(_blockCount);
-      for (int i = 0; i < _blockCount; i++) {
-        _blockPositions.set(i, -1l);
-      }
-      readBlockPositions((IndexInput) indexInput.clone(), name);
-      setupBuffers(this);
-      long e = System.nanoTime();
-      double total = (e - s) / 1000000.0;
-      LOG.debug("Took [" + total + " ms] to open file [" + name + "].");
-    }
-
-    private void readBlockPositions(final IndexInput indexInput, final String name) throws IOException {
-      _openerThread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            long s = System.nanoTime();
-            long metaDataLength = _blockCount * 8;
-            indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
-            for (int i = 0; i < _blockCount; i++) {
-              _blockPositions.set(i, indexInput.readLong());
-            }
-            long e = System.nanoTime();
-            double total = (e - s) / 1000000.0;
-            LOG.debug("Took [{0} ms] to read block positions with blockCount of [{1}] in file [{2}].", total, _blockCount, name);
-            indexInput.close();
-          } catch (Exception e) {
-            LOG.error("Error during the reading of block positions in file [{0}] ", e, name);
-            _errorInOpener.set(true);
-          }
-        }
-      });
-      _openerThread.setName("Block-Position-Reader-" + name);
-      _openerThread.start();
-    }
-
-    private int getBlockLength(int blockId) throws IOException {
-      int newBlockId = blockId + 1;
-      if (newBlockId == _blockCount) {
-        // last block
-        return (int) (_realLength - _SIZES_META_DATA - getBlockPosition(blockId));
-      } else {
-        return (int) (getBlockPosition(newBlockId) - getBlockPosition(blockId));
-      }
-    }
-
-    public long getBlockPosition(int blockId) throws IOException {
-      long position = _blockPositions.get(blockId);
-      while (true) {
-        if (position < 0) {
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-          }
-        } else {
-          return position;
-        }
-        if (_errorInOpener.get()) {
-          throw new IOException("Block positions for file [" + _name + "] can not be read.");
-        }
-        position = _blockPositions.get(blockId);
-      }
-    }
-
-    private static void setupBuffers(CompressedIndexInput_V1 input) {
-      input._blockBuffer = new byte[input._blockSize];
-      int dsize = input._blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      input._decompressionBuffer = new byte[dsize];
-    }
-
-    public Object clone() {
-      CompressedIndexInput_V1 clone = (CompressedIndexInput_V1) super.clone();
-      clone._isClone = true;
-      clone._indexInput = (IndexInput) _indexInput.clone();
-      setupBuffers(clone);
-      return clone;
-    }
-
-    public void close() throws IOException {
-      if (!_isClone) {
-        _decompressor.end();
-      }
-      _indexInput.close();
-    }
-
-    public long getFilePointer() {
-      return _pos;
-    }
-
-    public long length() {
-      return _origLength;
-    }
-
-    public byte readByte() throws IOException {
-      int blockId = getBlockId();
-      if (blockId != _currentBlockId) {
-        fetchBlock(blockId);
-      }
-      int blockPosition = getBlockPosition();
-      _pos++;
-      return _blockBuffer[blockPosition];
-    }
-
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      while (len > 0) {
-        int blockId = getBlockId();
-        if (blockId != _currentBlockId) {
-          fetchBlock(blockId);
-        }
-        int blockPosition = getBlockPosition();
-        int length = Math.min(_blockBufferLength - blockPosition, len);
-        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
-        _pos += length;
-        len -= length;
-        offset += length;
-      }
-    }
-
-    private int getBlockPosition() {
-      return (int) (_pos % _blockSize);
-    }
-
-    private void fetchBlock(int blockId) throws IOException {
-      long position = getBlockPosition(blockId);
-      int length = getBlockLength(blockId);
-      _indexInput.seek(position);
-      _indexInput.readBytes(_decompressionBuffer, 0, length);
-
-      synchronized (_decompressor) {
-        _decompressor.reset();
-        _decompressor.setInput(_decompressionBuffer, 0, length);
-        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
-      }
-
-      _currentBlockId = blockId;
-    }
-
-    private int getBlockId() {
-      return (int) (_pos / _blockSize);
-    }
-
-    public void seek(long pos) throws IOException {
-      _pos = pos;
-    }
-  }
-
-  public static class CompressedIndexInput_V0 extends IndexInput {
-
-    private static final int _SIZES_META_DATA = 24;
-
-    private final int[] _blockLengths;
-    private final long[] _blockPositions;
-    private final long _realLength;
-    private final long _origLength;
-    private final int _blockSize;
-
-    private IndexInput _indexInput;
-    private long _pos;
-    private boolean _isClone;
-    private long _currentBlockId = -1;
-    private byte[] _blockBuffer;
-    private byte[] _decompressionBuffer;
-    private int _blockBufferLength;
-    private Decompressor _decompressor;
-
-    public CompressedIndexInput_V0(String name, IndexInput indexInput, CompressionCodec codec) throws IOException {
-      super(name);
-      _decompressor = codec.createDecompressor();
-      if (_decompressor == null) {
-        throw new RuntimeException("CompressionCodec [" + codec + "] does not support decompressor on this platform.");
-      }
-      long s1 = System.nanoTime();
-      _indexInput = indexInput;
-      _realLength = _indexInput.length();
-
-      // read meta data
-      _indexInput.seek(_realLength - _SIZES_META_DATA); // 8 - 4 - 4 - 8
-      long metaDataLength = _indexInput.readLong();
-      int blockCount = _indexInput.readInt();
-      _blockSize = _indexInput.readInt();
-      _origLength = _indexInput.readLong();
-      long e1 = System.nanoTime();
-
-      _blockLengths = new int[blockCount];
-      _blockPositions = new long[blockCount];
-
-      long s2 = System.nanoTime();
-      _indexInput.seek(_realLength - _SIZES_META_DATA - metaDataLength);
-      for (int i = 0; i < blockCount; i++) {
-        _blockPositions[i] = _indexInput.readVLong();
-        _blockLengths[i] = _indexInput.readVInt();
-      }
-      long e2 = System.nanoTime();
-
-      setupBuffers(this);
-
-      double total = (e2 - s1) / 1000000.0;
-      double _1st = (e1 - s1) / 1000000.0;
-      double _2nd = (e2 - s2) / 1000000.0;
-      LOG.debug("Took [" + total + " ms] to open [" + _1st + "] [" + _2nd + " with blockCount of " + blockCount + "].");
-    }
-
-    private static void setupBuffers(CompressedIndexInput_V0 input) {
-      input._blockBuffer = new byte[input._blockSize];
-      int dsize = input._blockSize * 2;
-      if (dsize < _MIN_BUFFER_SIZE) {
-        dsize = _MIN_BUFFER_SIZE;
-      }
-      input._decompressionBuffer = new byte[dsize];
-    }
-
-    public Object clone() {
-      CompressedIndexInput_V0 clone = (CompressedIndexInput_V0) super.clone();
-      clone._isClone = true;
-      clone._indexInput = (IndexInput) _indexInput.clone();
-      setupBuffers(clone);
-      return clone;
-    }
-
-    public void close() throws IOException {
-      if (!_isClone) {
-        _decompressor.end();
-      }
-      _indexInput.close();
-    }
-
-    public long getFilePointer() {
-      return _pos;
-    }
-
-    public long length() {
-      return _origLength;
-    }
-
-    public byte readByte() throws IOException {
-      int blockId = getBlockId();
-      if (blockId != _currentBlockId) {
-        fetchBlock(blockId);
-      }
-      int blockPosition = getBlockPosition();
-      _pos++;
-      return _blockBuffer[blockPosition];
-    }
-
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      while (len > 0) {
-        int blockId = getBlockId();
-        if (blockId != _currentBlockId) {
-          fetchBlock(blockId);
-        }
-        int blockPosition = getBlockPosition();
-        int length = Math.min(_blockBufferLength - blockPosition, len);
-        System.arraycopy(_blockBuffer, blockPosition, b, offset, length);
-        _pos += length;
-        len -= length;
-        offset += length;
-      }
-    }
-
-    private int getBlockPosition() {
-      return (int) (_pos % _blockSize);
-    }
-
-    private void fetchBlock(int blockId) throws IOException {
-      long position = _blockPositions[blockId];
-      int length = _blockLengths[blockId];
-      _indexInput.seek(position);
-      _indexInput.readBytes(_decompressionBuffer, 0, length);
-
-      synchronized (_decompressor) {
-        _decompressor.reset();
-        _decompressor.setInput(_decompressionBuffer, 0, length);
-        _blockBufferLength = _decompressor.decompress(_blockBuffer, 0, _blockBuffer.length);
-      }
-
-      _currentBlockId = blockId;
-    }
-
-    private int getBlockId() {
-      return (int) (_pos / _blockSize);
-    }
-
-    public void seek(long pos) throws IOException {
-      _pos = pos;
-    }
-  }
-}
\ No newline at end of file
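
The deleted CompressedFieldDataDirectory transparently compressed Lucene stored-field data: createOutput on a *.fdt name writes a block-compressed *.fdz file instead, and openInput selects a reader from the trailing long of the compressed file (a negative value is the version, so -1 means V1; a non-negative value means the V0 layout, whose trailer ends with the original file length). Note that wrapOutput always writes the V0 format even though a V1 writer exists. A minimal usage sketch against the pre-4.0 Lucene Directory API used above; the path and codec configuration are illustrative:

import java.io.File;
import java.io.IOException;

import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

public class CompressedDirectoryExample {
  public static void main(String[] args) throws IOException {
    DefaultCodec codec = new DefaultCodec();
    codec.setConf(new Configuration()); // DefaultCodec is Configurable
    Directory raw = FSDirectory.open(new File("/tmp/example-index"));
    Directory dir = new CompressedFieldDataDirectory(raw, codec, 65536);

    IndexOutput out = dir.createOutput("_0.fdt"); // actually written as _0.fdz
    out.writeString("hello");
    out.close();

    IndexInput in = dir.openInput("_0.fdt"); // decompressed block by block
    System.out.println(in.readString()); // hello
    in.close();
    dir.close();
  }
}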

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java
deleted file mode 100644
index 52f7e02..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ChangeFileExt.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class ChangeFileExt {
-
-  public static void main(String[] args) throws IOException {
-    Path p = new Path(args[0]);
-    FileSystem fileSystem = FileSystem.get(p.toUri(), new Configuration());
-    FileStatus[] listStatus = fileSystem.listStatus(p);
-    for (FileStatus fileStatus : listStatus) {
-      Path path = fileStatus.getPath();
-      fileSystem.rename(path, new Path(path.toString() + ".lf"));
-    }
-  }
-
-}
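
One caveat in the deleted utility: FileSystem.listStatus(p) returns subdirectories as well as files, so the loop above would also rename directories to *.lf. A sketch of the same rename pass with a file-only guard, reusing the names and extension from the deleted code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ChangeFileExtSafe {
  public static void main(String[] args) throws IOException {
    Path p = new Path(args[0]);
    FileSystem fileSystem = FileSystem.get(p.toUri(), new Configuration());
    for (FileStatus fileStatus : fileSystem.listStatus(p)) {
      if (!fileStatus.isDir()) { // skip subdirectories
        Path path = fileStatus.getPath();
        fileSystem.rename(path, new Path(path.toString() + ".lf"));
      }
    }
  }
}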

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java
deleted file mode 100644
index 2763ba4..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/ConvertDirectory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.store.LockObtainFailedException;
-
-public class ConvertDirectory {
-
-  public static void main(String[] args) throws CorruptIndexException, LockObtainFailedException, IOException {
-    Path path = new Path(args[0]);
-    convert(path);
-  }
-
-  public static void convert(Path path) throws IOException {
-    FileSystem fileSystem = FileSystem.get(path.toUri(), new Configuration());
-    if (!fileSystem.exists(path)) {
-      System.out.println(path + " does not exist.");
-      return;
-    }
-    FileStatus fileStatus = fileSystem.getFileStatus(path);
-    if (fileStatus.isDir()) {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus status : listStatus) {
-        convert(status.getPath());
-      }
-    } else {
-      System.out.println("Converting file [" + path + "]");
-      HdfsMetaBlock block = new HdfsMetaBlock();
-      block.realPosition = 0;
-      block.logicalPosition = 0;
-      block.length = fileStatus.getLen();
-      FSDataOutputStream outputStream = fileSystem.append(path);
-      block.write(outputStream);
-      outputStream.writeInt(1);
-      outputStream.writeLong(fileStatus.getLen());
-      outputStream.writeInt(HdfsFileWriter.VERSION);
-      outputStream.close();
-    }
-  }
-}
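
ConvertDirectory retrofits a plain HDFS file into the layered format consumed by HdfsFileReader: it appends one HdfsMetaBlock that maps logical position 0 onto the whole existing file, then the trailer (an int block count of 1, the logical length as a long, and the VERSION int). A sketch that reads such a trailer back for verification, assuming the 16-byte layout implied above; the path argument is illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VerifyConvertedFile {
  public static void main(String[] args) throws IOException {
    Path path = new Path(args[0]);
    FileSystem fileSystem = FileSystem.get(path.toUri(), new Configuration());
    long hdfsLen = fileSystem.getFileStatus(path).getLen();
    FSDataInputStream in = fileSystem.open(path);
    try {
      // Trailer layout: [meta blocks, 3 longs each][int blockCount][long logicalLength][int VERSION]
      in.seek(hdfsLen - 16);
      int blockCount = in.readInt();
      long logicalLength = in.readLong();
      int version = in.readInt();
      System.out.println("blocks=" + blockCount + " length=" + logicalLength + " version=" + version);
    } finally {
      in.close();
    }
  }
}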

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java
deleted file mode 100644
index 1144126..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/CopyFromHdfsLocal.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-
-
-public class CopyFromHdfsLocal {
-
-  public static void main(String[] args) throws IOException {
-    Path path = new Path(args[0]);
-    HdfsDirectory src = new HdfsDirectory(path);
-
-    for (String name : src.listAll()) {
-      System.out.println(name);
-    }
-
-    CompressedFieldDataDirectory compressedDirectory = new CompressedFieldDataDirectory(src, new DefaultCodec(), 32768);
-    Directory dest = FSDirectory.open(new File(args[1]));
-
-    for (String name : compressedDirectory.listAll()) {
-      compressedDirectory.copy(dest, name, name);
-    }
-
-  }
-
-}
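
CopyFromHdfsLocal decompresses on the way out: wrapping the HDFS source in CompressedFieldDataDirectory exposes each *.fdz under its *.fdt name, so Directory.copy materializes plain stored-field files locally. A hedged sketch of the opposite direction, packing a local index into compressed HDFS storage using the same pre-change classes deleted in this commit; paths and block size are illustrative:

import java.io.File;
import java.io.IOException;

import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class CopyFromLocalHdfs {
  public static void main(String[] args) throws IOException {
    Directory src = FSDirectory.open(new File(args[0]));

    DefaultCodec codec = new DefaultCodec();
    codec.setConf(new Configuration());
    // Pre-change HdfsDirectory constructor, as used by the deleted code above.
    Directory dest = new CompressedFieldDataDirectory(new HdfsDirectory(new Path(args[1])), codec, 32768);

    for (String name : src.listAll()) {
      src.copy(dest, name, name); // Lucene 3.x Directory.copy(to, src, dest)
    }
    src.close();
    dest.close();
  }
}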

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index e3bc7ca..7cbc5b1 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -1,377 +1,189 @@
 package org.apache.blur.store.hdfs;
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Collection;
 
-import org.apache.blur.store.CustomBufferedIndexInput;
+import org.apache.blur.store.blockcache.BlockDirectory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.BufferedIndexOutput;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
-
+import org.apache.lucene.store.NoLockFactory;
 
 public class HdfsDirectory extends Directory {
 
-  public static final int BUFFER_SIZE = 8192;
-
-  private static final String LF_EXT = ".lf";
-  protected static final String SEGMENTS_GEN = "segments.gen";
-  protected static final IndexOutput NULL_WRITER = new NullIndexOutput();
-  protected Path _hdfsDirPath;
-  protected AtomicReference<FileSystem> _fileSystemRef = new AtomicReference<FileSystem>();
-  protected Configuration _configuration;
+  private final Path path;
+  private final FileSystem fileSystem;
 
-  public HdfsDirectory(Path hdfsDirPath) throws IOException {
-    _hdfsDirPath = hdfsDirPath;
-    _configuration = new Configuration();
-    reopenFileSystem();
-    try {
-      if (!getFileSystem().exists(hdfsDirPath)) {
-        getFileSystem().mkdirs(hdfsDirPath);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+  public HdfsDirectory(Configuration configuration, Path path) throws IOException {
+    this.path = path;
+    fileSystem = path.getFileSystem(configuration);
+    setLockFactory(NoLockFactory.getNoLockFactory());
   }
 
   @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public IndexOutput createOutput(String name) throws IOException {
-    if (SEGMENTS_GEN.equals(name)) {
-      return NULL_WRITER;
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    if (fileExists(name)) {
+      throw new IOException("File [" + name + "] already exists found.");
     }
-    name = getRealName(name);
-    HdfsFileWriter writer = new HdfsFileWriter(getFileSystem(), new Path(_hdfsDirPath, name));
-    return new HdfsLayeredIndexOutput(writer);
-  }
+    final FSDataOutputStream outputStream = fileSystem.create(getPath(name));
+    return new BufferedIndexOutput() {
 
-  private String getRealName(String name) throws IOException {
-    if (getFileSystem().exists(new Path(_hdfsDirPath, name))) {
-      return name;
-    }
-    return name + LF_EXT;
-  }
+      @Override
+      public long length() throws IOException {
+        return outputStream.getPos();
+      }
 
-  private String[] getNormalNames(List<String> files) {
-    int size = files.size();
-    for (int i = 0; i < size; i++) {
-      String str = files.get(i);
-      files.set(i, toNormalName(str));
-    }
-    return files.toArray(new String[] {});
-  }
+      @Override
+      protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
+        outputStream.write(b, offset, len);
+      }
 
-  private String toNormalName(String name) {
-    if (name.endsWith(LF_EXT)) {
-      return name.substring(0, name.length() - 3);
-    }
-    return name;
-  }
+      @Override
+      public void close() throws IOException {
+        super.close();
+        outputStream.close();
+      }
 
-  @Override
-  public IndexInput openInput(String name) throws IOException {
-    return openInput(name, BUFFER_SIZE);
+      @Override
+      public void seek(long pos) throws IOException {
+        throw new IOException("seeks not allowed on IndexOutputs.");
+      }
+    };
   }
 
   @Override
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    name = getRealName(name);
-    if (isLayeredFile(name)) {
-      HdfsFileReader reader = new HdfsFileReader(getFileSystem(), new Path(_hdfsDirPath, name), BUFFER_SIZE);
-      return new HdfsLayeredIndexInput(name, reader, BUFFER_SIZE);
-    } else {
-      return new HdfsNormalIndexInput(name, getFileSystem(), new Path(_hdfsDirPath, name), BUFFER_SIZE);
-    }
-  }
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    if (!fileExists(name)) {
+      throw new FileNotFoundException("File [" + name + "] not found.");
+    }
+    Path filePath = getPath(name);
+    final FSDataInputStream inputStream = fileSystem.open(filePath);
+    FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+    final long len = fileStatus.getLen();
+    return new BufferedIndexInput(name, context) {
+      @Override
+      public long length() {
+        return len;
+      }
 
-  private boolean isLayeredFile(String name) {
-    if (name.endsWith(LF_EXT)) {
-      return true;
-    }
-    return false;
+      @Override
+      public void close() throws IOException {
+        inputStream.close();
+      }
+
+      @Override
+      protected void seekInternal(long pos) throws IOException {
+
+      }
+
+      @Override
+      protected void readInternal(byte[] b, int offset, int length) throws IOException {
+        inputStream.readFully(getFilePointer(), b, offset, length);
+      }
+    };
   }
 
   @Override
-  public void deleteFile(String name) throws IOException {
-    name = getRealName(name);
-    if (!fileExists(name)) {
-      throw new FileNotFoundException(name);
+  public String[] listAll() throws IOException {
+    FileStatus[] files = fileSystem.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        try {
+          return fileSystem.isFile(path);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    String[] result = new String[files.length];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = files[i].getPath().getName();
     }
-    getFileSystem().delete(new Path(_hdfsDirPath, name), false);
+    return result;
   }
 
   @Override
   public boolean fileExists(String name) throws IOException {
-    name = getRealName(name);
-    return getFileSystem().exists(new Path(_hdfsDirPath, name));
+    return fileSystem.exists(getPath(name));
   }
 
   @Override
-  public long fileLength(String name) throws IOException {
-    name = getRealName(name);
-    if (!fileExists(name)) {
-      throw new FileNotFoundException(name);
+  public void deleteFile(String name) throws IOException {
+    if (fileExists(name)) {
+      fileSystem.delete(getPath(name), true);
+    } else {
+      throw new FileNotFoundException("File [" + name + "] not found");
     }
-    return HdfsFileReader.getLength(getFileSystem(), new Path(_hdfsDirPath, name));
   }
 
   @Override
-  public long fileModified(String name) throws IOException {
-    name = getRealName(name);
-    if (!fileExists(name)) {
-      throw new FileNotFoundException(name);
-    }
-    FileStatus fileStatus = getFileSystem().getFileStatus(new Path(_hdfsDirPath, name));
-    return fileStatus.getModificationTime();
+  public long fileLength(String name) throws IOException {
+    FileStatus fileStatus = fileSystem.getFileStatus(getPath(name));
+    return fileStatus.getLen();
   }
 
   @Override
-  public String[] listAll() throws IOException {
-    FileStatus[] listStatus = getFileSystem().listStatus(_hdfsDirPath);
-    List<String> files = new ArrayList<String>();
-    if (listStatus == null) {
-      return new String[] {};
-    }
-    for (FileStatus status : listStatus) {
-      if (!status.isDir()) {
-        files.add(status.getPath().getName());
-      }
-    }
-    return getNormalNames(files);
-  }
+  public void sync(Collection<String> names) throws IOException {
 
-  @Override
-  public void touchFile(String name) throws IOException {
-    // do nothing
   }
 
-  public Path getHdfsDirPath() {
-    return _hdfsDirPath;
+  @Override
+  public void close() throws IOException {
+    fileSystem.close();
   }
 
-  public FileSystem getFileSystem() {
-    return _fileSystemRef.get();
+  private Path getPath(String name) {
+    return new Path(path, name);
   }
 
-  protected void reopenFileSystem() throws IOException {
-    FileSystem fileSystem = FileSystem.get(_hdfsDirPath.toUri(), _configuration);
-    FileSystem oldFs = _fileSystemRef.get();
-    _fileSystemRef.set(fileSystem);
-    if (oldFs != null) {
-      oldFs.close();
+  public long getFileModified(String name) throws IOException {
+    if (!fileExists(name)) {
+      throw new FileNotFoundException("File [" + name + "] not found");
     }
+    Path file = getPath(name);
+    return fileSystem.getFileStatus(file).getModificationTime();
   }
 
-  static class HdfsLayeredIndexInput extends CustomBufferedIndexInput {
-
-    private HdfsFileReader _reader;
-    private long _length;
-    private boolean isClone;
-
-    public HdfsLayeredIndexInput(String name, HdfsFileReader reader, int bufferSize) {
-      super(name, bufferSize);
-      _reader = reader;
-      _length = _reader.length();
-    }
-
-    @Override
-    protected void closeInternal() throws IOException {
-      if (!isClone) {
-        _reader.close();
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    if (to instanceof HdfsDirectory) {
+      if (quickMove(to, src, dest, context)) {
+        return;
       }
-    }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-
-    @Override
-    public Object clone() {
-      HdfsLayeredIndexInput input = (HdfsLayeredIndexInput) super.clone();
-      input.isClone = true;
-      input._reader = (HdfsFileReader) _reader.clone();
-      return input;
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      long position = getFilePointer();
-      _reader.seek(position);
-      _reader.readBytes(b, offset, length);
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-      // do nothing
-    }
-  }
-
-  static class HdfsNormalIndexInput extends CustomBufferedIndexInput {
-
-    private final FSDataInputStream _inputStream;
-    private final long _length;
-    private boolean _clone = false;
-
-    public HdfsNormalIndexInput(String name, FileSystem fileSystem, Path path, int bufferSize) throws IOException {
-      super(name);
-      FileStatus fileStatus = fileSystem.getFileStatus(path);
-      _length = fileStatus.getLen();
-      _inputStream = fileSystem.open(path, bufferSize);
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      _inputStream.read(getFilePointer(), b, offset, length);
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-
-    }
-
-    @Override
-    protected void closeInternal() throws IOException {
-      if (!_clone) {
-        _inputStream.close();
+    } else if (to instanceof BlockDirectory) {
+      BlockDirectory bd = (BlockDirectory) to;
+      Directory inner = bd.getDirectory();
+      if (quickMove(inner, src, dest, context)) {
+        return;
       }
     }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-
-    @Override
-    public Object clone() {
-      HdfsNormalIndexInput clone = (HdfsNormalIndexInput) super.clone();
-      clone._clone = true;
-      return clone;
-    }
+    super.copy(to, src, dest, context);
   }
 
-  static class HdfsLayeredIndexOutput extends IndexOutput {
-
-    private HdfsFileWriter _writer;
-
-    public HdfsLayeredIndexOutput(HdfsFileWriter writer) {
-      _writer = writer;
-    }
-
-    @Override
-    public void close() throws IOException {
-      _writer.close();
-    }
-
-    @Override
-    public void flush() throws IOException {
-
-    }
-
-    @Override
-    public long getFilePointer() {
-      return _writer.getPosition();
-    }
-
-    @Override
-    public long length() throws IOException {
-      return _writer.length();
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      _writer.seek(pos);
-    }
-
-    @Override
-    public void writeByte(byte b) throws IOException {
-      _writer.writeByte(b);
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      _writer.writeBytes(b, offset, length);
+  private boolean quickMove(Directory to, String src, String dest, IOContext context) throws IOException {
+    HdfsDirectory simpleTo = (HdfsDirectory) to;
+    if (ifSameCluster(to, this)) {
+      Path newDest = simpleTo.getPath(dest);
+      Path oldSrc = getPath(src);
+      return fileSystem.rename(oldSrc, newDest);
     }
+    return false;
   }
 
-  static class DirectIOHdfsIndexInput extends CustomBufferedIndexInput {
-
-    private long _length;
-    private FSDataInputStream _inputStream;
-    private boolean isClone;
-
-    public DirectIOHdfsIndexInput(String name, FSDataInputStream inputStream, long length) throws IOException {
-      super(name);
-      if (inputStream instanceof DFSDataInputStream) {
-        // This is needed because if the file was in progress of being
-        // written but was not closed, the length of the file is 0.
-        // This will fetch the synced length of the file.
-        _length = ((DFSDataInputStream) inputStream).getVisibleLength();
-      } else {
-        _length = length;
-      }
-      _inputStream = inputStream;
-    }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-
-    @Override
-    protected void closeInternal() throws IOException {
-      if (!isClone) {
-        _inputStream.close();
-      }
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      synchronized (_inputStream) {
-        _inputStream.readFully(getFilePointer(), b, offset, length);
-      }
-    }
-
-    @Override
-    public Object clone() {
-      DirectIOHdfsIndexInput clone = (DirectIOHdfsIndexInput) super.clone();
-      clone.isClone = true;
-      return clone;
-    }
+  private boolean ifSameCluster(Directory to, HdfsDirectory simpleHDFSDirectory) {
+    // @TODO
+    return true;
   }
+
 }
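
The rewritten HdfsDirectory drops the layered-file machinery entirely: outputs stream straight through a BufferedIndexOutput into an FSDataOutputStream, inputs serve reads with positional readFully, locking is disabled via NoLockFactory, and copy() first attempts an HDFS rename when the destination is an HdfsDirectory (or a BlockDirectory wrapping one) on what the ifSameCluster stub currently assumes is the same cluster. A minimal usage sketch against the Lucene 4.x API this version targets; the HDFS URI is illustrative:

import java.io.IOException;

import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

public class HdfsDirectoryExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    HdfsDirectory dir = new HdfsDirectory(conf, new Path("hdfs://namenode/blur/example"));

    IndexOutput out = dir.createOutput("test.bin", IOContext.DEFAULT);
    out.writeLong(42L);
    out.close();

    IndexInput in = dir.openInput("test.bin", IOContext.READ);
    System.out.println(in.readLong()); // 42
    in.close();
    dir.close();
  }
}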

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileReader.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileReader.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileReader.java
deleted file mode 100644
index 35ecaf8..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileReader.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.DataInput;
-
-
-public class HdfsFileReader extends DataInput {
-
-  private static final Log LOG = LogFactory.getLog(HdfsFileReader.class);
-
-  private static final int VERSION = -1;
-
-  private final long _length;
-  private final long _hdfsLength;
-  private final List<HdfsMetaBlock> _metaBlocks;
-  private FSDataInputStream _inputStream;
-  private long _logicalPos;
-  private long _boundary;
-  private long _realPos;
-  private boolean isClone;
-
-  public HdfsFileReader(FileSystem fileSystem, Path path, int bufferSize) throws IOException {
-    if (!fileSystem.exists(path)) {
-      throw new FileNotFoundException(path.toString());
-    }
-    FileStatus fileStatus = fileSystem.getFileStatus(path);
-    _hdfsLength = fileStatus.getLen();
-    _inputStream = fileSystem.open(path, bufferSize);
-
-    // read meta blocks
-    _inputStream.seek(_hdfsLength - 16);
-    int numberOfBlocks = _inputStream.readInt();
-    _length = _inputStream.readLong();
-    int version = _inputStream.readInt();
-    if (version != VERSION) {
-      throw new RuntimeException("Version of file [" + version + "] does not match reader [" + VERSION + "]");
-    }
-    _inputStream.seek(_hdfsLength - 16 - (numberOfBlocks * 24)); // 3 longs per block
-    _metaBlocks = new ArrayList<HdfsMetaBlock>(numberOfBlocks);
-    for (int i = 0; i < numberOfBlocks; i++) {
-      HdfsMetaBlock hdfsMetaBlock = new HdfsMetaBlock();
-      hdfsMetaBlock.readFields(_inputStream);
-      _metaBlocks.add(hdfsMetaBlock);
-    }
-    seek(0);
-  }
-
-  public HdfsFileReader(FileSystem fileSystem, Path path) throws IOException {
-    this(fileSystem, path, HdfsDirectory.BUFFER_SIZE);
-  }
-
-  public long getPosition() {
-    return _logicalPos;
-  }
-
-  public long length() {
-    return _length;
-  }
-
-  public void seek(long pos) throws IOException {
-    if (_logicalPos == pos) {
-      return;
-    }
-    _logicalPos = pos;
-    seekInternal();
-  }
-
-  public void close() throws IOException {
-    if (!isClone) {
-      _inputStream.close();
-    }
-  }
-
-  /**
-   * This method should never be used!
-   */
-  @Override
-  public byte readByte() throws IOException {
-    LOG.warn("Should not be used!");
-    byte[] buf = new byte[1];
-    readBytes(buf, 0, 1);
-    return buf[0];
-  }
-
-  @Override
-  public void readBytes(byte[] b, int offset, int len) throws IOException {
-    checkBoundary();
-    // might need to read in multiple stages
-    while (len > 0) {
-      if (_logicalPos >= _boundary) {
-        seekInternal();
-      }
-      int lengthToRead = (int) Math.min(_boundary - _logicalPos, len);
-      _inputStream.read(_realPos, b, offset, lengthToRead);
-      offset += lengthToRead;
-      _logicalPos += lengthToRead;
-      _realPos += lengthToRead;
-      len -= lengthToRead;
-    }
-  }
-
-  private void checkBoundary() throws IOException {
-    if (_boundary == -1l) {
-      throw new IOException("eof");
-    }
-  }
-
-  private void seekInternal() throws IOException {
-    HdfsMetaBlock block = null;
-    for (HdfsMetaBlock b : _metaBlocks) {
-      if (b.containsDataAt(_logicalPos)) {
-        block = b;
-      }
-    }
-    if (block == null) {
-      _boundary = -1l;
-      _realPos = -1l;
-    } else {
-      _realPos = block.getRealPosition(_logicalPos);
-      _boundary = getBoundary(block);
-    }
-  }
-
-  private long getBoundary(HdfsMetaBlock block) {
-    _boundary = block.logicalPosition + block.length;
-    for (HdfsMetaBlock b : _metaBlocks) {
-      if (b.logicalPosition > block.logicalPosition && b.logicalPosition < _boundary && b.logicalPosition >= _logicalPos) {
-        _boundary = b.logicalPosition;
-      }
-    }
-    return _boundary;
-  }
-
-  public static long getLength(FileSystem fileSystem, Path path) throws IOException {
-    FSDataInputStream inputStream = null;
-    try {
-      FileStatus fileStatus = fileSystem.getFileStatus(path);
-      inputStream = fileSystem.open(path);
-      long hdfsLength = fileStatus.getLen();
-      inputStream.seek(hdfsLength - 12);
-      long length = inputStream.readLong();
-      int version = inputStream.readInt();
-      if (version != VERSION) {
-        throw new RuntimeException("Version of file [" + version + "] does not match reader [" + VERSION + "]");
-      }
-      return length;
-    } finally {
-      if (inputStream != null) {
-        inputStream.close();
-      }
-    }
-  }
-
-  @Override
-  public Object clone() {
-    HdfsFileReader reader = (HdfsFileReader) super.clone();
-    reader.isClone = true;
-    return reader;
-  }
-
-}
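
For reference, the layered-file layout the deleted reader parsed, reconstructed from the code above. The field order inside each HdfsMetaBlock is inferred from ConvertDirectory and the "3 longs per block" comment, since HdfsMetaBlock.readFields is not shown here:

  [ data regions, one per meta block ... ]
  [ HdfsMetaBlock x N ]   3 longs each (24 bytes): realPosition, logicalPosition, length
  [ int N ]               number of meta blocks
  [ long logicalLength ]  the value length() returns
  [ int VERSION ]         -1, checked on open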

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileWriter.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileWriter.java
deleted file mode 100644
index 1c23f60..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsFileWriter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.DataOutput;
-
-public class HdfsFileWriter extends DataOutput {
-
-  public static final int VERSION = -1;
-
-  private FSDataOutputStream _outputStream;
-  private HdfsMetaBlock _block;
-  private List<HdfsMetaBlock> _blocks = new ArrayList<HdfsMetaBlock>();
-  private long _length;
-  private long _currentPosition;
-
-  public HdfsFileWriter(FileSystem fileSystem, Path path) throws IOException {
-    _outputStream = fileSystem.create(path);
-    seek(0);
-  }
-
-  public long length() {
-    return _length;
-  }
-
-  public void seek(long pos) throws IOException {
-    if (_block != null) {
-      _blocks.add(_block);
-    }
-    _block = new HdfsMetaBlock();
-    _block.realPosition = _outputStream.getPos();
-    _block.logicalPosition = pos;
-    _currentPosition = pos;
-  }
-
-  public void close() throws IOException {
-    if (_block != null) {
-      _blocks.add(_block);
-    }
-    flushMetaBlocks();
-    _outputStream.close();
-  }
-
-  private void flushMetaBlocks() throws IOException {
-    // trailer layout: one 24-byte record per meta block, then the block
-    // count (int), the logical file length (long), and the version (int)
-    for (HdfsMetaBlock block : _blocks) {
-      block.write(_outputStream);
-    }
-    _outputStream.writeInt(_blocks.size());
-    _outputStream.writeLong(length());
-    _outputStream.writeInt(VERSION);
-  }
-
-  @Override
-  public void writeByte(byte b) throws IOException {
-    _outputStream.write(b & 0xFF);
-    _block.length++;
-    _currentPosition++;
-    updateLength();
-  }
-
-  @Override
-  public void writeBytes(byte[] b, int offset, int length) throws IOException {
-    _outputStream.write(b, offset, length);
-    _block.length += length;
-    _currentPosition += length;
-    updateLength();
-  }
-
-  private void updateLength() {
-    if (_currentPosition > _length) {
-      _length = _currentPosition;
-    }
-  }
-
-  public long getPosition() {
-    return _currentPosition;
-  }
-}
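
Every seek() finishes the current HdfsMetaBlock and starts a new one, so the
physical file is append-only and logical overwrites become newer blocks that
shadow older data. A round-trip sketch under the same assumptions as the
reader example (the path is illustrative):

    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/blur/example.data");   // hypothetical file
    HdfsFileWriter writer = new HdfsFileWriter(fs, path);
    byte[] data = "example bytes".getBytes();
    writer.writeBytes(data, 0, data.length);      // block 1, logical position 0
    writer.seek(0);                               // block 2 begins, appended physically
    writer.writeBytes(data, 0, data.length);      // logically overwrites block 1
    writer.close();                               // appends block records, length, VERSION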

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsMetaBlock.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsMetaBlock.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsMetaBlock.java
deleted file mode 100644
index b939293..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsMetaBlock.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-class HdfsMetaBlock implements Writable {
-  long logicalPosition;
-  long realPosition;
-  long length;
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    logicalPosition = in.readLong();
-    realPosition = in.readLong();
-    length = in.readLong();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(logicalPosition);
-    out.writeLong(realPosition);
-    out.writeLong(length);
-  }
-
-  boolean containsDataAt(long logicalPos) {
-    if (logicalPos >= logicalPosition && logicalPos < logicalPosition + length) {
-      return true;
-    }
-    return false;
-  }
-
-  long getRealPosition(long logicalPos) {
-    long offset = logicalPos - logicalPosition;
-    long pos = realPosition + offset;
-    return pos;
-  }
-
-  @Override
-  public String toString() {
-    return "HdfsMetaBlock [length=" + length + ", logicalPosition=" + logicalPosition + ", realPosition=" + realPosition + "]";
-  }
-}
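
As a worked example of the mapping: a block with logicalPosition = 100,
realPosition = 4096 and length = 50 covers logical bytes [100, 150), so
containsDataAt(120) returns true and getRealPosition(120) returns
4096 + (120 - 100) = 4116. Each record is exactly three longs (24 bytes),
which is what lets readers and writers walk the trailer without any extra
framing.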

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/hdfs/NullIndexOutput.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/NullIndexOutput.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/NullIndexOutput.java
deleted file mode 100644
index 1a03c10..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/NullIndexOutput.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.blur.store.hdfs;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.lucene.store.IndexOutput;
-
-public class NullIndexOutput extends IndexOutput {
-
-  private long _pos;
-  private long _length;
-
-  @Override
-  public void close() throws IOException {
-    // no-op: all output is discarded
-  }
-
-  @Override
-  public void flush() throws IOException {
-    // no-op: all output is discarded
-  }
-
-  @Override
-  public long getFilePointer() {
-    return _pos;
-  }
-
-  @Override
-  public long length() throws IOException {
-    return _length;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    _pos = pos;
-  }
-
-  @Override
-  public void writeByte(byte b) throws IOException {
-    _pos++;
-    updateLength(); // keep _length in sync, as writeBytes already does
-  }
-
-  @Override
-  public void writeBytes(byte[] b, int offset, int length) throws IOException {
-    _pos += length;
-    updateLength();
-  }
-
-  private void updateLength() {
-    if (_pos > _length) {
-      _length = _pos;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/blur/store/lock/BlurLockFactory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/lock/BlurLockFactory.java b/src/blur-store/src/main/java/org/apache/blur/store/lock/BlurLockFactory.java
deleted file mode 100644
index c79608c..0000000
--- a/src/blur-store/src/main/java/org/apache/blur/store/lock/BlurLockFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.blur.store.lock;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-public class BlurLockFactory extends LockFactory {
-
-  private final Configuration _configuration;
-  private final FileSystem _fileSystem;
-  private final String _baseLockKey;
-  private byte[] _lockKey;
-  private final Path _dir;
-
-  public BlurLockFactory(Configuration configuration, Path dir, String host, int pid) throws IOException {
-    _configuration = configuration;
-    _dir = dir;
-    _fileSystem = _dir.getFileSystem(_configuration);
-    _baseLockKey = host + "/" + pid;
-  }
-
-  @Override
-  public Lock makeLock(String lockName) {
-    final Path lockPath = new Path(_dir, lockName);
-    return new Lock() {
-      private boolean _set;
-
-      @Override
-      public boolean obtain() throws IOException {
-        if (_set) {
-          throw new IOException("Lock for [" + _baseLockKey + "] can only be set once.");
-        }
-        try {
-          _lockKey = (_baseLockKey + "/" + System.currentTimeMillis()).getBytes();
-          FSDataOutputStream outputStream = _fileSystem.create(lockPath, true);
-          try {
-            outputStream.write(_lockKey);
-          } finally {
-            outputStream.close(); // do not leak the stream if the write fails
-          }
-        } finally {
-          _set = true;
-        }
-        return true;
-      }
-
-      @Override
-      public void release() throws IOException {
-        _fileSystem.delete(lockPath, false);
-      }
-
-      @Override
-      public boolean isLocked() throws IOException {
-        if (!_set) {
-          return false;
-        }
-        if (!_fileSystem.exists(lockPath)) {
-          return false;
-        }
-        FileStatus fileStatus = _fileSystem.getFileStatus(lockPath);
-        long len = fileStatus.getLen();
-        if (len != _lockKey.length) {
-          return false;
-        }
-        byte[] buf = new byte[_lockKey.length];
-        FSDataInputStream inputStream = _fileSystem.open(lockPath);
-        inputStream.readFully(buf);
-        inputStream.close();
-        if (Arrays.equals(_lockKey, buf)) {
-          return true;
-        }
-        return false;
-      }
-    };
-  }
-
-  @Override
-  public void clearLock(String lockName) throws IOException {
-    _fileSystem.delete(new Path(_dir, lockName), false);
-  }
-}
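
A minimal sketch of taking and releasing a lock through the factory above
(the directory, host and pid values are illustrative):

    Configuration conf = new Configuration();
    Path lockDir = new Path("/blur/locks");       // hypothetical lock directory
    BlurLockFactory factory = new BlurLockFactory(conf, lockDir, "host1", 12345);
    Lock lock = factory.makeLock("write.lock");
    if (lock.obtain()) {                          // writes host/pid/timestamp to the lock file
      try {
        // ... work guarded by the lock ...
      } finally {
        lock.release();                           // deletes the lock file
      }
    }

Note that obtain() overwrites any existing lock file rather than failing, so
callers that need mutual exclusion have to verify ownership with isLocked(),
which compares the file contents against the stored key.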

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBounds.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBounds.java b/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBounds.java
deleted file mode 100644
index c159fbc..0000000
--- a/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBounds.java
+++ /dev/null
@@ -1,217 +0,0 @@
-package org.apache.lucene.index;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.LuceneConstant;
-import org.apache.lucene.analysis.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.lucene.util.ReaderUtil;
-
-
-public class WarmUpByFieldBounds {
-
-  public static void main(String[] args) throws CorruptIndexException, IOException {
-    Directory dir = getDir();
-    IndexReader reader = IndexReader.open(dir);
-    AtomicBoolean isClosed = new AtomicBoolean(false);
-    WarmUpByFieldBounds warmUpByFieldBounds = new WarmUpByFieldBounds();
-    WarmUpByFieldBoundsStatus status = new WarmUpByFieldBoundsStatus() {
-      @Override
-      public void complete(String name, Term start, Term end, long startPosition, long endPosition, long totalBytesRead, long nanoTime, AtomicBoolean isClosed) {
-        // System.out.println(name + " " + start + " " + end + " " +
-        // startPosition + " " + endPosition + " " + totalBytesRead + " " +
-        // nanoTime + " " + isClosed);
-
-        double bytesPerNano = totalBytesRead / (double) nanoTime;
-        double mBytesPerNano = bytesPerNano / 1024 / 1024;
-        double mBytesPerSecond = mBytesPerNano * 1000000000.0;
-        if (totalBytesRead > 0) {
-          System.out.println("Precached field [" + start.field() + "] in file [" + name + "], " + totalBytesRead + " bytes cached at [" + mBytesPerSecond + " MB/s]");
-        }
-      }
-    };
-    warmUpByFieldBounds.warmUpByField(isClosed, new Term("f1"), reader, status);
-    warmUpByFieldBounds.warmUpByField(isClosed, new Term("f0"), reader, status);
-    warmUpByFieldBounds.warmUpByField(isClosed, new Term("f9"), reader, status);
-    warmUpByFieldBounds.warmUpByField(isClosed, new Term("f"), reader, status);
-  }
-
-  private static Directory getDir() throws CorruptIndexException, LockObtainFailedException, IOException {
-    RAMDirectory dir = new RAMDirectory();
-    IndexWriterConfig conf = new IndexWriterConfig(LuceneConstant.LUCENE_VERSION, new KeywordAnalyzer());
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
-    IndexWriter writer = new IndexWriter(dir, conf);
-    for (int i = 0; i < 100000; i++) {
-      writer.addDocument(getDoc());
-    }
-    writer.close();
-    return dir;
-  }
-
-  private static Document getDoc() {
-    Document document = new Document();
-    for (int i = 0; i < 10; i++) {
-      document.add(new Field("f" + i, UUID.randomUUID().toString(), Store.YES, Index.ANALYZED));
-    }
-    return document;
-  }
-
-  private static final Log LOG = LogFactory.getLog(WarmUpByFieldBounds.class);
-
-  public void warmUpByField(AtomicBoolean isClosed, Term term, IndexReader reader, WarmUpByFieldBoundsStatus status) throws IOException {
-    FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-    Collection<String> fieldNames = new HashSet<String>();
-    for (FieldInfo info : fieldInfos) {
-      if (info.isIndexed) {
-        fieldNames.add(info.name);
-      }
-    }
-    List<String> fields = new ArrayList<String>(fieldNames);
-    Collections.sort(fields);
-    int index = fields.indexOf(term.field);
-    if (index < fields.size() - 1) {
-      warmUpByTermRange(isClosed, term, new Term(fields.get(index + 1)), reader, status);
-    } else {
-      warmUpByTermRange(isClosed, term, null, reader, status);
-    }
-  }
-
-  public void warmUpByTermRange(AtomicBoolean isClosed, Term start, Term end, IndexReader reader, WarmUpByFieldBoundsStatus status) throws IOException {
-    if (reader instanceof SegmentReader) {
-      warmUpByTermRangeSegmentReader(isClosed, start, end, (SegmentReader) reader, status);
-      return;
-    }
-    IndexReader[] subReaders = reader.getSequentialSubReaders();
-    if (subReaders == null) {
-      throw new RuntimeException("Reader is not supported [" + reader.getClass() + "] [" + reader + "]");
-    }
-    for (int i = 0; i < subReaders.length; i++) {
-      warmUpByTermRange(isClosed, start, end, subReaders[i], status);
-    }
-  }
-
-  private static void warmUpByTermRangeSegmentReader(AtomicBoolean isClosed, Term start, Term end, SegmentReader reader, WarmUpByFieldBoundsStatus status) throws IOException {
-    SegmentCoreReaders core = reader.core;
-    TermInfosReader termsReader = core.getTermsReader();
-    Directory directory = reader.directory();
-    String segmentName = reader.getSegmentName();
-    IndexInput tis = null;
-    IndexInput frq = null;
-    IndexInput prx = null;
-    try {
-      String nameTis = segmentName + ".tis";
-      String nameFrq = segmentName + ".frq";
-      String namePrx = segmentName + ".prx";
-      tis = directory.openInput(nameTis);
-      frq = directory.openInput(nameFrq);
-      prx = directory.openInput(namePrx);
-
-      long startTermPointer = 0;
-      long endTermPointer = tis.length();
-      long startFreqPointer = 0;
-      long endFreqPointer = frq.length();
-      long startProxPointer = 0;
-      long endProxPointer = prx.length();
-      if (start != null) {
-        Term realStartTerm = getFirstTerm(start, reader);
-        if (realStartTerm == null) {
-          return;
-        }
-        TermInfo startTermInfo = termsReader.get(realStartTerm);
-        startTermPointer = termsReader.getPosition(realStartTerm);
-        startFreqPointer = startTermInfo.freqPointer;
-        startProxPointer = startTermInfo.proxPointer;
-      }
-
-      if (end != null) {
-        Term realEndTerm = getFirstTerm(end, reader);
-        if (realEndTerm == null) {
-          return;
-        }
-        TermInfo endTermInfo = termsReader.get(realEndTerm);
-        endTermPointer = termsReader.getPosition(realEndTerm);
-        endFreqPointer = endTermInfo.freqPointer;
-        endProxPointer = endTermInfo.proxPointer;
-      }
-      readFile(isClosed, tis, startTermPointer, endTermPointer, status, start, end, nameTis);
-      readFile(isClosed, frq, startFreqPointer, endFreqPointer, status, start, end, nameFrq);
-      readFile(isClosed, prx, startProxPointer, endProxPointer, status, start, end, namePrx);
-    } finally {
-      close(tis);
-      close(frq);
-      close(prx);
-    }
-  }
-
-  private static Term getFirstTerm(Term t, SegmentReader reader) throws IOException {
-    TermEnum terms = reader.terms(t);
-    try {
-      if (terms.next()) {
-        return terms.term();
-      }
-      return null;
-    } finally {
-      terms.close();
-    }
-  }
-
-  private static void close(IndexInput input) {
-    try {
-      if (input != null) {
-        input.close();
-      }
-    } catch (IOException e) {
-      LOG.error("Error while trying to close file [" + input + "]", e);
-    }
-  }
-
-  private static void readFile(AtomicBoolean isClosed, IndexInput input, long startTermPointer, long endTermPointer, WarmUpByFieldBoundsStatus status, Term start, Term end,
-      String name) throws IOException {
-    // sequentially read the byte range into a scratch buffer to pull it into
-    // the block cache, bailing out early if the reader has been closed
-    byte[] buffer = new byte[4096];
-    long position = startTermPointer;
-    input.seek(position);
-    long total = 0;
-    long s = System.nanoTime();
-    while (position < endTermPointer && !isClosed.get()) {
-      int length = (int) Math.min(buffer.length, endTermPointer - position);
-      input.readBytes(buffer, 0, length);
-      position += length;
-      total += length;
-    }
-    long e = System.nanoTime();
-    status.complete(name, start, end, startTermPointer, endTermPointer, total, e - s, isClosed);
-  }
-}
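
The throughput arithmetic in the status callback converts bytes per
nanosecond into MB/s. For example, 67,108,864 bytes (64 MB) read in
500,000,000 ns is about 0.134 bytes/ns, or 1.28e-7 MB/ns, and multiplying by
1e9 ns/s gives 128 MB/s, matching 64 MB read in half a second.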

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBoundsStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBoundsStatus.java b/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBoundsStatus.java
deleted file mode 100644
index d327796..0000000
--- a/src/blur-store/src/main/java/org/apache/lucene/index/WarmUpByFieldBoundsStatus.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.lucene.index;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public interface WarmUpByFieldBoundsStatus {
-
-  void complete(String name, Term start, Term end, long startPosition, long endPosition, long totalBytesRead, long nanoTime, AtomicBoolean isClosed);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/39539e56/src/blur-store/src/test/java/org/apache/blur/store/BenchmarkDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/test/java/org/apache/blur/store/BenchmarkDirectory.java b/src/blur-store/src/test/java/org/apache/blur/store/BenchmarkDirectory.java
deleted file mode 100644
index 9bc9d3c..0000000
--- a/src/blur-store/src/test/java/org/apache/blur/store/BenchmarkDirectory.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package org.apache.blur.store;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.blur.metrics.BlurMetrics;
-import org.apache.blur.store.blockcache.BlockCache;
-import org.apache.blur.store.blockcache.BlockDirectory;
-import org.apache.blur.store.blockcache.BlockDirectoryCache;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.store.lock.BlurLockFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermEnum;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.search.WildcardQuery;
-
-
-import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
-
-public class BenchmarkDirectory {
-
-  public static void main(String[] args) throws IOException {
-    int blockSize = BlockDirectory.BLOCK_SIZE;
-    long totalMemory = BlockCache._128M * 2;
-    int slabSize = (int) (totalMemory / 2);
-
-    BlockCache blockCache = new BlockCache(new BlurMetrics(new Configuration()), true, totalMemory, slabSize, blockSize);
-    BlurMetrics metrics = new BlurMetrics(new Configuration());
-    BlockDirectoryCache cache = new BlockDirectoryCache(blockCache, metrics);
-
-    Configuration configuration = new Configuration();
-    Path p = new Path("hdfs://localhost:9000/bench");
-    BlurLockFactory factory = new BlurLockFactory(configuration, p, "localhost", 0);
-
-    FileSystem fs = FileSystem.get(p.toUri(), configuration);
-    fs.delete(p, true);
-
-    final HdfsDirectory dir = new HdfsDirectory(p);
-    dir.setLockFactory(factory);
-
-    BlockDirectory directory = new BlockDirectory("test", dir, cache);
-
-    while (true) {
-      long s, e;
-
-      s = System.currentTimeMillis();
-      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new StandardAnalyzer(LUCENE_VERSION));
-      TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
-      mergePolicy.setUseCompoundFile(false);
-      IndexWriter writer = new IndexWriter(directory, conf);
-      for (int i = 0; i < 1000000; i++) {
-        writer.addDocument(getDoc());
-      }
-      writer.close();
-      e = System.currentTimeMillis();
-      System.out.println("Indexing " + (e - s));
-
-      IndexReader reader = IndexReader.open(directory);
-      System.out.println("Docs " + reader.numDocs());
-      TermEnum terms = reader.terms();
-      List<Term> sample = new ArrayList<Term>();
-      int limit = 1000;
-      Random random = new Random();
-      SAMPLE: while (terms.next()) {
-        if (sample.size() < limit) {
-          if (random.nextInt() % 7 == 0) {
-            sample.add(terms.term());
-          }
-        } else {
-          break SAMPLE;
-        }
-      }
-      terms.close();
-
-      System.out.println("Sampling complete [" + sample.size() + "]");
-      IndexSearcher searcher = new IndexSearcher(reader);
-      long total = 0;
-      long time = 0;
-      int search = 10;
-      for (int i = 0; i < search; i++) {
-        s = System.currentTimeMillis();
-        TopDocs topDocs = searcher.search(new TermQuery(sample.get(random.nextInt(sample.size()))), 10);
-        total += topDocs.totalHits;
-        e = System.currentTimeMillis();
-        time += (e - s);
-      }
-      System.out.println("Searching " + time + " " + (time / (double) search) + " " + total);
-      for (int i = 0; i < 10; i++) {
-        s = System.currentTimeMillis();
-        TopDocs topDocs = searcher.search(new WildcardQuery(new Term("name", "fff*0*")), 10);
-        e = System.currentTimeMillis();
-        System.out.println(topDocs.totalHits + " " + (e - s));
-      }
-      reader.close();
-    }
-  }
-
-  private static Document getDoc() {
-    Document document = new Document();
-    document.add(new Field("name", UUID.randomUUID().toString(), Store.YES, Index.ANALYZED_NO_NORMS));
-    return document;
-  }
-
-  public static int getNumberOfSlabs(float heapPercentage, int numberOfBlocksPerSlab, int blockSize) {
-    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
-    long targetBytes = (long) (max * heapPercentage);
-    int slabSize = numberOfBlocksPerSlab * blockSize;
-    int slabs = (int) (targetBytes / slabSize);
-    if (slabs == 0) {
-      throw new RuntimeException("Minimum heap size is 512m!");
-    }
-    return slabs;
-  }
-}
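
As a worked example of getNumberOfSlabs (the block and slab sizes here are
illustrative, not the defaults): with a 1 GB max heap, heapPercentage = 0.5,
16384 blocks per slab and 8 KB blocks, targetBytes = 536,870,912 and
slabSize = 134,217,728, so four slabs are allocated. If targetBytes falls
below one slabSize the division rounds down to zero and the method throws
the "Minimum heap size" error.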

