incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexReader.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexReader.java
deleted file mode 100644
index 8e1b609..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexReader.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.nearinfinity.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.lucene.analysis.KeywordAnalyzer;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.Version;
-
-import com.nearinfinity.blur.index.IndexWriter;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.thrift.generated.Row;
-
-public class BlurIndexReader extends AbstractBlurIndex {
-
-  private static final Log LOG = LogFactory.getLog(BlurIndexReader.class);
-
-  public void init() throws IOException {
-    initIndexWriterConfig();
-    Directory directory = getDirectory();
-    if (!IndexReader.indexExists(directory)) {
-      IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, new KeywordAnalyzer());
-      new IndexWriter(directory, conf).close();
-    }
-    initIndexReader(IndexReader.open(directory));
-  }
-
-  @Override
-  public synchronized void refresh() throws IOException {
-    // Override so that we can call within synchronized method
-    super.refresh();
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-    LOG.info("Reader for table [{0}] shard [{1}] closed.", getTable(), getShard());
-  }
-
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    // Do nothing
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexRefresher.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexRefresher.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexRefresher.java
deleted file mode 100644
index 88e7e92..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurIndexRefresher.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.nearinfinity.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class BlurIndexRefresher extends TimerTask {
-
-  private static final Log LOG = LogFactory.getLog(BlurIndexRefresher.class);
-
-  private Timer _timer;
-  private long _period = TimeUnit.MINUTES.toMillis(1);
-  private long _delay = _period;
-  private Collection<BlurIndex> _indexes = new LinkedBlockingQueue<BlurIndex>();
-
-  public void register(BlurIndex blurIndex) {
-    _indexes.add(blurIndex);
-  }
-
-  public void unregister(BlurIndex blurIndex) {
-    _indexes.remove(blurIndex);
-  }
-
-  public void close() {
-    _timer.purge();
-    _timer.cancel();
-  }
-
-  public void init() {
-    _timer = new Timer("IndexReader-Refresher", true);
-    _timer.schedule(this, _delay, _period);
-    LOG.info("Init Complete");
-  }
-
-  @Override
-  public void run() {
-    try {
-      refreshInternal();
-    } catch (Throwable e) {
-      LOG.error("Unknown error", e);
-    }
-  }
-
-  private void refreshInternal() {
-    for (BlurIndex index : _indexes) {
-      try {
-        index.refresh();
-      } catch (IOException e) {
-        LOG.error("Unknown error while refreshing index of writer [{0}]", e, index);
-      }
-    }
-  }
-
-  public void setPeriod(long period) {
-    _period = period;
-  }
-
-  public void setDelay(long delay) {
-    _delay = delay;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java
deleted file mode 100644
index 1b0038c..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/BlurNRTIndex.java
+++ /dev/null
@@ -1,294 +0,0 @@
-package com.nearinfinity.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static com.nearinfinity.blur.lucene.LuceneConstant.LUCENE_VERSION;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.NRTManager;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
-import org.apache.lucene.search.NRTManagerReopenThread;
-import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.Similarity;
-import org.apache.lucene.store.Directory;
-
-import com.nearinfinity.blur.analysis.BlurAnalyzer;
-import com.nearinfinity.blur.index.IndexWriter;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.thrift.generated.Record;
-import com.nearinfinity.blur.thrift.generated.Row;
-
-public class BlurNRTIndex extends BlurIndex {
-
-  private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
-  private static final boolean APPLY_ALL_DELETES = true;
-
-  private NRTManager _nrtManager;
-  private AtomicBoolean _isClosed = new AtomicBoolean();
-  private IndexWriter _writer;
-  private Thread _committer;
-
-  // externally set
-  private BlurAnalyzer _analyzer;
-  private Directory _directory;
-  private String _table;
-  private String _shard;
-  private Similarity _similarity;
-  private NRTManagerReopenThread _refresher;
-  private TransactionRecorder _recorder;
-  private Configuration _configuration;
-  private Path _walPath;
-  private IndexDeletionPolicy _indexDeletionPolicy;
-  private BlurIndexCloser _closer;
-  private AtomicReference<IndexReader> _indexRef = new AtomicReference<IndexReader>();
-  private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
-  private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(5000);
-  private DirectoryReferenceFileGC _gc;
-  private TrackingIndexWriter _trackingWriter;
-  private SearcherFactory _searcherFactory = new SearcherFactory();
-  private long _lastRefresh;
-  private long _timeBetweenRefreshsNanos;
-
-  // private SearcherWarmer _warmer = new SearcherWarmer() {
-  // @Override
-  // public void warm(IndexSearcher s) throws IOException {
-  // IndexReader indexReader = s.getIndexReader();
-  // IndexReader[] subReaders = indexReader.getSequentialSubReaders();
-  // if (subReaders == null) {
-  // PrimeDocCache.getPrimeDocBitSet(indexReader);
-  // } else {
-  // for (IndexReader reader : subReaders) {
-  // PrimeDocCache.getPrimeDocBitSet(reader);
-  // }
-  // }
-  // }
-  // };
-
-  public void init() throws IOException {
-    Path walTablePath = new Path(_walPath, _table);
-    Path walShardPath = new Path(walTablePath, _shard);
-
-    _timeBetweenRefreshsNanos = TimeUnit.MILLISECONDS.toNanos(_timeBetweenRefreshs);
-
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
-    conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-    conf.setSimilarity(_similarity);
-    conf.setIndexDeletionPolicy(_indexDeletionPolicy);
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
-    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(_directory, _gc);
-    _writer = new IndexWriter(referenceCounter, conf);
-    _recorder = new TransactionRecorder();
-    _recorder.setAnalyzer(_analyzer);
-    _recorder.setConfiguration(_configuration);
-    _recorder.setWalPath(walShardPath);
-    _recorder.init();
-    _recorder.replay(_writer);
-
-    _trackingWriter = new TrackingIndexWriter(_writer);
-    _nrtManager = new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES);
-    IndexSearcher searcher = _nrtManager.acquire();
-    _indexRef.set(searcher.getIndexReader());
-    _lastRefresh = System.nanoTime();
-    startCommiter();
-    startRefresher();
-  }
-
-  private void startRefresher() {
-    double targetMinStaleSec = _timeBetweenRefreshs / 1000.0;
-    _refresher = new NRTManagerReopenThread(_nrtManager, targetMinStaleSec * 10, targetMinStaleSec);
-    _refresher.setName("Refresh Thread [" + _table + "/" + _shard + "]");
-    _refresher.setDaemon(true);
-    _refresher.start();
-  }
-
-  private void startCommiter() {
-    _committer = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!_isClosed.get()) {
-          try {
-            LOG.info("Committing of [{0}/{1}].", _table, _shard);
-            _recorder.commit(_writer);
-          } catch (CorruptIndexException e) {
-            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _table, _shard);
-          } catch (IOException e) {
-            LOG.error("IO Error during commit of [{0}/{1}].", e, _table, _shard);
-          }
-          try {
-            Thread.sleep(_timeBetweenCommits);
-          } catch (InterruptedException e) {
-            if (_isClosed.get()) {
-              return;
-            }
-            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _table, _shard);
-          }
-        }
-      }
-    });
-    _committer.setDaemon(true);
-    _committer.setName("Commit Thread [" + _table + "/" + _shard + "]");
-    _committer.start();
-  }
-
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    List<Record> records = row.records;
-    if (records == null || records.isEmpty()) {
-      deleteRow(waitToBeVisible, wal, row.id);
-      return;
-    }
-    long generation = _recorder.replaceRow(wal, row, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
-    waitToBeVisible(waitToBeVisible, generation);
-  }
-
-  @Override
-  public IndexReader getIndexReader() throws IOException {
-    IndexReader indexReader = _indexRef.get();
-    while (!indexReader.tryIncRef()) {
-      indexReader = _indexRef.get();
-    }
-    LOG.debug("Index fetched with ref of [{0}] [{1}]", indexReader.getRefCount(), indexReader);
-    return indexReader;
-  }
-
-  @Override
-  public void close() throws IOException {
-    // @TODO make sure that locks are cleaned up.
-    _isClosed.set(true);
-    _committer.interrupt();
-    _refresher.close();
-    try {
-      _recorder.close();
-      _writer.close();
-      _closer.close(_indexRef.get());
-      _nrtManager.close();
-    } finally {
-      _directory.close();
-    }
-  }
-
-  @Override
-  public void refresh() throws IOException {
-    _nrtManager.maybeRefresh();
-    swap();
-  }
-
-  @Override
-  public AtomicBoolean isClosed() {
-    return _isClosed;
-  }
-
-  @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    _writer.forceMerge(numberOfSegmentsPerShard);
-  }
-
-  private void waitToBeVisible(boolean waitToBeVisible, long generation) throws IOException {
-    if (waitToBeVisible && _nrtManager.getCurrentSearchingGen() < generation) {
-      // if visibility is required then reopen.
-      _nrtManager.waitForGeneration(generation);
-      swap();
-    } else {
-      long now = System.nanoTime();
-      if (_lastRefresh + _timeBetweenRefreshsNanos < now) {
-        refresh();
-        _lastRefresh = now;
-      }
-    }
-  }
-
-  private void swap() {
-    IndexSearcher searcher = _nrtManager.acquire();
-    IndexReader indexReader = searcher.getIndexReader();
-    IndexReader oldIndexReader = _indexRef.getAndSet(indexReader);
-    _closer.close(oldIndexReader);
-  }
-
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    _analyzer = analyzer;
-  }
-
-  public void setDirectory(Directory directory) {
-    _directory = directory;
-  }
-
-  public void setTable(String table) {
-    _table = table;
-  }
-
-  public void setShard(String shard) {
-    _shard = shard;
-  }
-
-  public void setSimilarity(Similarity similarity) {
-    _similarity = similarity;
-  }
-
-  public void setTimeBetweenCommits(long timeBetweenCommits) {
-    _timeBetweenCommits = timeBetweenCommits;
-  }
-
-  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
-    _timeBetweenRefreshs = timeBetweenRefreshs;
-  }
-
-  public void setWalPath(Path walPath) {
-    _walPath = walPath;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    _configuration = configuration;
-  }
-
-  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
-    _indexDeletionPolicy = indexDeletionPolicy;
-  }
-
-  public void setCloser(BlurIndexCloser closer) {
-    _closer = closer;
-  }
-
-  public DirectoryReferenceFileGC getGc() {
-    return _gc;
-  }
-
-  public void setGc(DirectoryReferenceFileGC gc) {
-    _gc = gc;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceCounter.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceCounter.java
deleted file mode 100644
index 19ec285..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceCounter.java
+++ /dev/null
@@ -1,279 +0,0 @@
-package com.nearinfinity.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class DirectoryReferenceCounter extends Directory {
-
-  private final static Log LOG = LogFactory.getLog(DirectoryReferenceCounter.class);
-  private Directory directory;
-  private Map<String, AtomicInteger> refs = new ConcurrentHashMap<String, AtomicInteger>();
-  private DirectoryReferenceFileGC gc;
-
-  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc) {
-    this.directory = directory;
-    this.gc = gc;
-  }
-
-  public void deleteFile(String name) throws IOException {
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      deleteFile(name);
-      return;
-    }
-    AtomicInteger counter = refs.get(name);
-    if (counter != null && counter.get() > 0) {
-      addToFileGC(name);
-    } else {
-      LOG.debug("Delete file [{0}]", name);
-      directory.deleteFile(name);
-    }
-  }
-
-  private void addToFileGC(String name) {
-    if (gc != null) {
-      LOG.debug("Add file [{0}] to be GCed once refs are closed.", name);
-      gc.add(directory, name, refs);
-    }
-  }
-
-  public IndexOutput createOutput(String name) throws IOException {
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return directory.createOutput(name);
-    }
-    LOG.debug("Create file [{0}]", name);
-    AtomicInteger counter = refs.get(name);
-    if (counter != null) {
-      LOG.error("Unknown error while trying to create ref counter for [{0}] reference exists.", name);
-      throw new IOException("Reference exists [" + name + "]");
-    }
-    counter = new AtomicInteger(0);
-    refs.put(name, counter);
-    return directory.createOutput(name);
-  }
-
-  public IndexInput openInput(String name) throws IOException {
-    IndexInput input = directory.openInput(name);
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return input;
-    }
-    return wrap(name, input);
-  }
-
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    IndexInput input = directory.openInput(name, bufferSize);
-    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
-      return input;
-    }
-    return wrap(name, input);
-  }
-
-  private IndexInput wrap(String name, IndexInput input) {
-    AtomicInteger counter = refs.get(name);
-    if (counter == null) {
-      counter = new AtomicInteger();
-      refs.put(name, counter);
-    }
-    return new RefIndexInput(input, counter);
-  }
-
-  @SuppressWarnings("deprecation")
-  public static class RefIndexInput extends IndexInput {
-
-    private IndexInput input;
-    private AtomicInteger ref;
-    private boolean closed = false;
-
-    public RefIndexInput(IndexInput input, AtomicInteger ref) {
-      this.input = input;
-      this.ref = ref;
-      ref.incrementAndGet();
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-      // Seems like not all the clones are being closed...
-      close();
-    }
-
-    public Object clone() {
-      RefIndexInput ref = (RefIndexInput) super.clone();
-      ref.input = (IndexInput) input.clone();
-      ref.ref.incrementAndGet();
-      return ref;
-    }
-
-    public void skipChars(int length) throws IOException {
-      input.skipChars(length);
-    }
-
-    public void setModifiedUTF8StringsMode() {
-      input.setModifiedUTF8StringsMode();
-    }
-
-    public void close() throws IOException {
-      if (!closed) {
-        input.close();
-        ref.decrementAndGet();
-        closed = true;
-      }
-    }
-
-    public short readShort() throws IOException {
-      return input.readShort();
-    }
-
-    public int readInt() throws IOException {
-      return input.readInt();
-    }
-
-    public void seek(long pos) throws IOException {
-      input.seek(pos);
-    }
-
-    public void copyBytes(IndexOutput out, long numBytes) throws IOException {
-      input.copyBytes(out, numBytes);
-    }
-
-    public int readVInt() throws IOException {
-      return input.readVInt();
-    }
-
-    public String toString() {
-      return input.toString();
-    }
-
-    public long readLong() throws IOException {
-      return input.readLong();
-    }
-
-    public long readVLong() throws IOException {
-      return input.readVLong();
-    }
-
-    public String readString() throws IOException {
-      return input.readString();
-    }
-
-    public long getFilePointer() {
-      return input.getFilePointer();
-    }
-
-    public byte readByte() throws IOException {
-      return input.readByte();
-    }
-
-    public void readBytes(byte[] b, int offset, int len) throws IOException {
-      input.readBytes(b, offset, len);
-    }
-
-    public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
-      input.readBytes(b, offset, len, useBuffer);
-    }
-
-    public long length() {
-      return input.length();
-    }
-
-    public void readChars(char[] buffer, int start, int length) throws IOException {
-      input.readChars(buffer, start, length);
-    }
-
-    public Map<String, String> readStringStringMap() throws IOException {
-      return input.readStringStringMap();
-    }
-
-  }
-
-  @SuppressWarnings("deprecation")
-  public void touchFile(String name) throws IOException {
-    directory.touchFile(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    directory.sync(name);
-  }
-
-  public void sync(Collection<String> names) throws IOException {
-    directory.sync(names);
-  }
-
-  public void clearLock(String name) throws IOException {
-    directory.clearLock(name);
-  }
-
-  public void close() throws IOException {
-    directory.close();
-  }
-
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    directory.setLockFactory(lockFactory);
-  }
-
-  public String getLockID() {
-    return directory.getLockID();
-  }
-
-  public void copy(Directory to, String src, String dest) throws IOException {
-    directory.copy(to, src, dest);
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    return directory.fileExists(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    return directory.fileModified(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    return directory.fileLength(name);
-  }
-
-  public LockFactory getLockFactory() {
-    return directory.getLockFactory();
-  }
-
-  public String[] listAll() throws IOException {
-    return directory.listAll();
-  }
-
-  public Lock makeLock(String name) {
-    return directory.makeLock(name);
-  }
-
-  public String toString() {
-    return directory.toString();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceFileGC.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceFileGC.java
deleted file mode 100644
index eecd816..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/DirectoryReferenceFileGC.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package com.nearinfinity.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.lucene.store.Directory;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class DirectoryReferenceFileGC extends TimerTask {
-
-  private static final Log LOG = LogFactory.getLog(DirectoryReferenceFileGC.class);
-
-  private Timer _timer;
-  private long _delay = 5000;
-  private LinkedBlockingQueue<Value> _queue;
-
-  public static class Value {
-    public Value(Directory directory, String name, Map<String, AtomicInteger> refs) {
-      this.directory = directory;
-      this.name = name;
-      this.refs = refs;
-    }
-
-    Directory directory;
-    String name;
-    Map<String, AtomicInteger> refs;
-
-    public boolean tryToDelete() throws IOException {
-      AtomicInteger counter = refs.get(name);
-      if (counter.get() <= 0) {
-        refs.remove(name);
-        LOG.debug("Removing file [{0}]", name);
-        directory.deleteFile(name);
-        return true;
-      } else {
-        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
-      }
-      return false;
-    }
-  }
-
-  public void init() {
-    _timer = new Timer("Blur-File-GC", true);
-    _timer.scheduleAtFixedRate(this, _delay, _delay);
-    _queue = new LinkedBlockingQueue<Value>();
-  }
-
-  public void add(Directory directory, String name, Map<String, AtomicInteger> refs) {
-    try {
-      _queue.put(new Value(directory, name, refs));
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void close() {
-    _timer.purge();
-    _timer.cancel();
-  }
-
-  @Override
-  public void run() {
-    Iterator<Value> iterator = _queue.iterator();
-    while (iterator.hasNext()) {
-      Value value = iterator.next();
-      try {
-        if (value.tryToDelete()) {
-          iterator.remove();
-        }
-      } catch (IOException e) {
-        LOG.error("Unknown error", e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java
deleted file mode 100644
index aed2e23..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/TransactionRecorder.java
+++ /dev/null
@@ -1,344 +0,0 @@
-package com.nearinfinity.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.record.Utils;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
-
-import com.nearinfinity.blur.analysis.BlurAnalyzer;
-import com.nearinfinity.blur.analysis.FieldConverterUtil;
-import com.nearinfinity.blur.index.IndexWriter;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.thrift.generated.Column;
-import com.nearinfinity.blur.thrift.generated.Record;
-import com.nearinfinity.blur.thrift.generated.Row;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.RowIndexWriter;
-
-public class TransactionRecorder {
-
-  enum TYPE {
-    DELETE((byte) 0), ROW((byte) 1);
-    private byte b;
-
-    private TYPE(byte b) {
-      this.b = b;
-    }
-
-    public byte value() {
-      return b;
-    }
-
-    public static TYPE lookup(byte b) {
-      switch (b) {
-      case 0:
-        return DELETE;
-      case 1:
-        return ROW;
-      default:
-        throw new RuntimeException("Type not found [" + b + "]");
-      }
-    }
-  }
-
-  private static final Log LOG = LogFactory.getLog(TransactionRecorder.class);
-  private static final Term ROW_ID = new Term(BlurConstants.ROW_ID);
-  private AtomicBoolean running = new AtomicBoolean(true);
-  private Path walPath;
-  private Configuration configuration;
-  private FileSystem fileSystem;
-  private AtomicReference<FSDataOutputStream> outputStream = new AtomicReference<FSDataOutputStream>();
-  private AtomicLong lastSync = new AtomicLong();
-  private long timeBetweenSyncs = TimeUnit.MILLISECONDS.toNanos(10);
-  private BlurAnalyzer analyzer;
-
-  public void init() throws IOException {
-    fileSystem = walPath.getFileSystem(configuration);
-  }
-
-  public void open() throws IOException {
-    if (fileSystem.exists(walPath)) {
-      throw new IOException("WAL path [" + walPath + "] still exists, replay must have not worked.");
-    } else {
-      outputStream.set(fileSystem.create(walPath));
-    }
-    if (outputStream == null) {
-      throw new RuntimeException();
-    }
-    lastSync.set(System.nanoTime());
-  }
-
-  public void replay(IndexWriter writer) throws IOException {
-    if (fileSystem.exists(walPath)) {
-      FSDataInputStream inputStream = fileSystem.open(walPath);
-      replay(writer, inputStream);
-      inputStream.close();
-      commit(writer);
-    } else {
-      open();
-    }
-  }
-
-  private void replay(IndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
-    long updateCount = 0;
-    long deleteCount = 0;
-    byte[] buffer;
-    while ((buffer = readBuffer(inputStream)) != null) {
-      DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer));
-      TYPE lookup = TYPE.lookup(dataInputStream.readByte());
-      switch (lookup) {
-      case ROW:
-        Row row = readRow(dataInputStream);
-        writer.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row, analyzer));
-        updateCount++;
-        continue;
-      case DELETE:
-        String deleteRowId = readString(dataInputStream);
-        writer.deleteDocuments(ROW_ID.createTerm(deleteRowId));
-        deleteCount++;
-        continue;
-      default:
-        LOG.error("Unknown type [{0}]", lookup);
-        throw new IOException("Unknown type [" + lookup + "]");
-      }
-    }
-    LOG.info("Rows reclaimed form the WAL [{0}]", updateCount);
-    LOG.info("Deletes reclaimed form the WAL [{0}]", deleteCount);
-  }
-
-  private byte[] readBuffer(DataInputStream inputStream) {
-    try {
-      int length = inputStream.readInt();
-      byte[] buffer = new byte[length];
-      inputStream.readFully(buffer);
-      return buffer;
-    } catch (IOException e) {
-      if (e instanceof EOFException) {
-        return null;
-      }
-      e.printStackTrace();
-    }
-    return null;
-  }
-
-  private void rollLog() throws IOException {
-    LOG.info("Rolling WAL path [" + walPath + "]");
-    FSDataOutputStream os = outputStream.get();
-    if (os != null) {
-      os.close();
-    }
-    fileSystem.delete(walPath, false);
-    open();
-  }
-
-  public void close() throws IOException {
-    synchronized (running) {
-      running.set(false);
-    }
-    outputStream.get().close();
-  }
-
-  private static void writeRow(DataOutputStream outputStream, Row row) throws IOException {
-    writeString(outputStream, row.id);
-    List<Record> records = row.records;
-    int size = records.size();
-    outputStream.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      Record record = records.get(i);
-      writeRecord(outputStream, record);
-    }
-  }
-
-  private static Row readRow(DataInputStream inputStream) throws IOException {
-    Row row = new Row();
-    row.id = readString(inputStream);
-    int size = inputStream.readInt();
-    for (int i = 0; i < size; i++) {
-      row.addToRecords(readRecord(inputStream));
-    }
-    return row;
-  }
-
-  private static void writeRecord(DataOutputStream outputStream, Record record) throws IOException {
-    writeString(outputStream, record.recordId);
-    writeString(outputStream, record.family);
-    List<Column> columns = record.columns;
-    int size = columns.size();
-    outputStream.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      writeColumn(outputStream, columns.get(i));
-    }
-  }
-
-  private static Record readRecord(DataInputStream inputStream) throws IOException {
-    Record record = new Record();
-    record.recordId = readString(inputStream);
-    record.family = readString(inputStream);
-    int size = inputStream.readInt();
-    for (int i = 0; i < size; i++) {
-      record.addToColumns(readColumn(inputStream));
-    }
-    return record;
-  }
-
-  private static void writeColumn(DataOutputStream outputStream, Column column) throws IOException {
-    writeString(outputStream, column.name);
-    writeString(outputStream, column.value);
-  }
-
-  private static Column readColumn(DataInputStream inputStream) throws IOException {
-    Column column = new Column();
-    column.name = readString(inputStream);
-    column.value = readString(inputStream);
-    return column;
-  }
-
-  private static void writeDelete(DataOutputStream outputStream, String deleteRowId) throws IOException {
-    writeString(outputStream, deleteRowId);
-  }
-
-  private static void writeString(DataOutputStream outputStream, String s) throws IOException {
-    byte[] bs = s.getBytes();
-    Utils.writeVInt(outputStream, bs.length);
-    outputStream.write(bs);
-  }
-
-  private static String readString(DataInputStream inputStream) throws IOException {
-    int length = Utils.readVInt(inputStream);
-    byte[] buffer = new byte[length];
-    inputStream.readFully(buffer);
-    return new String(buffer);
-  }
-
-  private void sync(byte[] bs) throws IOException {
-    if (bs == null || outputStream == null) {
-      throw new RuntimeException("bs [" + bs + "] outputStream [" + outputStream + "]");
-    }
-    FSDataOutputStream os = outputStream.get();
-    os.writeInt(bs.length);
-    os.write(bs);
-    long now = System.nanoTime();
-    if (lastSync.get() + timeBetweenSyncs < now) {
-      os.sync();
-      lastSync.set(now);
-    }
-  }
-
-  public long replaceRow(boolean wal, Row row, TrackingIndexWriter writer) throws IOException {
-    if (wal) {
-      synchronized (running) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(baos);
-        outputStream.writeByte(TYPE.ROW.value());
-        writeRow(outputStream, row);
-        outputStream.close();
-        sync(baos.toByteArray());
-      }
-    }
-    Term term = ROW_ID.createTerm(row.id);
-    List<Document> docs = getDocs(row, analyzer);
-    return writer.updateDocuments(term, docs);
-  }
-
-  public long deleteRow(boolean wal, String rowId, TrackingIndexWriter writer) throws IOException {
-    if (wal) {
-      synchronized (running) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(baos);
-        outputStream.writeByte(TYPE.DELETE.value());
-        writeDelete(outputStream, rowId);
-        outputStream.close();
-        sync(baos.toByteArray());
-      }
-    }
-    return writer.deleteDocuments(ROW_ID.createTerm(rowId));
-  }
-
-  public void setWalPath(Path walPath) {
-    this.walPath = walPath;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  public void commit(IndexWriter writer) throws CorruptIndexException, IOException {
-    synchronized (running) {
-      long s = System.nanoTime();
-      writer.commit();
-      long m = System.nanoTime();
-      LOG.info("Commit took [{0}] for [{1}]", (m - s) / 1000000.0, writer);
-      rollLog();
-      long e = System.nanoTime();
-      LOG.info("Log roller took [{0}] for [{1}]", (e - m) / 1000000.0, writer);
-    }
-  }
-
-  public static List<Document> getDocs(Row row, BlurAnalyzer analyzer) {
-    List<Record> records = row.records;
-    int size = records.size();
-    final String rowId = row.id;
-    final StringBuilder builder = new StringBuilder();
-    List<Document> docs = new ArrayList<Document>(size);
-    for (int i = 0; i < size; i++) {
-      Document document = convert(rowId, records.get(i), builder, analyzer);
-      if (i == 0) {
-        document.add(BlurConstants.PRIME_DOC_FIELD);
-      }
-      docs.add(document);
-    }
-    return docs;
-  }
-
-  public static Document convert(String rowId, Record record, StringBuilder builder, BlurAnalyzer analyzer) {
-    Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    document.add(new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    RowIndexWriter.addColumns(document, analyzer, builder, record.family, record.columns);
-    FieldConverterUtil.convert(document, analyzer);
-    return document;
-  }
-
-  public void setAnalyzer(BlurAnalyzer analyzer) {
-    this.analyzer = analyzer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SnapshotIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SnapshotIndexReader.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SnapshotIndexReader.java
deleted file mode 100644
index 737e2d7..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SnapshotIndexReader.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.nearinfinity.blur.manager.writer.lucene;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.lucene.index.FilterIndexReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TermPositions;
-
-public class SnapshotIndexReader extends FilterIndexReader {
-
-  private final int maxDocs;
-  private final int numDocs;
-
-  public SnapshotIndexReader(IndexReader in, int maxDocs) {
-    super(in);
-    this.numDocs = in.numDocs();
-    this.maxDocs = maxDocs;
-  }
-
-  public static IndexReader wrap(IndexReader reader, int maxDocs) throws IOException {
-    IndexReader[] readers = reader.getSequentialSubReaders();
-    if (readers == null) {
-      return wrapInternal(reader, maxDocs);
-    } else {
-      IndexReader[] result = new IndexReader[readers.length];
-      for (int i = 0; i < readers.length; i++) {
-        result[i] = wrapInternal(readers[i], maxDocs);
-      }
-      return new MultiReader(result, false);
-    }
-  }
-
-  private static IndexReader wrapInternal(IndexReader reader, int maxDocs) throws IOException {
-    return new SnapshotIndexReader(reader, maxDocs);
-  }
-
-  @Override
-  public IndexReader[] getSequentialSubReaders() {
-    return null;
-  }
-
-  @Override
-  public int numDocs() {
-    return numDocs;
-  }
-
-  @Override
-  public int maxDoc() {
-    return maxDocs;
-  }
-
-  @Override
-  public TermDocs termDocs() throws IOException {
-    return new SnapshotTermDocs(in.termDocs(), maxDocs);
-  }
-
-  public TermDocs termDocs(Term term) throws IOException {
-    ensureOpen();
-    TermDocs termDocs = termDocs();
-    termDocs.seek(term);
-    return termDocs;
-  }
-
-  @Override
-  public TermPositions termPositions() throws IOException {
-    return new SnapshotTermPositions(in.termPositions(), maxDocs);
-  }
-
-  // public TermPositions termPositions(Term term) throws IOException {
-  // ensureOpen();
-  // TermPositions termPositions = termPositions();
-  // termPositions.seek(term);
-  // return termPositions;
-  // }
-
-  public static class SnapshotTermPositions extends FilterTermPositions {
-
-    private final int maxDocs;
-
-    public SnapshotTermPositions(TermPositions termPositions, int maxDocs) {
-      super(termPositions);
-      this.maxDocs = maxDocs;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      boolean next = super.next();
-      if (next) {
-        if (doc() >= maxDocs) {
-          return false;
-        }
-      }
-      return next;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      if (doc() >= maxDocs) {
-        return checkResults(docs, maxDocs);
-      }
-      return read;
-    }
-  }
-
-  public static class SnapshotTermDocs extends FilterTermDocs {
-
-    private final int maxDocs;
-
-    public SnapshotTermDocs(TermDocs termDocs, int maxDocs) {
-      super(termDocs);
-      this.maxDocs = maxDocs;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      boolean next = super.next();
-      if (next) {
-        if (doc() >= maxDocs) {
-          return false;
-        }
-      }
-      return next;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      if (doc() >= maxDocs) {
-        return checkResults(docs, maxDocs);
-      }
-      return read;
-    }
-  }
-
-  private static int checkResults(int[] docs, int maxDocs) {
-    int length = docs.length;
-    for (int i = 0; i < length; i++) {
-      if (docs[i] >= maxDocs) {
-        return i;
-      }
-    }
-    return length;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SoftDeleteIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SoftDeleteIndexReader.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SoftDeleteIndexReader.java
deleted file mode 100644
index 530da19..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/writer/lucene/SoftDeleteIndexReader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package com.nearinfinity.blur.manager.writer.lucene;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.lucene.index.FilterIndexReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TermPositions;
-import org.apache.lucene.util.BitVector;
-
-public class SoftDeleteIndexReader extends FilterIndexReader {
-
-  private final boolean baseHasDeletions;
-  private final BitVector deletes;
-  private final int deleteCount;
-  private final int numDocs;
-
-  public SoftDeleteIndexReader(IndexReader in, BitVector deletes) {
-    super(in);
-    this.baseHasDeletions = in.hasDeletions();
-    if (deletes == null) {
-      throw new RuntimeException("No deletes, use regular indexreader.");
-    }
-    this.deletes = deletes;
-    this.deleteCount = deletes.count();
-    this.numDocs = in.numDocs() - deleteCount;
-  }
-
-  public static IndexReader wrap(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
-    IndexReader[] readers = reader.getSequentialSubReaders();
-    if (readers == null) {
-      return wrapInternal(reader, deleteTerms);
-    } else {
-      IndexReader[] result = new IndexReader[readers.length];
-      for (int i = 0; i < readers.length; i++) {
-        result[i] = wrapInternal(readers[i], deleteTerms);
-      }
-      return new MultiReader(result, false);
-    }
-  }
-
-  private static IndexReader wrapInternal(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
-    BitVector deletes = getDeletes(reader, deleteTerms);
-    if (deletes == null) {
-      return reader;
-    }
-    return new SoftDeleteIndexReader(reader, deletes);
-  }
-
-  private static BitVector getDeletes(IndexReader reader, Collection<Term> deleteTerms) throws IOException {
-    BitVector deletes = null;
-    TermDocs termDocs = reader.termDocs();
-    for (Term t : deleteTerms) {
-      termDocs.seek(t);
-      while (termDocs.next()) {
-        if (deletes == null) {
-          deletes = new BitVector(reader.maxDoc());
-        }
-        int doc = termDocs.doc();
-        deletes.set(doc);
-      }
-    }
-    termDocs.close();
-    return deletes;
-  }
-
-  @Override
-  public IndexReader[] getSequentialSubReaders() {
-    return null;
-  }
-
-  @Override
-  public int numDocs() {
-    return numDocs;
-  }
-
-  @Override
-  public boolean isDeleted(int n) {
-    if (baseHasDeletions && in.isDeleted(n)) {
-      return true;
-    }
-    return deletes.get(n);
-  }
-
-  @Override
-  public boolean hasDeletions() {
-    return baseHasDeletions;
-  }
-
-  @Override
-  public TermDocs termDocs() throws IOException {
-    return new SoftDeleteTermDocs(in.termDocs(), deletes);
-  }
-
-  public TermDocs termDocs(Term term) throws IOException {
-    ensureOpen();
-    TermDocs termDocs = termDocs();
-    termDocs.seek(term);
-    return termDocs;
-  }
-
-  @Override
-  public TermPositions termPositions() throws IOException {
-    return new SoftDeleteTermPositions(in.termPositions(), deletes);
-  }
-
-  // public TermPositions termPositions(Term term) throws IOException {
-  // ensureOpen();
-  // TermPositions termPositions = termPositions();
-  // termPositions.seek(term);
-  // return termPositions;
-  // }
-
-  public static class SoftDeleteTermPositions extends FilterTermPositions {
-
-    private BitVector deletes;
-
-    public SoftDeleteTermPositions(TermPositions termPositions, BitVector deletes) {
-      super(termPositions);
-      this.deletes = deletes;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      while (super.next()) {
-        if (!deletes.get(doc())) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      int validResults = removeDeletes(docs, freqs, read, deletes);
-      if (validResults == 0) {
-        return read(docs, freqs);
-      }
-      return validResults;
-    }
-  }
-
-  public static class SoftDeleteTermDocs extends FilterTermDocs {
-
-    private BitVector deletes;
-
-    public SoftDeleteTermDocs(TermDocs termDocs, BitVector deletes) {
-      super(termDocs);
-      this.deletes = deletes;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      while (super.next()) {
-        if (!deletes.get(doc())) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int read(int[] docs, int[] freqs) throws IOException {
-      int read = super.read(docs, freqs);
-      if (read == 0) {
-        return 0;
-      }
-      int validResults = removeDeletes(docs, freqs, read, deletes);
-      if (validResults == 0) {
-        return read(docs, freqs);
-      }
-      return validResults;
-    }
-  }
-
-  private static int removeDeletes(int[] docs, int[] freqs, int validLength, BitVector deletes) {
-    int targetPosition = 0;
-    for (int i = 0; i < validLength; i++) {
-      int doc = docs[i];
-      if (!deletes.get(doc)) {
-        if (targetPosition != i) {
-          docs[targetPosition] = doc;
-          freqs[targetPosition] = freqs[i];
-        }
-        targetPosition++;
-      }
-    }
-    return targetPosition;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/BlurControllerServer.java b/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/BlurControllerServer.java
deleted file mode 100644
index e3e344c..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/thrift/BlurControllerServer.java
+++ /dev/null
@@ -1,809 +0,0 @@
-package com.nearinfinity.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import com.nearinfinity.blur.concurrent.Executors;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.manager.BlurPartitioner;
-import com.nearinfinity.blur.manager.BlurQueryChecker;
-import com.nearinfinity.blur.manager.IndexManager;
-import com.nearinfinity.blur.manager.clusterstatus.ZookeeperPathConstants;
-import com.nearinfinity.blur.manager.indexserver.DistributedLayoutManager;
-import com.nearinfinity.blur.manager.results.BlurResultIterable;
-import com.nearinfinity.blur.manager.results.BlurResultIterableClient;
-import com.nearinfinity.blur.manager.results.MergerBlurResultIterable;
-import com.nearinfinity.blur.manager.stats.MergerTableStats;
-import com.nearinfinity.blur.manager.status.MergerQueryStatus;
-import com.nearinfinity.blur.manager.status.MergerQueryStatusSingle;
-import com.nearinfinity.blur.thrift.commands.BlurCommand;
-import com.nearinfinity.blur.thrift.generated.Blur.Client;
-import com.nearinfinity.blur.thrift.generated.Blur.Iface;
-import com.nearinfinity.blur.thrift.generated.BlurException;
-import com.nearinfinity.blur.thrift.generated.BlurQuery;
-import com.nearinfinity.blur.thrift.generated.BlurQueryStatus;
-import com.nearinfinity.blur.thrift.generated.BlurResults;
-import com.nearinfinity.blur.thrift.generated.FetchResult;
-import com.nearinfinity.blur.thrift.generated.RowMutation;
-import com.nearinfinity.blur.thrift.generated.Schema;
-import com.nearinfinity.blur.thrift.generated.Selector;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-import com.nearinfinity.blur.thrift.generated.TableStats;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurExecutorCompletionService;
-import com.nearinfinity.blur.utils.BlurUtil;
-import com.nearinfinity.blur.utils.ForkJoin;
-import com.nearinfinity.blur.utils.ForkJoin.Merger;
-import com.nearinfinity.blur.utils.ForkJoin.ParallelCall;
-import com.nearinfinity.blur.utils.QueryCache;
-import com.nearinfinity.blur.utils.QueryCacheEntry;
-import com.nearinfinity.blur.utils.QueryCacheKey;
-import com.nearinfinity.blur.zookeeper.WatchChildren;
-import com.nearinfinity.blur.zookeeper.WatchChildren.OnChange;
-import com.nearinfinity.blur.zookeeper.WatchNodeExistance;
-
-public class BlurControllerServer extends TableAdmin implements Iface {
-
-  public static abstract class BlurClient {
-    public abstract <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws Exception;
-  }
-
-  public static class BlurClientRemote extends BlurClient {
-    @Override
-    public <T> T execute(String node, BlurCommand<T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws Exception {
-      return BlurClientManager.execute(node, command, maxRetries, backOffTime, maxBackOffTime);
-    }
-  }
-
-  private static final String CONTROLLER_THREAD_POOL = "controller-thread-pool";
-  private static final Log LOG = LogFactory.getLog(BlurControllerServer.class);
-
-  private ExecutorService _executor;
-  private AtomicReference<Map<String, Map<String, String>>> _shardServerLayout = new AtomicReference<Map<String, Map<String, String>>>(new HashMap<String, Map<String, String>>());
-  private BlurClient _client;
-  private int _threadCount = 64;
-  private AtomicBoolean _closed = new AtomicBoolean();
-  private Map<String, Integer> _tableShardCountMap = new ConcurrentHashMap<String, Integer>();
-  private BlurPartitioner<BytesWritable, Void> _blurPartitioner = new BlurPartitioner<BytesWritable, Void>();
-  private String _nodeName;
-  private int _remoteFetchCount = 100;
-  private long _maxTimeToLive = TimeUnit.MINUTES.toMillis(1);
-  private int _maxQueryCacheElements = 128;
-  private QueryCache _queryCache;
-  private BlurQueryChecker _queryChecker;
-  private AtomicBoolean _running = new AtomicBoolean();
-
-  private int _maxFetchRetries = 3;
-  private int _maxMutateRetries = 3;
-  private int _maxDefaultRetries = 3;
-  private long _fetchDelay = 500;
-  private long _mutateDelay = 500;
-  private long _defaultDelay = 500;
-  private long _maxFetchDelay = 2000;
-  private long _maxMutateDelay = 2000;
-  private long _maxDefaultDelay = 2000;
-
-  private long _defaultParallelCallTimeout = TimeUnit.MINUTES.toMillis(1);
-  private WatchChildren _watchForClusters;
-  private ConcurrentMap<String, WatchNodeExistance> _watchForTablesPerClusterExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
-  private ConcurrentMap<String, WatchNodeExistance> _watchForOnlineShardsPerClusterExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
-  private ConcurrentMap<String, WatchChildren> _watchForTablesPerCluster = new ConcurrentHashMap<String, WatchChildren>();
-  private ConcurrentMap<String, WatchChildren> _watchForOnlineShardsPerCluster = new ConcurrentHashMap<String, WatchChildren>();
-
-  public void init() throws KeeperException, InterruptedException {
-    setupZookeeper();
-    registerMyself();
-    _queryCache = new QueryCache("controller-cache", _maxQueryCacheElements, _maxTimeToLive);
-    _executor = Executors.newThreadPool(CONTROLLER_THREAD_POOL, _threadCount);
-    _running.set(true);
-    watchForClusterChanges();
-    List<String> clusterList = _clusterStatus.getClusterList(false);
-    for (String cluster : clusterList) {
-      watchForLayoutChanges(cluster);
-    }
-    updateLayout();
-  }
-
-  private void setupZookeeper() throws KeeperException, InterruptedException {
-    BlurUtil.createIfMissing(_zookeeper, "/blur");
-    BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getOnlineControllersPath());
-    BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getClustersPath());
-  }
-
-  private void watchForClusterChanges() throws KeeperException, InterruptedException {
-    _watchForClusters = new WatchChildren(_zookeeper, ZookeeperPathConstants.getClustersPath());
-    _watchForClusters.watch(new OnChange() {
-      @Override
-      public void action(List<String> children) {
-        for (String cluster : children) {
-          try {
-            watchForLayoutChanges(cluster);
-          } catch (KeeperException e) {
-            LOG.error("Unknown error", e);
-            throw new RuntimeException(e);
-          } catch (InterruptedException e) {
-            LOG.error("Unknown error", e);
-            throw new RuntimeException(e);
-          }
-        }
-      }
-    });
-  }
-
-  private void watchForLayoutChanges(final String cluster) throws KeeperException, InterruptedException {
-    WatchNodeExistance we1 = new WatchNodeExistance(_zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
-    we1.watch(new WatchNodeExistance.OnChange() {
-      @Override
-      public void action(Stat stat) {
-        if (stat != null) {
-          watch(cluster, ZookeeperPathConstants.getTablesPath(cluster), _watchForTablesPerCluster);
-        }
-      }
-    });
-    if (_watchForTablesPerClusterExistance.putIfAbsent(cluster, we1) != null) {
-      we1.close();
-    }
-
-    WatchNodeExistance we2 = new WatchNodeExistance(_zookeeper, ZookeeperPathConstants.getTablesPath(cluster));
-    we2.watch(new WatchNodeExistance.OnChange() {
-      @Override
-      public void action(Stat stat) {
-        if (stat != null) {
-          watch(cluster, ZookeeperPathConstants.getOnlineShardsPath(cluster), _watchForOnlineShardsPerCluster);
-        }
-      }
-    });
-    if (_watchForOnlineShardsPerClusterExistance.putIfAbsent(cluster, we2) != null) {
-      we2.close();
-    }
-  }
-
-  private void watch(String cluster, String path, ConcurrentMap<String, WatchChildren> map) {
-    WatchChildren watchForTables = new WatchChildren(_zookeeper, path);
-    watchForTables.watch(new OnChange() {
-      @Override
-      public void action(List<String> children) {
-        LOG.info("Layout change.");
-        updateLayout();
-      }
-    });
-
-    if (map.putIfAbsent(cluster, watchForTables) != null) {
-      watchForTables.close();
-    }
-  }
-
-  private synchronized void updateLayout() {
-    if (!_clusterStatus.isOpen()) {
-      LOG.warn("The cluster status object has been closed.");
-      return;
-    }
-    List<String> tableList = _clusterStatus.getTableList(false);
-    HashMap<String, Map<String, String>> newLayout = new HashMap<String, Map<String, String>>();
-    for (String table : tableList) {
-      DistributedLayoutManager layoutManager = new DistributedLayoutManager();
-      String cluster = _clusterStatus.getCluster(false, table);
-      if (cluster == null) {
-        continue;
-      }
-      List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
-      List<String> offlineShardServers = _clusterStatus.getOfflineShardServers(false, cluster);
-      List<String> shardList = getShardList(cluster, table);
-      layoutManager.setNodes(shardServerList);
-      layoutManager.setNodesOffline(offlineShardServers);
-      layoutManager.setShards(shardList);
-      layoutManager.init();
-      Map<String, String> layout = layoutManager.getLayout();
-      newLayout.put(table, layout);
-    }
-    _shardServerLayout.set(newLayout);
-  }
-
-  private List<String> getShardList(String cluster, String table) {
-    List<String> shards = new ArrayList<String>();
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
-    for (int i = 0; i < tableDescriptor.shardCount; i++) {
-      shards.add(BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
-    }
-    return shards;
-  }
-
-  private void registerMyself() {
-    try {
-      String onlineControllerPath = ZookeeperPathConstants.getOnlineControllersPath() + "/" + _nodeName;
-      while (_zookeeper.exists(onlineControllerPath, false) != null) {
-        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", _nodeName, onlineControllerPath);
-        Thread.sleep(3000);
-      }
-      String version = BlurUtil.getVersion();
-      _zookeeper.create(onlineControllerPath, version.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public synchronized void close() {
-    if (!_closed.get()) {
-      _closed.set(true);
-      _running.set(false);
-      _executor.shutdownNow();
-      close(_watchForClusters);
-      close(_watchForOnlineShardsPerCluster.values());
-      close(_watchForOnlineShardsPerClusterExistance.values());
-      close(_watchForTablesPerCluster.values());
-      close(_watchForTablesPerClusterExistance.values());
-    }
-  }
-
-  private void close(Collection<? extends Closeable> closableLst) {
-    for (Closeable closeable : closableLst) {
-      close(closeable);
-    }
-  }
-
-  private void close(Closeable closeable) {
-    try {
-      closeable.close();
-    } catch (IOException e) {
-      LOG.error("Unknown", e);
-    }
-  }
-
-  @Override
-  public BlurResults query(final String table, final BlurQuery blurQuery) throws BlurException, TException {
-    // @TODO make this faster
-    checkTable(table);
-    String cluster = _clusterStatus.getCluster(true, table);
-    _queryChecker.checkQuery(blurQuery);
-    int shardCount = _clusterStatus.getShardCount(true, cluster, table);
-
-    OUTER: for (int retries = 0; retries < _maxDefaultRetries; retries++) {
-      try {
-        final AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
-
-        BlurQuery original = new BlurQuery(blurQuery);
-        if (blurQuery.useCacheIfPresent) {
-          LOG.debug("Using cache for query [{0}] on table [{1}].", blurQuery, table);
-          QueryCacheKey key = QueryCache.getNormalizedBlurQueryKey(table, blurQuery);
-          QueryCacheEntry queryCacheEntry = _queryCache.get(key);
-          if (_queryCache.isValid(queryCacheEntry)) {
-            LOG.debug("Cache hit for query [{0}] on table [{1}].", blurQuery, table);
-            return queryCacheEntry.getBlurResults(blurQuery);
-          } else {
-            _queryCache.remove(key);
-          }
-        }
-
-        BlurUtil.setStartTime(original);
-
-        Selector selector = blurQuery.getSelector();
-        blurQuery.setSelector(null);
-
-        BlurResultIterable hitsIterable = scatterGather(getCluster(table), new BlurCommand<BlurResultIterable>() {
-          @Override
-          public BlurResultIterable call(Client client) throws BlurException, TException {
-            return new BlurResultIterableClient(client, table, blurQuery, facetCounts, _remoteFetchCount);
-          }
-        }, new MergerBlurResultIterable(blurQuery));
-        BlurResults results = BlurUtil.convertToHits(hitsIterable, blurQuery, facetCounts, _executor, selector, this, table);
-        if (!validResults(results, shardCount, blurQuery)) {
-          BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
-          continue OUTER;
-        }
-        return _queryCache.cache(table, original, results);
-      } catch (Exception e) {
-        LOG.error("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
-        throw new BException("Unknown error during search of [table={0},blurQuery={1}]", e, table, blurQuery);
-      }
-    }
-    throw new BlurException("Query could not be completed.", null);
-  }
-
-  private boolean validResults(BlurResults results, int shardCount, BlurQuery query) {
-    if (results.totalResults >= query.minimumNumberOfResults) {
-      return true;
-    }
-    int shardInfoSize = results.getShardInfoSize();
-    if (shardInfoSize == shardCount) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public FetchResult fetchRow(final String table, final Selector selector) throws BlurException, TException {
-    checkTable(table);
-    IndexManager.validSelector(selector);
-    String clientHostnamePort = null;
-    try {
-      clientHostnamePort = getNode(table, selector);
-      return _client.execute(clientHostnamePort, new BlurCommand<FetchResult>() {
-        @Override
-        public FetchResult call(Client client) throws BlurException, TException {
-          return client.fetchRow(table, selector);
-        }
-      }, _maxFetchRetries, _fetchDelay, _maxFetchDelay);
-    } catch (Exception e) {
-      LOG.error("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector, clientHostnamePort);
-      throw new BException("Unknown error during fetch of row from table [{0}] selector [{1}] node [{2}]", e, table, selector, clientHostnamePort);
-    }
-  }
-
-  @Override
-  public void cancelQuery(final String table, final long uuid) throws BlurException, TException {
-    checkTable(table);
-    try {
-      scatter(getCluster(table), new BlurCommand<Void>() {
-        @Override
-        public Void call(Client client) throws BlurException, TException {
-          client.cancelQuery(table, uuid);
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to cancel search table [{0}] uuid [{1}]", e, table, uuid);
-      throw new BException("Unknown error while trying to cancel search table [{0}] uuid [{1}]", e, table, uuid);
-    }
-  }
-
-  @Override
-  public List<BlurQueryStatus> currentQueries(final String table) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<List<BlurQueryStatus>>() {
-        @Override
-        public List<BlurQueryStatus> call(Client client) throws BlurException, TException {
-          return client.currentQueries(table);
-        }
-      }, new MergerQueryStatus(_defaultParallelCallTimeout));
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get current searches [{0}]", e, table);
-      throw new BException("Unknown error while trying to get current searches [{0}]", e, table);
-    }
-  }
-
-  @Override
-  public List<Long> queryStatusIdList(final String table) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<List<Long>>() {
-        @Override
-        public List<Long> call(Client client) throws BlurException, TException {
-          return client.queryStatusIdList(table);
-        }
-      }, new Merger<List<Long>>() {
-        @Override
-        public List<Long> merge(BlurExecutorCompletionService<List<Long>> service) throws BlurException {
-          Set<Long> result = new HashSet<Long>();
-          while (service.getRemainingCount() > 0) {
-            Future<List<Long>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
-            List<Long> ids = service.getResultThrowException(future);
-            result.addAll(ids);
-          }
-          return new ArrayList<Long>(result);
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get query status ids for table [{0}]", e, table);
-      throw new BException("Unknown error while trying to get query status ids for table [{0}]", e, table);
-    }
-  }
-
-  @Override
-  public BlurQueryStatus queryStatusById(final String table, final long uuid) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<BlurQueryStatus>() {
-        @Override
-        public BlurQueryStatus call(Client client) throws BlurException, TException {
-          return client.queryStatusById(table, uuid);
-        }
-      }, new MergerQueryStatusSingle(_defaultParallelCallTimeout));
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get query status [{0}]", e, table, uuid);
-      throw new BException("Unknown error while trying to get query status [{0}]", e, table, uuid);
-    }
-  }
-
-  @Override
-  public TableStats tableStats(final String table) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<TableStats>() {
-        @Override
-        public TableStats call(Client client) throws BlurException, TException {
-          return client.getTableStats(table);
-        }
-      }, new MergerTableStats(_defaultParallelCallTimeout));
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get table stats [{0}]", e, table);
-      throw new BException("Unknown error while trying to get table stats [{0}]", e, table);
-    }
-  }
-
-  @Override
-  public Map<String, String> shardServerLayout(String table) throws BlurException, TException {
-    checkTable(table);
-    Map<String, Map<String, String>> layout = _shardServerLayout.get();
-    Map<String, String> tableLayout = layout.get(table);
-    if (tableLayout == null) {
-      return new HashMap<String, String>();
-    }
-    return tableLayout;
-  }
-
-  @Override
-  public long recordFrequency(final String table, final String columnFamily, final String columnName, final String value) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<Long>() {
-        @Override
-        public Long call(Client client) throws BlurException, TException {
-          return client.recordFrequency(table, columnFamily, columnName, value);
-        }
-      }, new Merger<Long>() {
-
-        @Override
-        public Long merge(BlurExecutorCompletionService<Long> service) throws BlurException {
-          Long total = 0L;
-          while (service.getRemainingCount() > 0) {
-            Future<Long> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, value);
-            total += service.getResultThrowException(future, table, columnFamily, columnName, value);
-          }
-          return total;
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily, columnName, value);
-      throw new BException("Unknown error while trying to get record frequency [{0}/{1}/{2}/{3}]", e, table, columnFamily, columnName, value);
-    }
-  }
-
-  @Override
-  public Schema schema(final String table) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<Schema>() {
-        @Override
-        public Schema call(Client client) throws BlurException, TException {
-          return client.schema(table);
-        }
-      }, new Merger<Schema>() {
-        @Override
-        public Schema merge(BlurExecutorCompletionService<Schema> service) throws BlurException {
-          Schema result = null;
-          while (service.getRemainingCount() > 0) {
-            Future<Schema> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table);
-            Schema schema = service.getResultThrowException(future, table);
-            if (result == null) {
-              result = schema;
-            } else {
-              result = BlurControllerServer.merge(result, schema);
-            }
-          }
-          return result;
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to schema table [{0}]", e, table);
-      throw new BException("Unknown error while trying to schema table [{0}]", e, table);
-    }
-  }
-
-  @Override
-  public List<String> terms(final String table, final String columnFamily, final String columnName, final String startWith, final short size) throws BlurException, TException {
-    checkTable(table);
-    try {
-      return scatterGather(getCluster(table), new BlurCommand<List<String>>() {
-        @Override
-        public List<String> call(Client client) throws BlurException, TException {
-          return client.terms(table, columnFamily, columnName, startWith, size);
-        }
-      }, new Merger<List<String>>() {
-        @Override
-        public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
-          TreeSet<String> terms = new TreeSet<String>();
-          while (service.getRemainingCount() > 0) {
-            Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true, table, columnFamily, columnName, startWith, size);
-            terms.addAll(service.getResultThrowException(future, table, columnFamily, columnName, startWith, size));
-          }
-          return new ArrayList<String>(terms).subList(0, Math.min(terms.size(), size));
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]", e, table, columnFamily, columnName, startWith,
-          size);
-      throw new BException("Unknown error while trying to terms table [{0}] columnFamily [{1}] columnName [{2}] startWith [{3}] size [{4}]", e, table, columnFamily, columnName,
-          startWith, size);
-    }
-  }
-
-  private String getNode(String table, Selector selector) throws BlurException, TException {
-    Map<String, String> layout = shardServerLayout(table);
-    String locationId = selector.locationId;
-    if (locationId != null) {
-      String shard = locationId.substring(0, locationId.indexOf('/'));
-      return layout.get(shard);
-    }
-    int numberOfShards = getShardCount(table);
-    if (selector.rowId != null) {
-      String shardName = MutationHelper.getShardName(table, selector.rowId, numberOfShards, _blurPartitioner);
-      return layout.get(shardName);
-    }
-    throw new BlurException("Selector is missing both a locationid and a rowid, one is needed.", null);
-  }
-
-  private <R> R scatterGather(String cluster, final BlurCommand<R> command, Merger<R> merger) throws Exception {
-    return ForkJoin.execute(_executor, _clusterStatus.getOnlineShardServers(true, cluster), new ParallelCall<String, R>() {
-      @SuppressWarnings("unchecked")
-      @Override
-      public R call(String hostnamePort) throws Exception {
-        return _client.execute(hostnamePort, (BlurCommand<R>) command.clone(), _maxDefaultRetries, _defaultDelay, _maxDefaultDelay);
-      }
-    }).merge(merger);
-  }
-
-  private <R> void scatter(String cluster, BlurCommand<R> command) throws Exception {
-    scatterGather(cluster, command, new Merger<R>() {
-      @Override
-      public R merge(BlurExecutorCompletionService<R> service) throws BlurException {
-        while (service.getRemainingCount() > 0) {
-          Future<R> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
-          service.getResultThrowException(future);
-        }
-        return null;
-      }
-    });
-  }
-
-  private String getCluster(String table) throws BlurException, TException {
-    TableDescriptor describe = describe(table);
-    if (describe == null) {
-      throw new BlurException("Table [" + table + "] not found.", null);
-    }
-    return describe.cluster;
-  }
-
-  public static Schema merge(Schema result, Schema schema) {
-    Map<String, Set<String>> destColumnFamilies = result.columnFamilies;
-    Map<String, Set<String>> srcColumnFamilies = schema.columnFamilies;
-    for (String srcColumnFamily : srcColumnFamilies.keySet()) {
-      Set<String> destColumnNames = destColumnFamilies.get(srcColumnFamily);
-      Set<String> srcColumnNames = srcColumnFamilies.get(srcColumnFamily);
-      if (destColumnNames == null) {
-        destColumnFamilies.put(srcColumnFamily, srcColumnNames);
-      } else {
-        destColumnNames.addAll(srcColumnNames);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void mutate(final RowMutation mutation) throws BlurException, TException {
-    checkTable(mutation.table);
-    checkForUpdates(mutation.table);
-    try {
-      MutationHelper.validateMutation(mutation);
-      String table = mutation.getTable();
-
-      int numberOfShards = getShardCount(table);
-      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
-      if (tableLayout.size() != numberOfShards) {
-        throw new BlurException("Cannot update data while shard is missing", null);
-      }
-
-      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
-      String node = tableLayout.get(shardName);
-      _client.execute(node, new BlurCommand<Void>() {
-        @Override
-        public Void call(Client client) throws BlurException, TException {
-          client.mutate(mutation);
-          return null;
-        }
-      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
-    } catch (Exception e) {
-      LOG.error("Unknown error during mutation of [{0}]", e, mutation);
-      throw new BException("Unknown error during mutation of [{0}]", e, mutation);
-    }
-  }
-
-  private int getShardCount(String table) throws BlurException, TException {
-    Integer numberOfShards = _tableShardCountMap.get(table);
-    if (numberOfShards == null) {
-      TableDescriptor descriptor = describe(table);
-      numberOfShards = descriptor.shardCount;
-      _tableShardCountMap.put(table, numberOfShards);
-    }
-    return numberOfShards;
-  }
-
-  @Override
-  public void mutateBatch(List<RowMutation> mutations) throws BlurException, TException {
-    for (RowMutation mutation : mutations) {
-      MutationHelper.validateMutation(mutation);
-    }
-    Map<String, List<RowMutation>> batches = new HashMap<String, List<RowMutation>>();
-    for (RowMutation mutation : mutations) {
-      checkTable(mutation.table);
-      checkForUpdates(mutation.table);
-
-      MutationHelper.validateMutation(mutation);
-      String table = mutation.getTable();
-
-      int numberOfShards = getShardCount(table);
-      Map<String, String> tableLayout = _shardServerLayout.get().get(table);
-      if (tableLayout.size() != numberOfShards) {
-        throw new BlurException("Cannot update data while shard is missing", null);
-      }
-
-      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
-      String node = tableLayout.get(shardName);
-      List<RowMutation> list = batches.get(node);
-      if (list == null) {
-        list = new ArrayList<RowMutation>();
-        batches.put(node, list);
-      }
-      list.add(mutation);
-
-    }
-
-    for (Entry<String, List<RowMutation>> entry : batches.entrySet()) {
-      String node = entry.getKey();
-      final List<RowMutation> mutationsLst = entry.getValue();
-      try {
-        _client.execute(node, new BlurCommand<Void>() {
-          @Override
-          public Void call(Client client) throws BlurException, TException {
-            client.mutateBatch(mutationsLst);
-            return null;
-          }
-        }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
-      } catch (Exception e) {
-        LOG.error("Unknown error during mutations of [{0}]", e, mutationsLst);
-        throw new BException("Unknown error during mutations of [{0}]", e, mutationsLst);
-      }
-    }
-  }
-
-  public void setNodeName(String nodeName) {
-    _nodeName = nodeName;
-  }
-
-  public int getRemoteFetchCount() {
-    return _remoteFetchCount;
-  }
-
-  public void setRemoteFetchCount(int remoteFetchCount) {
-    _remoteFetchCount = remoteFetchCount;
-  }
-
-  public long getMaxTimeToLive() {
-    return _maxTimeToLive;
-  }
-
-  public void setMaxTimeToLive(long maxTimeToLive) {
-    _maxTimeToLive = maxTimeToLive;
-  }
-
-  public int getMaxQueryCacheElements() {
-    return _maxQueryCacheElements;
-  }
-
-  public void setMaxQueryCacheElements(int maxQueryCacheElements) {
-    _maxQueryCacheElements = maxQueryCacheElements;
-  }
-
-  public void setQueryChecker(BlurQueryChecker queryChecker) {
-    _queryChecker = queryChecker;
-  }
-
-  public void setThreadCount(int threadCount) {
-    _threadCount = threadCount;
-  }
-
-  public void setMaxFetchRetries(int maxFetchRetries) {
-    _maxFetchRetries = maxFetchRetries;
-  }
-
-  public void setMaxMutateRetries(int maxMutateRetries) {
-    _maxMutateRetries = maxMutateRetries;
-  }
-
-  public void setMaxDefaultRetries(int maxDefaultRetries) {
-    _maxDefaultRetries = maxDefaultRetries;
-  }
-
-  public void setFetchDelay(long fetchDelay) {
-    _fetchDelay = fetchDelay;
-  }
-
-  public void setMutateDelay(long mutateDelay) {
-    _mutateDelay = mutateDelay;
-  }
-
-  public void setDefaultDelay(long defaultDelay) {
-    _defaultDelay = defaultDelay;
-  }
-
-  public void setMaxFetchDelay(long maxFetchDelay) {
-    _maxFetchDelay = maxFetchDelay;
-  }
-
-  public void setMaxMutateDelay(long maxMutateDelay) {
-    _maxMutateDelay = maxMutateDelay;
-  }
-
-  public void setMaxDefaultDelay(long maxDefaultDelay) {
-    _maxDefaultDelay = maxDefaultDelay;
-  }
-
-  public BlurClient getClient() {
-    return _client;
-  }
-
-  public void setClient(BlurClient client) {
-    _client = client;
-  }
-
-  @Override
-  public void optimize(final String table, final int numberOfSegmentsPerShard) throws BlurException, TException {
-    checkTable(table);
-    try {
-      scatter(getCluster(table), new BlurCommand<Void>() {
-        @Override
-        public Void call(Client client) throws BlurException, TException {
-          client.optimize(table, numberOfSegmentsPerShard);
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
-      throw new BException("Unknown error while trying to optimize [table={0},numberOfSegmentsPerShard={1}]", e, table, numberOfSegmentsPerShard);
-    }
-  }
-
-}
\ No newline at end of file


Mime
View raw message