incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [47/92] [abbrv] [partial] Fixed BLUR-126.
Date Tue, 11 Jun 2013 02:41:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
new file mode 100644
index 0000000..1a911c3
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
@@ -0,0 +1,119 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.io.IOUtils;
+
+
+public class BlurResultIterableMultiple implements BlurResultIterable {
+  
+  private static final Log LOG = LogFactory.getLog(BlurResultIterableMultiple.class);
+
+  private long totalResults;
+  private Map<String, Long> shardInfo = new TreeMap<String, Long>();
+  private long skipTo;
+  private List<BlurResultIterable> results = new ArrayList<BlurResultIterable>();
+
+  public void addBlurResultIterable(BlurResultIterable iterable) {
+    totalResults += iterable.getTotalResults();
+    shardInfo.putAll(iterable.getShardInfo());
+    results.add(iterable);
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return totalResults;
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    this.skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    MultipleHitsIterator iterator = new MultipleHitsIterator(results);
+    long start = 0;
+    while (iterator.hasNext() && start < skipTo) {
+      iterator.next();
+      start++;
+    }
+    return iterator;
+  }
+
+  public static class MultipleHitsIterator implements Iterator<BlurResult> {
+
+    private List<PeekableIterator<BlurResult>> iterators = new ArrayList<PeekableIterator<BlurResult>>();
+    private int length;
+
+    public MultipleHitsIterator(List<BlurResultIterable> hits) {
+      for (BlurResultIterable hitsIterable : hits) {
+        iterators.add(new PeekableIterator<BlurResult>(hitsIterable.iterator()));
+      }
+      length = iterators.size();
+    }
+
+    @Override
+    public boolean hasNext() {
+      for (int i = 0; i < length; i++) {
+        if (iterators.get(i).hasNext()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public BlurResult next() {
+      Collections.sort(iterators, BlurConstants.HITS_PEEKABLE_ITERATOR_COMPARATOR);
+      return fetchResult(iterators.get(0).next());
+    }
+
+    public BlurResult fetchResult(BlurResult next) {
+      return next;
+    }
+
+    @Override
+    public void remove() {
+
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (BlurResultIterable it : results) {
+      IOUtils.cleanup(LOG, it);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
new file mode 100644
index 0000000..ee48ccd
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
@@ -0,0 +1,128 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.lucene.search.IterablePaging;
+import org.apache.blur.lucene.search.IterablePaging.ProgressRef;
+import org.apache.blur.lucene.search.IterablePaging.TotalHitsRef;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.utils.Converter;
+import org.apache.blur.utils.IteratorConverter;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+
+
+/**
+ * A {@link BlurResultIterable} backed by the index searcher of a single
+ * shard.
+ * <p>
+ * The query is set up through {@link IterablePaging} when the instance is
+ * constructed. Each {@link ScoreDoc} is turned into a {@link BlurResult} by
+ * the converter as the iterator is consumed.
+ */
+public class BlurResultIterableSearcher implements BlurResultIterable {
+
+  private Map<String, Long> _shardInfo = new TreeMap<String, Long>();
+  private String _shard;
+  private long _skipTo;
+  private String _table;
+  private int _fetchCount = 1000;
+
+  private IteratorConverter<ScoreDoc, BlurResult> _iterator;
+  private final Selector _selector;
+  private final Query _query;
+  private IndexSearcherClosable _searcher;
+  private final TotalHitsRef _totalHitsRef = new TotalHitsRef();
+  private final ProgressRef _progressRef = new ProgressRef();
+  private final AtomicBoolean _running;
+  private final boolean _closeSearcher;
+
+  /**
+   * Stores the arguments and immediately sets up the search.
+   *
+   * @param closeSearcher when true, {@link #close()} also closes the given
+   *          searcher.
+   */
+  public BlurResultIterableSearcher(AtomicBoolean running, Query query, String table, String shard, IndexSearcherClosable searcher, Selector selector, boolean closeSearcher)
+      throws IOException {
+    _running = running;
+    _query = query;
+    _table = table;
+    _shard = shard;
+    _searcher = searcher;
+    _selector = selector;
+    _closeSearcher = closeSearcher;
+    performSearch();
+  }
+
+  /**
+   * Creates the paging iterable, wraps its iterator in a converter that
+   * produces {@link BlurResult}s, and records the total hit count for this
+   * shard in the shard info map.
+   */
+  private void performSearch() throws IOException {
+    IterablePaging paging = new IterablePaging(_running, _searcher, _query, _fetchCount, _totalHitsRef, _progressRef);
+    Converter<ScoreDoc, BlurResult> toBlurResult = new Converter<ScoreDoc, BlurResult>() {
+      @Override
+      public BlurResult convert(ScoreDoc scoreDoc) throws Exception {
+        String locationId = resolveId(scoreDoc.doc);
+        FetchResult fetched = getFetchResult(locationId);
+        return new BlurResult(locationId, scoreDoc.score, fetched);
+      }
+    };
+    _iterator = new IteratorConverter<ScoreDoc, BlurResult>(paging.iterator(), toBlurResult);
+    _shardInfo.put(_shard, (long) _totalHitsRef.totalHits());
+  }
+
+  /**
+   * Fetches the row for the given location id using the configured selector.
+   * Note that the selector's location id is overwritten on every call.
+   *
+   * @return the fetch result, or null when no selector was supplied.
+   */
+  private FetchResult getFetchResult(String resolveId) throws IOException, BlurException {
+    if (_selector == null) {
+      return null;
+    }
+    _selector.setLocationId(resolveId);
+    IndexManager.validSelector(_selector);
+    FetchResult row = new FetchResult();
+    IndexManager.fetchRow(_searcher.getIndexReader(), _table, _selector, row, null);
+    return row;
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return _shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return _totalHitsRef.totalHits();
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    _skipTo = skipTo;
+  }
+
+  /**
+   * Discards up to {@code skipTo} results from the shared underlying
+   * iterator and returns it. The same iterator instance is returned on
+   * every call, and the skip is applied again each time.
+   */
+  @Override
+  public Iterator<BlurResult> iterator() {
+    for (long skipped = 0; _iterator.hasNext() && skipped < _skipTo; skipped++) {
+      _iterator.next();
+    }
+    return _iterator;
+  }
+
+  /**
+   * Builds the location id as {@code <shard>/<docId>}.
+   */
+  private String resolveId(int docId) {
+    return _shard + "/" + docId;
+  }
+
+  /**
+   * Closes the searcher if this instance was asked to own it; later calls
+   * are no-ops.
+   */
+  @Override
+  public void close() throws IOException {
+    if (_searcher == null || !_closeSearcher) {
+      return;
+    }
+    _searcher.close();
+    _searcher = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java
new file mode 100644
index 0000000..3158576
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java
@@ -0,0 +1,74 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+
+
+public class BlurResultIterableSimple implements BlurResultIterable {
+
+  private List<BlurResult> results;
+  private Map<String, Long> shardInfo;
+  private long skipTo;
+
+  public BlurResultIterableSimple(String shard, List<BlurResult> hits) {
+    Collections.sort(hits, BlurConstants.HITS_COMPARATOR);
+    this.results = hits;
+    this.shardInfo = new TreeMap<String, Long>();
+    this.shardInfo.put(shard, (long) hits.size());
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return results.size();
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    this.skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    long start = 0;
+    Iterator<BlurResult> iterator = results.iterator();
+    while (iterator.hasNext() && start < skipTo) {
+      iterator.next();
+      start++;
+    }
+    return iterator;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java
new file mode 100644
index 0000000..7eddb0a
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java
@@ -0,0 +1,44 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Comparator;
+
+import org.apache.blur.thrift.generated.BlurResult;
+
+
+/**
+ * Orders {@link PeekableIterator}s by the {@link BlurResult} each one would
+ * return next: higher score first, ties broken by descending
+ * {@code locationId}. Iterators whose {@code peek()} is null sort last.
+ */
+public class BlurResultPeekableIteratorComparator implements Comparator<PeekableIterator<BlurResult>> {
+
+  @Override
+  public int compare(PeekableIterator<BlurResult> o1, PeekableIterator<BlurResult> o2) {
+    BlurResult left = o1.peek();
+    BlurResult right = o2.peek();
+    if (left == null) {
+      // An exhausted iterator never sorts ahead of one that still has a head.
+      return right == null ? 0 : 1;
+    }
+    if (right == null) {
+      return -1;
+    }
+    // Arguments are reversed on purpose: larger scores must come first.
+    int byScore = Double.compare(right.score, left.score);
+    return byScore != 0 ? byScore : right.locationId.compareTo(left.locationId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java b/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
new file mode 100644
index 0000000..f98fae3
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/LazyBlurResult.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.blur.manager.results;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Selector;
+
+/**
+ * The {@link LazyBlurResult} adds a method to fetch the result with the client
+ * that was used to execute the query.
+ */
+@SuppressWarnings("serial")
+public class LazyBlurResult extends BlurResult {
+
+  private final Client _client;
+
+  public LazyBlurResult(BlurResult result, Client client) {
+    super(result);
+    _client = client;
+  }
+
+  public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException {
+    synchronized (_client) {
+      return _client.fetchRow(table, selector);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java b/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java
new file mode 100644
index 0000000..5fa9f6d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java
@@ -0,0 +1,65 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+/**
+ * {@link Merger} that collects the {@link BlurResultIterable}s produced by
+ * the tasks of one query into a single {@link BlurResultIterableMultiple}.
+ * <p>
+ * Merging stops early, and the remaining tasks are cancelled, once the
+ * accumulated total reaches the query's {@code minimumNumberOfResults}.
+ */
+public class MergerBlurResultIterable implements Merger<BlurResultIterable> {
+
+  private static final Log LOG = LogFactory.getLog(MergerBlurResultIterable.class);
+
+  private final long _minimumNumberOfResults;
+  private final long _maxQueryTime;
+  private final BlurQuery _blurQuery;
+
+  /**
+   * @param blurQuery the query being executed; its
+   *          {@code minimumNumberOfResults} and {@code maxQueryTime} are
+   *          read once here.
+   */
+  public MergerBlurResultIterable(BlurQuery blurQuery) {
+    _blurQuery = blurQuery;
+    _minimumNumberOfResults = blurQuery.minimumNumberOfResults;
+    _maxQueryTime = blurQuery.maxQueryTime;
+  }
+
+  /**
+   * Polls the completion service until no tasks remain, adding each finished
+   * result to the merged iterable.
+   *
+   * @param service the completion service holding the per-task futures.
+   * @return the merged iterable, possibly partial if the minimum number of
+   *         results was reached before all tasks finished.
+   * @throws BlurException if a poll returns no future within
+   *           {@code maxQueryTime} milliseconds, or if a task failed.
+   */
+  @Override
+  public BlurResultIterable merge(BlurExecutorCompletionService<BlurResultIterable> service) throws BlurException {
+    BlurResultIterableMultiple iterable = new BlurResultIterableMultiple();
+    while (service.getRemainingCount() > 0) {
+      Future<BlurResultIterable> future = service.poll(_maxQueryTime, TimeUnit.MILLISECONDS, true, _blurQuery);
+      if (future != null) {
+        BlurResultIterable blurResultIterable = service.getResultThrowException(future, _blurQuery);
+        iterable.addBlurResultIterable(blurResultIterable);
+        if (iterable.getTotalResults() >= _minimumNumberOfResults) {
+          // Called to stop execution of any other running queries.
+          service.cancelAll();
+          return iterable;
+        }
+      } else {
+        // Placeholders are zero-based: {0} is the max query time, {1} the query.
+        LOG.info("Query timeout with max query time of [{0}] for query [{1}].", _maxQueryTime, _blurQuery);
+        throw new BlurException("Query timeout with max query time of [" + _maxQueryTime + "] for query [" + _blurQuery
+            + "].", null);
+      }
+    }
+    return iterable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java b/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java
new file mode 100644
index 0000000..4b5b5f8
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java
@@ -0,0 +1,66 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+/**
+ * Wraps an {@link Iterator} and buffers one element ahead so that the next
+ * element can be inspected without being consumed.
+ */
+public class PeekableIterator<E> implements Iterator<E> {
+
+  private Iterator<E> iterator;
+  private E current;
+
+  /**
+   * Wraps the given iterator and immediately reads its first element, if
+   * there is one, into the look-ahead buffer.
+   */
+  public PeekableIterator(Iterator<E> iterator) {
+    current = iterator.hasNext() ? iterator.next() : null;
+    this.iterator = iterator;
+  }
+
+  /**
+   * Returns the element that the next call to {@link #next()} will return,
+   * without consuming it. Only meaningful while {@link #hasNext()} is true;
+   * once the iterator is exhausted this returns null.
+   *
+   * @return the buffered element, or null if there is none.
+   */
+  public E peek() {
+    return current;
+  }
+
+  /**
+   * @return true if an element is buffered or the wrapped iterator still
+   *         has elements.
+   */
+  @Override
+  public boolean hasNext() {
+    return current != null || iterator.hasNext();
+  }
+
+  /**
+   * Returns the buffered element and refills the buffer from the wrapped
+   * iterator. Returns null, rather than throwing, once exhausted.
+   */
+  @Override
+  public E next() {
+    E upcoming = iterator.hasNext() ? iterator.next() : null;
+    E head = current;
+    current = upcoming;
+    return head;
+  }
+
+  @Override
+  public void remove() {
+    // intentionally a no-op
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
new file mode 100644
index 0000000..99771d2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
@@ -0,0 +1,56 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that combines the {@link TableStats} returned by several
+ * tasks into a single {@link TableStats}.
+ */
+public class MergerTableStats implements Merger<TableStats> {
+
+  private long _timeout;
+
+  /**
+   * @param timeout poll timeout in milliseconds for each pending result.
+   */
+  public MergerTableStats(long timeout) {
+    _timeout = timeout;
+  }
+
+  /**
+   * Drains the completion service, folding every result into one
+   * accumulated {@link TableStats}.
+   */
+  @Override
+  public TableStats merge(BlurExecutorCompletionService<TableStats> service) throws BlurException {
+    TableStats combined = new TableStats();
+    while (service.getRemainingCount() > 0) {
+      Future<TableStats> completed = service.poll(_timeout, TimeUnit.MILLISECONDS, true);
+      TableStats partial = service.getResultThrowException(completed);
+      combined = merge(combined, partial);
+    }
+    return combined;
+  }
+
+  /**
+   * Folds {@code s2} into {@code s1} and returns {@code s1}: the table name
+   * is taken from {@code s2}, record and row counts are summed, and bytes
+   * and queries take the larger of the two values.
+   */
+  private TableStats merge(TableStats s1, TableStats s2) {
+    s1.tableName = s2.tableName;
+    s1.bytes = Math.max(s1.bytes, s2.bytes);
+    s1.recordCount += s2.recordCount;
+    s1.rowCount += s2.rowCount;
+    s1.queries = Math.max(s1.queries, s2.queries);
+    return s1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java b/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java
new file mode 100644
index 0000000..1d7197d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java
@@ -0,0 +1,85 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.CpuTime;
+import org.apache.blur.thrift.generated.QueryState;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that combines lists of {@link BlurQueryStatus} from several
+ * tasks, merging entries that share the same {@code uuid}.
+ */
+public class MergerQueryStatus implements Merger<List<BlurQueryStatus>> {
+
+  private long _timeout;
+
+  /**
+   * @param timeout poll timeout in milliseconds for each pending result.
+   */
+  public MergerQueryStatus(long timeout) {
+    _timeout = timeout;
+  }
+
+  /**
+   * Drains the completion service and returns one status per distinct
+   * {@code uuid}.
+   */
+  @Override
+  public List<BlurQueryStatus> merge(BlurExecutorCompletionService<List<BlurQueryStatus>> service) throws BlurException {
+    Map<Long, BlurQueryStatus> byUuid = new HashMap<Long, BlurQueryStatus>();
+    while (service.getRemainingCount() > 0) {
+      Future<List<BlurQueryStatus>> completed = service.poll(_timeout, TimeUnit.MILLISECONDS, true);
+      addToMap(byUuid, service.getResultThrowException(completed));
+    }
+    return new ArrayList<BlurQueryStatus>(byUuid.values());
+  }
+
+  /**
+   * Adds each status to the map, merging it into the entry already stored
+   * under the same {@code uuid} when there is one.
+   */
+  private void addToMap(Map<Long, BlurQueryStatus> statusMap, List<BlurQueryStatus> list) {
+    for (BlurQueryStatus incoming : list) {
+      BlurQueryStatus existing = statusMap.get(incoming.uuid);
+      BlurQueryStatus stored = existing == null ? incoming : merge(existing, incoming);
+      statusMap.put(incoming.uuid, stored);
+    }
+  }
+
+  /**
+   * Folds {@code s2} into {@code s1} and returns {@code s1}. Shard counts
+   * are summed and CPU times from {@code s2} are copied over those of
+   * {@code s1}. When the states differ, INTERRUPTED takes precedence over
+   * RUNNING, and any other combination becomes COMPLETE.
+   */
+  public static BlurQueryStatus merge(BlurQueryStatus s1, BlurQueryStatus s2) {
+    s1.completeShards += s2.completeShards;
+    s1.totalShards += s2.totalShards;
+    if (s1.state != s2.state) {
+      s1.state = combineDifferingStates(s1.state, s2.state);
+    }
+    if (s1.cpuTimes == null) {
+      s1.cpuTimes = new HashMap<String, CpuTime>();
+    }
+    if (s2.cpuTimes != null) {
+      s1.cpuTimes.putAll(s2.cpuTimes);
+    }
+    return s1;
+  }
+
+  /**
+   * Picks the resulting state for two states that are known to differ.
+   */
+  private static QueryState combineDifferingStates(QueryState first, QueryState second) {
+    if (first == QueryState.INTERRUPTED || second == QueryState.INTERRUPTED) {
+      return QueryState.INTERRUPTED;
+    }
+    if (first == QueryState.RUNNING || second == QueryState.RUNNING) {
+      return QueryState.RUNNING;
+    }
+    return QueryState.COMPLETE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java b/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java
new file mode 100644
index 0000000..f3d922d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java
@@ -0,0 +1,50 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that reduces the {@link BlurQueryStatus} results of several
+ * tasks to a single status using {@link MergerQueryStatus#merge}.
+ */
+public class MergerQueryStatusSingle implements Merger<BlurQueryStatus> {
+
+  private long _timeout;
+
+  /**
+   * @param timeout poll timeout in milliseconds for each pending result.
+   */
+  public MergerQueryStatusSingle(long timeout) {
+    _timeout = timeout;
+  }
+
+  /**
+   * Drains the completion service. The first status received becomes the
+   * accumulator and every later one is merged into it.
+   *
+   * @return the merged status, or null if the service had no remaining
+   *         tasks.
+   */
+  @Override
+  public BlurQueryStatus merge(BlurExecutorCompletionService<BlurQueryStatus> service) throws BlurException {
+    BlurQueryStatus merged = null;
+    while (service.getRemainingCount() > 0) {
+      Future<BlurQueryStatus> completed = service.poll(_timeout, TimeUnit.MILLISECONDS, true);
+      BlurQueryStatus incoming = service.getResultThrowException(completed);
+      merged = (merged == null) ? incoming : MergerQueryStatus.merge(merged, incoming);
+    }
+    return merged;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
new file mode 100644
index 0000000..35895bb
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
@@ -0,0 +1,153 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.QueryState;
+import org.apache.blur.thrift.generated.CpuTime;
+
+/**
+ * Tracks the progress of a single query and collects per-shard timing
+ * metrics. This class is accessed by multiple threads (one for each shard)
+ * executing the query, so its shared mutable state is either atomic,
+ * volatile, or guarded by the monitor of the CPU-time map.
+ */
+public class QueryStatus implements Comparable<QueryStatus> {
+
+  private final static boolean CPU_TIME_SUPPORTED = ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported();
+
+  private final BlurQuery _blurQuery;
+  private final String _table;
+  private final long _startingTime;
+  // Volatile: written by the thread finishing the query, read by other threads.
+  private volatile boolean _finished = false;
+  private volatile long _finishedTime;
+  private final ThreadMXBean _bean = ManagementFactory.getThreadMXBean();
+  private final long _ttl;
+  private final AtomicBoolean _interrupted = new AtomicBoolean(false);
+  private final AtomicInteger _totalShards = new AtomicInteger();
+  private final AtomicInteger _completeShards = new AtomicInteger();
+  private final AtomicBoolean _running;
+  // All access must hold synchronized (_cpuTimes); HashMap itself is not thread-safe.
+  private final Map<String, CpuTime> _cpuTimes = new HashMap<String, CpuTime>();
+
+  /**
+   * @param ttl milliseconds a finished status stays around before
+   *          {@link #isValidForCleanUp()} reports it as removable
+   * @param table name of the table the query runs against
+   * @param blurQuery the query being tracked
+   * @param running flag that {@link #cancelQuery()} flips to {@code false}
+   */
+  public QueryStatus(long ttl, String table, BlurQuery blurQuery, AtomicBoolean running) {
+    _ttl = ttl;
+    _table = table;
+    _blurQuery = blurQuery;
+    _startingTime = System.currentTimeMillis();
+    _running = running;
+  }
+
+  /**
+   * Records the starting CPU and wall-clock readings of the calling thread
+   * under the given shard name and counts the shard as started.
+   *
+   * @return this instance, for chaining
+   */
+  public QueryStatus attachThread(String shardName) {
+    CpuTime cpuTime = new CpuTime();
+    if (CPU_TIME_SUPPORTED) {
+      cpuTime.cpuTime = _bean.getCurrentThreadCpuTime();
+    } else {
+      // -1 marks "CPU time not available on this JVM".
+      cpuTime.cpuTime = -1L;
+    }
+    cpuTime.realTime = System.nanoTime();
+    synchronized (_cpuTimes) {
+      _cpuTimes.put(shardName, cpuTime);
+    }
+    _totalShards.incrementAndGet();
+    return this;
+  }
+
+  /**
+   * Counts the shard as complete and turns the starting readings stored by
+   * {@link #attachThread(String)} into elapsed values. A shard name that was
+   * never attached is counted but otherwise ignored instead of failing with
+   * a {@link NullPointerException}.
+   *
+   * @return this instance, for chaining
+   */
+  public QueryStatus deattachThread(String shardName) {
+    _completeShards.incrementAndGet();
+    synchronized (_cpuTimes) {
+      CpuTime cpuTime = _cpuTimes.get(shardName);
+      if (cpuTime != null) {
+        if (CPU_TIME_SUPPORTED) {
+          cpuTime.cpuTime = _bean.getCurrentThreadCpuTime() - cpuTime.cpuTime;
+        }
+        cpuTime.realTime = System.nanoTime() - cpuTime.realTime;
+      }
+    }
+    return this;
+  }
+
+  public long getUserUuid() {
+    return _blurQuery.uuid;
+  }
+
+  /** Marks the query as interrupted and clears the shared running flag. */
+  public void cancelQuery() {
+    _interrupted.set(true);
+    _running.set(false);
+  }
+
+  /**
+   * Builds a status object from the current counters and state.
+   *
+   * @return a new {@link BlurQueryStatus}; its CPU-time map is a copy taken
+   *         under the lock, so shard threads that attach afterwards cannot
+   *         modify it while the caller iterates over it
+   */
+  public BlurQueryStatus getQueryStatus() {
+    BlurQueryStatus queryStatus = new BlurQueryStatus();
+    queryStatus.query = _blurQuery;
+    queryStatus.totalShards = _totalShards.get();
+    queryStatus.completeShards = _completeShards.get();
+    queryStatus.state = getQueryState();
+    if (queryStatus.query != null) {
+      queryStatus.uuid = queryStatus.query.uuid;
+    }
+    synchronized (_cpuTimes) {
+      queryStatus.cpuTimes = new HashMap<String, CpuTime>(_cpuTimes);
+    }
+    return queryStatus;
+  }
+
+  // Interruption takes precedence over completion.
+  private QueryState getQueryState() {
+    if (_interrupted.get()) {
+      return QueryState.INTERRUPTED;
+    } else if (_finished) {
+      return QueryState.COMPLETE;
+    } else {
+      return QueryState.RUNNING;
+    }
+  }
+
+  public String getTable() {
+    return _table;
+  }
+
+  public boolean isFinished() {
+    return _finished;
+  }
+
+  /**
+   * Sets the finished flag and stamps the current time as the finish time.
+   */
+  public void setFinished(boolean finished) {
+    // Publish the timestamp before the flag: a reader that observes
+    // _finished == true must never pair it with a stale finish time of 0,
+    // which would make the status look expired immediately.
+    _finishedTime = System.currentTimeMillis();
+    _finished = finished;
+  }
+
+  public long getFinishedTime() {
+    return _finishedTime;
+  }
+
+  /**
+   * @return {@code true} once the query is finished and more than the
+   *         configured ttl has elapsed since the finish time
+   */
+  public boolean isValidForCleanUp() {
+    if (!isFinished()) {
+      return false;
+    }
+    return getFinishedTime() + _ttl < System.currentTimeMillis();
+  }
+
+  /**
+   * Orders by starting time, falling back to the hash code for statuses that
+   * started in the same millisecond. An instance compares equal to itself.
+   */
+  @Override
+  public int compareTo(QueryStatus o) {
+    if (this == o) {
+      return 0;
+    }
+    long otherStartingTime = o._startingTime;
+    if (_startingTime == otherStartingTime) {
+      return hashCode() < o.hashCode() ? -1 : 1;
+    }
+    return _startingTime < otherStartingTime ? -1 : 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
new file mode 100644
index 0000000..88915c7
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
@@ -0,0 +1,129 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+
+public class QueryStatusManager {
+
+  private static final Log LOG = LogFactory.getLog(QueryStatusManager.class);
+  private static final Object CONSTANT_VALUE = new Object();
+
+  private Timer statusCleanupTimer;
+  private long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
+  private ConcurrentHashMap<QueryStatus, Object> currentQueryStatusCollection = new ConcurrentHashMap<QueryStatus, Object>();
+
+  public void init() {
+    statusCleanupTimer = new Timer("Query-Status-Cleanup", true);
+    statusCleanupTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          cleanupFinishedQueryStatuses();
+        } catch (Throwable e) {
+          LOG.error("Unknown error while trying to cleanup finished queries.", e);
+        }
+      }
+    }, statusCleanupTimerDelay, statusCleanupTimerDelay);
+  }
+
+  public void close() {
+    statusCleanupTimer.cancel();
+    statusCleanupTimer.purge();
+  }
+
+  public QueryStatus newQueryStatus(String table, BlurQuery blurQuery, int maxNumberOfThreads, AtomicBoolean running) {
+    QueryStatus queryStatus = new QueryStatus(statusCleanupTimerDelay, table, blurQuery, running);
+    currentQueryStatusCollection.put(queryStatus, CONSTANT_VALUE);
+    return queryStatus;
+  }
+
+  public void removeStatus(QueryStatus status) {
+    status.setFinished(true);
+  }
+
+  private void cleanupFinishedQueryStatuses() {
+    LOG.debug("QueryStatus Start count [{0}].", currentQueryStatusCollection.size());
+    Iterator<QueryStatus> iterator = currentQueryStatusCollection.keySet().iterator();
+    while (iterator.hasNext()) {
+      QueryStatus status = iterator.next();
+      if (status.isValidForCleanUp()) {
+        currentQueryStatusCollection.remove(status);
+      }
+    }
+    LOG.debug("QueryStatus Finish count [{0}].", currentQueryStatusCollection.size());
+  }
+
+  public long getStatusCleanupTimerDelay() {
+    return statusCleanupTimerDelay;
+  }
+
+  public void setStatusCleanupTimerDelay(long statusCleanupTimerDelay) {
+    this.statusCleanupTimerDelay = statusCleanupTimerDelay;
+  }
+
+  public void cancelQuery(String table, long uuid) {
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getUserUuid() == uuid && status.getTable().equals(table)) {
+        status.cancelQuery();
+      }
+    }
+  }
+
+  public List<BlurQueryStatus> currentQueries(String table) {
+    List<BlurQueryStatus> result = new ArrayList<BlurQueryStatus>();
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getTable().equals(table)) {
+        result.add(status.getQueryStatus());
+      }
+    }
+    return result;
+  }
+
+  public BlurQueryStatus queryStatus(String table, long uuid) {
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getUserUuid() == uuid && status.getTable().equals(table)) {
+        return status.getQueryStatus();
+      }
+    }
+    return null;
+  }
+
+  public List<Long> queryStatusIdList(String table) {
+    Set<Long> ids = new HashSet<Long>();
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getTable().equals(table)) {
+        ids.add(status.getUserUuid());
+      }
+    }
+    return new ArrayList<Long>(ids);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
new file mode 100644
index 0000000..9808803
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
@@ -0,0 +1,163 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.similarities.Similarity;
+import org.apache.lucene.store.Directory;
+
+/**
+ * Partial {@link BlurIndex} implementation holding the collaborators shared
+ * by its subclasses (analyzer, directory, refresher, closer, ...) together
+ * with the reader refresh and close bookkeeping.
+ */
+public abstract class AbstractBlurIndex extends BlurIndex {
+
+  private BlurAnalyzer _analyzer;
+  private BlurIndexCloser _closer;
+  private Directory _directory;
+  private IndexDeletionPolicy _indexDeletionPolicy = new KeepOnlyLastCommitDeletionPolicy();
+  private AtomicReference<DirectoryReader> _indexReaderRef = new AtomicReference<DirectoryReader>();
+  private AtomicBoolean _isClosed = new AtomicBoolean(false);
+  private AtomicBoolean _open = new AtomicBoolean();
+  private BlurIndexRefresher _refresher;
+  private String _shard;
+  private Similarity _similarity;
+  private String _table;
+
+  /**
+   * Builds the writer configuration from the injected analyzer, deletion
+   * policy and similarity, disables compound files on the merge policy, and
+   * flags this index as open.
+   */
+  protected IndexWriterConfig initIndexWriterConfig() {
+    IndexWriterConfig config = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    config.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    config.setIndexDeletionPolicy(_indexDeletionPolicy);
+    config.setSimilarity(_similarity);
+    ((TieredMergePolicy) config.getMergePolicy()).setUseCompoundFile(false);
+    _open.set(true);
+    return config;
+  }
+
+  /** Installs the initial reader and registers this index with the refresher. */
+  protected void initIndexReader(DirectoryReader reader) throws IOException {
+    _indexReaderRef.set(reader);
+    _refresher.register(this);
+  }
+
+  /**
+   * Swaps in a newer reader when the index has changed and hands the previous
+   * one to the closer. Does nothing while the index is not open.
+   */
+  @Override
+  public void refresh() throws IOException {
+    if (_open.get()) {
+      DirectoryReader current = _indexReaderRef.get();
+      DirectoryReader changed = DirectoryReader.openIfChanged(current);
+      if (changed != null) {
+        _indexReaderRef.set(changed);
+        _closer.close(current);
+      }
+    }
+  }
+
+  /**
+   * Not implemented at this level.
+   *
+   * @throws RuntimeException always
+   */
+  @Override
+  public IndexSearcherClosable getIndexReader() throws IOException {
+    // An earlier draft, left commented out in the original, returned the
+    // current reader after calling incRef() on it.
+    throw new RuntimeException("not impl");
+  }
+
+  /** Closes this index without an inner action, then closes the directory. */
+  @Override
+  public void close() throws IOException {
+    close(null);
+    _directory.close();
+  }
+
+  /**
+   * Marks the index as no longer open, unregisters it from the refresher,
+   * runs the optional inner close action and finally sets the closed flag.
+   *
+   * @param innerClose extra close work to run, may be {@code null}
+   * @throws IOException wrapping any exception thrown by {@code innerClose};
+   *           in that case the closed flag is not set
+   */
+  public void close(Callable<Void> innerClose) throws IOException {
+    _open.set(false);
+    _refresher.unregister(this);
+    if (innerClose != null) {
+      try {
+        innerClose.call();
+      } catch (Exception cause) {
+        throw new IOException(cause);
+      }
+    }
+    _isClosed.set(true);
+  }
+
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    this._analyzer = analyzer;
+  }
+
+  public void setCloser(BlurIndexCloser closer) {
+    this._closer = closer;
+  }
+
+  public void setDirectory(Directory directory) {
+    this._directory = directory;
+  }
+
+  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    this._indexDeletionPolicy = indexDeletionPolicy;
+  }
+
+  public void setRefresher(BlurIndexRefresher refresher) {
+    this._refresher = refresher;
+  }
+
+  public void setShard(String shard) {
+    this._shard = shard;
+  }
+
+  public void setSimilarity(Similarity similarity) {
+    this._similarity = similarity;
+  }
+
+  public void setTable(String table) {
+    this._table = table;
+  }
+
+  protected BlurAnalyzer getAnalyzer() {
+    return _analyzer;
+  }
+
+  protected Directory getDirectory() {
+    return _directory;
+  }
+
+  protected String getShard() {
+    return _shard;
+  }
+
+  protected String getTable() {
+    return _table;
+  }
+
+  /** @return whether the index is currently flagged as open */
+  protected boolean isOpen() {
+    return _open.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
new file mode 100644
index 0000000..23c3241
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -0,0 +1,42 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.thrift.generated.Row;
+
+/**
+ * Abstract contract of a shard index: row mutation, searcher access,
+ * refresh, optimize and close. Every operation is left to the subclasses.
+ */
+public abstract class BlurIndex {
+
+  /**
+   * Replaces a row in the index.
+   *
+   * @param waitToBeVisible presumably blocks until the change is searchable -
+   *          confirm against the implementations
+   * @param wal presumably whether the change goes through a write-ahead log -
+   *          confirm against the implementations
+   * @param row the row to store
+   */
+  public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException;
+
+  /**
+   * Deletes the row with the given id; the flags are the same as for
+   * {@link #replaceRow(boolean, boolean, Row)}.
+   */
+  public abstract void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException;
+
+  /** @return a closable searcher over this index */
+  public abstract IndexSearcherClosable getIndexReader() throws IOException;
+
+  public abstract void close() throws IOException;
+
+  public abstract void refresh() throws IOException;
+
+  /** @return the flag object reporting whether this index has been closed */
+  public abstract AtomicBoolean isClosed();
+
+  public abstract void optimize(int numberOfSegmentsPerShard) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
new file mode 100644
index 0000000..e096818
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
@@ -0,0 +1,108 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexReader;
+
+
+/**
+ * Background service that closes retired {@link IndexReader}s once nobody
+ * else holds a reference to them. Readers are queued through
+ * {@link #close(IndexReader)}; a daemon thread re-checks the queue once per
+ * pause interval and closes the releasable ones on a thread pool.
+ */
+public class BlurIndexCloser implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexCloser.class);
+  private static final long PAUSE_TIME = TimeUnit.SECONDS.toMillis(1);
+  private Thread daemon;
+  private Collection<IndexReader> readers = new LinkedBlockingQueue<IndexReader>();
+  private AtomicBoolean running = new AtomicBoolean();
+  private ExecutorService executorService;
+
+  /** Creates the closing pool and starts the daemon thread. */
+  public void init() {
+    // The pool must exist before the daemon starts: the daemon submits close
+    // tasks to it, and a reader removed from the queue while the pool is
+    // still null would never be closed.
+    executorService = Executors.newThreadPool("Blur Index Closer Pool", 10);
+    running.set(true);
+    daemon = new Thread(this);
+    daemon.setDaemon(true);
+    daemon.setName(getClass().getName() + "-Daemon");
+    daemon.start();
+    LOG.info("Init Complete");
+  }
+
+  /** Stops the daemon thread and shuts the closing pool down. */
+  public void close() {
+    running.set(false);
+    daemon.interrupt();
+    executorService.shutdownNow();
+  }
+
+  /**
+   * Queues a reader to be closed once its reference count drops to one.
+   *
+   * @param reader the reader to retire
+   */
+  public void close(IndexReader reader) {
+    readers.add(reader);
+  }
+
+  @Override
+  public void run() {
+    while (running.get()) {
+      try {
+        tryToCloseReaders();
+      } catch (Throwable t) {
+        // Keep the daemon alive whatever a single pass throws.
+        LOG.error("Unknown error", t);
+      }
+      try {
+        Thread.sleep(PAUSE_TIME);
+      } catch (InterruptedException e) {
+        // Interrupted by close(): stop.
+        return;
+      }
+    }
+  }
+
+  /**
+   * Closes every queued reader whose only remaining reference is the one
+   * this closer is about to release; readers still in use stay queued for
+   * the next pass.
+   */
+  private void tryToCloseReaders() {
+    LOG.debug("Trying to close [{0}] readers", readers.size());
+    Iterator<IndexReader> it = readers.iterator();
+    while (it.hasNext()) {
+      IndexReader reader = it.next();
+      int refCount = reader.getRefCount();
+      if (refCount == 1) {
+        it.remove();
+        closeInternal(reader);
+      } else {
+        // Still referenced elsewhere: leave it queued and do NOT close it.
+        LOG.debug("Could not close indexreader [" + reader + "] because of ref count [" + refCount + "].");
+      }
+    }
+  }
+
+  /**
+   * Closes the reader on the pool. A reader whose close fails is put back on
+   * the queue so a later pass retries it.
+   */
+  private void closeInternal(final IndexReader reader) {
+    executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          long start = System.currentTimeMillis();
+          reader.close();
+          long end = System.currentTimeMillis();
+          LOG.debug("Size [{0}] time to close [{1}] Closing indexreader [{2}].", readers.size(), (end - start), reader);
+        } catch (Exception e) {
+          readers.add(reader);
+          LOG.error("Error while trying to close indexreader [" + reader + "].", e);
+        }
+      }
+    });
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
new file mode 100644
index 0000000..431a829
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -0,0 +1,129 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+
+public class BlurIndexReader extends BlurIndex {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexReader.class);
+
+  private BlurIndexCloser _closer;
+  private Directory _directory;
+  private AtomicReference<DirectoryReader> _indexReaderRef = new AtomicReference<DirectoryReader>();
+  private AtomicBoolean _isClosed = new AtomicBoolean(false);
+  private AtomicBoolean _open = new AtomicBoolean();
+  private BlurIndexRefresher _refresher;
+  private final TableContext _tableContext;
+  private final ShardContext _shardContext;
+
+  public BlurIndexReader(ShardContext shardContext, Directory directory, BlurIndexRefresher refresher,
+      BlurIndexCloser closer) throws IOException {
+    _tableContext = shardContext.getTableContext();
+    _directory = directory;
+    _shardContext = shardContext;
+    _refresher = refresher;
+    _closer = closer;
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _tableContext.getAnalyzer());
+    conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    conf.setIndexDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+    conf.setSimilarity(_tableContext.getSimilarity());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+
+    _open.set(true);
+
+    if (!DirectoryReader.indexExists(directory)) {
+      new IndexWriter(directory, conf).close();
+    }
+    _indexReaderRef.set(DirectoryReader.open(directory));
+    _refresher.register(this);
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    if (!_open.get()) {
+      return;
+    }
+    DirectoryReader oldReader = _indexReaderRef.get();
+    DirectoryReader reader = DirectoryReader.openIfChanged(oldReader);
+    if (reader != null) {
+      _indexReaderRef.set(reader);
+      _closer.close(oldReader);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    _open.set(false);
+    _refresher.unregister(this);
+    _directory.close();
+    _isClosed.set(true);
+    LOG.info("Reader for table [{0}] shard [{1}] closed.", _tableContext.getTable(), _shardContext.getShard());
+  }
+
+  @Override
+  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
+    throw new RuntimeException("Read-only shard");
+  }
+
+  @Override
+  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
+    throw new RuntimeException("Read-only shard");
+  }
+
+  @Override
+  public void optimize(int numberOfSegmentsPerShard) throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  public IndexSearcherClosable getIndexReader() throws IOException {
+    throw new RuntimeException("not implemented");
+  }
+
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  public IndexSearcher getSearcher() {
+    IndexReader indexReader = _indexReaderRef.get();
+    indexReader.incRef();
+    return new IndexSearcher(indexReader);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java
new file mode 100644
index 0000000..7f669b8
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java
@@ -0,0 +1,85 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+
+/**
+ * Timer task that periodically calls {@link BlurIndex#refresh()} on every
+ * registered index.
+ */
+public class BlurIndexRefresher extends TimerTask {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexRefresher.class);
+
+  private Timer _timer;
+  private long _period = TimeUnit.MINUTES.toMillis(1);
+  private long _delay = _period;
+  private Collection<BlurIndex> _indexes = new LinkedBlockingQueue<BlurIndex>();
+
+  /** Adds an index to the set refreshed on every run. */
+  public void register(BlurIndex blurIndex) {
+    _indexes.add(blurIndex);
+  }
+
+  /** Removes an index from the set refreshed on every run. */
+  public void unregister(BlurIndex blurIndex) {
+    _indexes.remove(blurIndex);
+  }
+
+  /** Stops the refresh timer; a no-op when {@link #init()} was never called. */
+  public void close() {
+    if (_timer == null) {
+      return;
+    }
+    _timer.cancel();
+    _timer.purge();
+  }
+
+  /** Starts the daemon timer using the configured delay and period. */
+  public void init() {
+    _timer = new Timer("IndexReader-Refresher", true);
+    _timer.schedule(this, _delay, _period);
+    LOG.info("Init Complete");
+  }
+
+  @Override
+  public void run() {
+    try {
+      refreshInternal();
+    } catch (Throwable e) {
+      // Never let anything escape: an exception would cancel the timer.
+      LOG.error("Unknown error", e);
+    }
+  }
+
+  /**
+   * Refreshes every registered index. A failure of one index, checked or
+   * unchecked, is logged and does not stop the remaining indexes from being
+   * refreshed in the same pass.
+   */
+  private void refreshInternal() {
+    for (BlurIndex index : _indexes) {
+      try {
+        index.refresh();
+      } catch (Exception e) {
+        LOG.error("Unknown error while refreshing index of writer [{0}]", e, index);
+      }
+    }
+  }
+
+  /** @param period milliseconds between runs; only read by {@link #init()} */
+  public void setPeriod(long period) {
+    _period = period;
+  }
+
+  /** @param delay milliseconds before the first run; only read by {@link #init()} */
+  public void setDelay(long delay) {
+    _delay = delay;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
new file mode 100644
index 0000000..9adbc46
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -0,0 +1,249 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.index.IndexWriter.LockOwnerException;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.NRTManager;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.search.NRTManagerReopenThread;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.store.Directory;
+
+/**
+ * {@link BlurIndex} implementation for a single shard that is built on
+ * Lucene's {@link NRTManager}.
+ * <p>
+ * Row mutations are handed to the {@link TransactionRecorder} together with a
+ * {@link TrackingIndexWriter}; the generation returned for each mutation can be
+ * waited on so that the change is visible to searchers before the call
+ * returns. The constructor starts three background activities: a commit thread
+ * ({@link Committer}), a {@link NRTManagerReopenThread} that reopens searchers,
+ * and an {@link IndexImporter}.
+ */
+public class BlurNRTIndex extends BlurIndex {
+
+  private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
+  private static final boolean APPLY_ALL_DELETES = true;
+
+  private final AtomicReference<NRTManager> _nrtManagerRef = new AtomicReference<NRTManager>();
+  private final AtomicBoolean _isClosed = new AtomicBoolean();
+  private final IndexWriter _writer;
+  private final Thread _committer;
+  private final SearcherFactory _searcherFactory;
+  private final Directory _directory;
+  private final NRTManagerReopenThread _refresher;
+  private final TableContext _tableContext;
+  private final ShardContext _shardContext;
+  private final TransactionRecorder _recorder;
+  private final TrackingIndexWriter _trackingWriter;
+  // This lock is used during an import of data from the file system, for
+  // example after a mapreduce program. Mutations take the read lock; the same
+  // lock instance is handed to the IndexImporter.
+  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  // Written by whichever thread calls refresh(), read by mutating threads,
+  // hence volatile.
+  private volatile long _lastRefresh = 0;
+  private final IndexImporter _indexImporter;
+
+  /**
+   * Opens the writer on the given directory, replays the transaction log into
+   * it and starts the background commit, refresh and import activities.
+   *
+   * @param shardContext
+   *          the shard this index belongs to; also supplies the table context.
+   * @param mergeScheduler
+   *          merge scheduler installed on the writer configuration.
+   * @param closer
+   *          passed to the {@link DirectoryReferenceCounter} wrapping the
+   *          directory.
+   * @param directory
+   *          the directory holding the index; closed by {@link #close()}.
+   * @param gc
+   *          passed to the {@link DirectoryReferenceCounter} wrapping the
+   *          directory.
+   * @param searchExecutor
+   *          executor handed to every searcher created by this index.
+   * @throws IOException
+   *           if the writer cannot be opened or the replay fails.
+   */
+  public BlurNRTIndex(ShardContext shardContext, SharedMergeScheduler mergeScheduler, IndexInputCloser closer,
+      Directory directory, DirectoryReferenceFileGC gc, final ExecutorService searchExecutor) throws IOException {
+    _tableContext = shardContext.getTableContext();
+    _directory = directory;
+    _shardContext = shardContext;
+
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _tableContext.getAnalyzer());
+    conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    conf.setSimilarity(_tableContext.getSimilarity());
+    conf.setIndexDeletionPolicy(_tableContext.getIndexDeletionPolicy());
+    conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext));
+
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    conf.setMergeScheduler(mergeScheduler);
+
+    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory, gc, closer);
+
+    _writer = new IndexWriter(referenceCounter, conf);
+    _recorder = new TransactionRecorder(shardContext);
+    _recorder.replay(_writer);
+
+    _searcherFactory = new SearcherFactory() {
+      @Override
+      public IndexSearcher newSearcher(IndexReader reader) throws IOException {
+        return new IndexSearcherClosable(reader, searchExecutor, _nrtManagerRef, _directory);
+      }
+    };
+
+    _trackingWriter = new TrackingIndexWriter(_writer);
+    // The importer schedules itself with a period of 10 seconds.
+    _indexImporter = new IndexImporter(_trackingWriter, _lock, _shardContext, TimeUnit.SECONDS, 10);
+    _nrtManagerRef.set(new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES));
+
+    // start committer
+    _committer = new Thread(new Committer());
+    _committer.setDaemon(true);
+    _committer.setName("Commit Thread [" + _tableContext.getTable() + "/" + shardContext.getShard() + "]");
+    _committer.start();
+
+    // start refresher; the maximum staleness is ten times the minimum.
+    double targetMinStaleSec = _tableContext.getTimeBetweenRefreshs() / 1000.0;
+    _refresher = new NRTManagerReopenThread(getNRTManager(), targetMinStaleSec * 10, targetMinStaleSec);
+    _refresher.setName("Refresh Thread [" + _tableContext.getTable() + "/" + shardContext.getShard() + "]");
+    _refresher.setDaemon(true);
+    _refresher.start();
+  }
+
+  /**
+   * Replaces the row with the given one. A row without records is treated as a
+   * delete of {@code row.id}.
+   *
+   * @param waitToBeVisible
+   *          if true the call blocks until searchers can see the change.
+   * @param wal
+   *          forwarded to the {@link TransactionRecorder}.
+   * @param row
+   *          the new content of the row.
+   */
+  @Override
+  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
+    _lock.readLock().lock();
+    try {
+      List<Record> records = row.records;
+      if (records == null || records.isEmpty()) {
+        // The read lock is reentrant, so delegating while holding it is safe.
+        deleteRow(waitToBeVisible, wal, row.id);
+        return;
+      }
+      long generation = _recorder.replaceRow(wal, row, _trackingWriter);
+      waitToBeVisible(waitToBeVisible, generation);
+    } finally {
+      _lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Deletes the row with the given id.
+   *
+   * @param waitToBeVisible
+   *          if true the call blocks until searchers can see the change.
+   * @param wal
+   *          forwarded to the {@link TransactionRecorder}.
+   * @param rowId
+   *          id of the row to delete.
+   */
+  @Override
+  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
+    _lock.readLock().lock();
+    try {
+      long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
+      waitToBeVisible(waitToBeVisible, generation);
+    } finally {
+      _lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * The method fetches a reference to the IndexSearcher, the caller is
+   * responsible for calling close on the searcher.
+   */
+  @Override
+  public IndexSearcherClosable getIndexReader() throws IOException {
+    return (IndexSearcherClosable) getNRTManager().acquire();
+  }
+
+  private NRTManager getNRTManager() {
+    return _nrtManagerRef.get();
+  }
+
+  /**
+   * Shuts the index down: stops the importer, the commit thread and the
+   * refresher, then closes the recorder, the writer, the NRT manager and
+   * finally the directory.
+   * <p>
+   * Only the first caller performs the shutdown; the closed flag is flipped
+   * atomically so that a concurrent close from the commit thread and from an
+   * external caller cannot both run the sequence.
+   */
+  @Override
+  public void close() throws IOException {
+    // @TODO make sure that locks are cleaned up.
+    if (!_isClosed.compareAndSet(false, true)) {
+      return;
+    }
+    _indexImporter.close();
+    _committer.interrupt();
+    _refresher.close();
+    try {
+      _recorder.close();
+      _writer.close();
+      getNRTManager().close();
+    } finally {
+      _directory.close();
+    }
+  }
+
+  /**
+   * Asks the NRT manager to refresh if needed and records the time of the
+   * request.
+   */
+  @Override
+  public void refresh() throws IOException {
+    getNRTManager().maybeRefresh();
+    _lastRefresh = System.currentTimeMillis();
+  }
+
+  /**
+   * @return the live closed flag of this index (not a copy).
+   */
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  /**
+   * Force-merges the index down to at most the given number of segments.
+   */
+  @Override
+  public void optimize(int numberOfSegmentsPerShard) throws IOException {
+    _writer.forceMerge(numberOfSegmentsPerShard);
+  }
+
+  /**
+   * Refreshes the searcher if the refresh interval has elapsed and, when
+   * requested, blocks until the given generation is searchable.
+   */
+  private void waitToBeVisible(boolean waitToBeVisible, long generation) throws IOException {
+    if (needsRefresh()) {
+      refresh();
+    }
+    if (waitToBeVisible && getNRTManager().getCurrentSearchingGen() < generation) {
+      getNRTManager().waitForGeneration(generation);
+    }
+  }
+
+  /**
+   * @return true if more than the table's refresh interval has passed since
+   *         the last call to {@link #refresh()}.
+   */
+  private boolean needsRefresh() {
+    return _lastRefresh + _tableContext.getTimeBetweenRefreshs() < System.currentTimeMillis();
+  }
+
+  /**
+   * Body of the commit thread: commits through the recorder, then waits for
+   * the table's commit interval, until the index is closed. If the shard
+   * server no longer owns the lock the index closes itself and the thread
+   * ends.
+   */
+  class Committer implements Runnable {
+    @Override
+    public void run() {
+      // The monitor is only needed to be able to call wait(timeout) below.
+      synchronized (this) {
+        while (!_isClosed.get()) {
+          try {
+            LOG.debug("Committing of [{0}/{1}].", _tableContext.getTable(), _shardContext.getShard());
+            _recorder.commit(_writer);
+          } catch (CorruptIndexException e) {
+            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _tableContext.getTable(),
+                _shardContext.getShard());
+          } catch (LockOwnerException e) {
+            LOG.info("This shard server no longer owns the lock on [{0}/{1}], closing.", _tableContext.getTable(),
+                _shardContext.getShard());
+            try {
+              close();
+            } catch (IOException ex) {
+              // Pass the exception along so the cause of the failed close is logged.
+              LOG.error("Unknown error while trying to close [{0}/{1}]", ex, _tableContext.getTable(),
+                  _shardContext.getShard());
+            }
+            return;
+          } catch (IOException e) {
+            LOG.error("IO Error during commit of [{0}/{1}].", e, _tableContext.getTable(), _shardContext.getShard());
+          }
+          try {
+            wait(_tableContext.getTimeBetweenCommits());
+          } catch (InterruptedException e) {
+            // close() interrupts this thread; that is the normal way out.
+            if (_isClosed.get()) {
+              return;
+            }
+            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _tableContext.getTable(),
+                _shardContext.getShard());
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java b/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java
new file mode 100644
index 0000000..7b275d2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/FieldBasedWarmer.java
@@ -0,0 +1,94 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.ColumnPreCache;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
+
+/**
+ * Segment warmer that is configured with an optional list of column names to
+ * pre-cache.
+ * <p>
+ * The convenience constructors walk from a shard context down to that list. If
+ * no list is available every field of the reader is selected instead. The
+ * warming step itself is currently disabled, so {@link #warm(AtomicReader)} has
+ * no effect beyond selecting the field names.
+ */
+public class FieldBasedWarmer extends IndexReaderWarmer {
+
+  private static final Log LOG = LogFactory.getLog(FieldBasedWarmer.class);
+
+  /** Column names to warm, or null to select all fields of the reader. */
+  private List<String> preCacheCols;
+
+  /** Uses the pre-cache columns of the shard's table. */
+  public FieldBasedWarmer(ShardContext shardContext) {
+    this(shardContext.getTableContext());
+  }
+
+  /** Uses the pre-cache columns of the table's descriptor. */
+  public FieldBasedWarmer(TableContext tableContext) {
+    this(tableContext.getDescriptor());
+  }
+
+  /** Uses the pre-cache columns configured in the descriptor. */
+  public FieldBasedWarmer(TableDescriptor tableDescriptor) {
+    this(tableDescriptor.getColumnPreCache());
+  }
+
+  /**
+   * @param columnPreCache
+   *          may be null, in which case all fields are selected.
+   */
+  public FieldBasedWarmer(ColumnPreCache columnPreCache) {
+    this(columnPreCache == null ? null : columnPreCache.getPreCacheCols());
+  }
+
+  /**
+   * @param preCacheCols
+   *          column names to warm; null selects all fields.
+   */
+  public FieldBasedWarmer(List<String> preCacheCols) {
+    this.preCacheCols = preCacheCols;
+  }
+
+  /**
+   * Selects the configured columns, or all fields of the reader when none are
+   * configured, and passes them to the warming step.
+   */
+  @Override
+  public void warm(AtomicReader reader) throws IOException {
+    if (preCacheCols == null) {
+      Fields allFields = reader.fields();
+      warm(reader, allFields);
+      return;
+    }
+    warm(reader, preCacheCols);
+  }
+
+  /**
+   * Warming step. The implementation is disabled; the draft below is kept for
+   * reference and this method does nothing.
+   */
+  private void warm(AtomicReader reader, Iterable<String> fieldNames) throws IOException {
+    // Disabled draft:
+    //
+    // for (String field : fieldNames) {
+    //   LOG.debug("Warming field [{0}] in reader [{1}]", field, reader);
+    //
+    //   AtomicReaderContext context = reader.getContext();
+    //   context.
+    //
+    //   Fields fields = reader.fields();
+    //   Terms terms = fields.terms(field);
+    //   TermsEnum termsEnum = terms.iterator(null);
+    //   BytesRef ref = null;
+    //   Term term = new Term(field);
+    //   while ((ref = termsEnum.next()) != null) {
+    //     term.set(field, ref);
+    //     DocsAndPositionsEnum termPositionsEnum = reader.termPositionsEnum(term);
+    //     if (termPositionsEnum != null) {
+    //       while (termPositionsEnum.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+    //         int freq = termPositionsEnum.freq();
+    //         for (int i = 0; i < freq; i++) {
+    //           termPositionsEnum.nextPosition();
+    //         }
+    //       }
+    //     }
+    //   }
+    // }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
new file mode 100644
index 0000000..9470d40
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -0,0 +1,184 @@
+package org.apache.blur.manager.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.CompositeReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+
+public class IndexImporter extends TimerTask implements Closeable {
+  private final static Log LOG = LogFactory.getLog(IndexImporter.class);
+
+  private final TrackingIndexWriter _trackingWriter;
+  private final ReadWriteLock _lock;
+  private final ShardContext _shardContext;
+  private final Timer _timer;
+
+  public IndexImporter(TrackingIndexWriter trackingWriter, ReadWriteLock lock, ShardContext shardContext,
+      TimeUnit refreshUnit, long refreshAmount) {
+    _trackingWriter = trackingWriter;
+    _lock = lock;
+    _shardContext = shardContext;
+    _timer = new Timer("IndexImporter [" + shardContext.getShard() + "/" + shardContext.getTableContext().getTable()
+        + "]", true);
+    long period = refreshUnit.toMillis(refreshAmount);
+    _timer.schedule(this, period, period);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _timer.cancel();
+    _timer.purge();
+  }
+
+  @Override
+  public void run() {
+    Path path = _shardContext.getHdfsDirPath();
+    Configuration configuration = _shardContext.getTableContext().getConfiguration();
+    try {
+      FileSystem fileSystem = path.getFileSystem(configuration);
+      SortedSet<FileStatus> listStatus = sort(fileSystem.listStatus(path));
+      List<HdfsDirectory> indexesToImport = new ArrayList<HdfsDirectory>();
+      for (FileStatus fileStatus : listStatus) {
+        Path file = fileStatus.getPath();
+        if (fileStatus.isDir() && file.getName().endsWith(".commit")) {
+          HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, file);
+          if (!DirectoryReader.indexExists(hdfsDirectory)) {
+            LOG.error("Directory found at [{0}] is not a vaild index.", file);
+          } else {
+            indexesToImport.add(hdfsDirectory);
+          }
+        }
+      }
+      if (indexesToImport.isEmpty()) {
+        return;
+      }
+      String table = _shardContext.getTableContext().getTable();
+      String shard = _shardContext.getShard();
+      for (Directory directory : indexesToImport) {
+        LOG.info("About to import [{0}] into [{1}/{2}]", directory, shard, table);
+      }
+      LOG.info("Obtaining lock on [{0}/{1}]", shard, table);
+      _lock.writeLock().lock();
+      try {
+        IndexWriter indexWriter = _trackingWriter.getIndexWriter();
+        for (HdfsDirectory directory : indexesToImport) {
+          LOG.info("Starting import [{0}], commiting on [{1}/{2}]", directory, shard, table);
+          indexWriter.commit();
+          boolean isSuccess = true;
+          boolean isRollbackDueToException = false;
+          boolean emitDeletes = indexWriter.numDocs() != 0;
+          try {
+            isSuccess = applyDeletes(directory, indexWriter, shard, emitDeletes);
+          } catch (IOException e) {
+            LOG.error("Some issue with deleting the old index on [{0}/{1}]", e, shard, table);
+            isSuccess = false;
+            isRollbackDueToException = true;
+          }
+          Path dirPath = directory.getPath();
+          if (isSuccess) {
+            LOG.info("Add index [{0}] [{1}/{2}]", directory, shard, table);
+            indexWriter.addIndexes(directory);
+            LOG.info("Finishing import [{0}], commiting on [{1}/{2}]", directory, shard, table);
+            indexWriter.commit();
+            LOG.info("Cleaning up old directory [{0}] for [{1}/{2}]", dirPath, shard, table);
+            fileSystem.delete(dirPath, true);
+            LOG.info("Import complete on [{0}/{1}]", shard, table);
+          } else {
+            if (!isRollbackDueToException) {
+              LOG.error(
+                  "Index is corrupted, RowIds are found in wrong shard [{0}/{1}], cancelling index import for [{2}]",
+                  shard, table, directory);
+            }
+            LOG.info("Starting rollback on [{0}/{1}]", shard, table);
+            indexWriter.rollback();
+            LOG.info("Finished rollback on [{0}/{1}]", shard, table);
+            String name = dirPath.getName();
+            int lastIndexOf = name.lastIndexOf('.');
+            String badRowIdsName = name.substring(0, lastIndexOf) + ".bad_rowids";
+            fileSystem.rename(dirPath, new Path(dirPath.getParent(), badRowIdsName));
+          }
+        }
+      } finally {
+        _lock.writeLock().unlock();
+      }
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to refresh imports.", e);
+    }
+
+  }
+
+  private SortedSet<FileStatus> sort(FileStatus[] listStatus) {
+    SortedSet<FileStatus> result = new TreeSet<FileStatus>();
+    for (FileStatus fileStatus : listStatus) {
+      result.add(fileStatus);
+    }
+    return result;
+  }
+
+  private boolean applyDeletes(Directory directory, IndexWriter indexWriter, String shard, boolean emitDeletes)
+      throws IOException {
+    DirectoryReader reader = DirectoryReader.open(directory);
+    try {
+      LOG.info("Applying deletes in reader [{0}]", reader);
+      CompositeReaderContext compositeReaderContext = reader.getContext();
+      List<AtomicReaderContext> leaves = compositeReaderContext.leaves();
+      BlurPartitioner blurPartitioner = new BlurPartitioner();
+      Text key = new Text();
+      int numberOfShards = _shardContext.getTableContext().getDescriptor().getShardCount();
+      for (AtomicReaderContext context : leaves) {
+        AtomicReader atomicReader = context.reader();
+        Fields fields = atomicReader.fields();
+        Terms terms = fields.terms(BlurConstants.ROW_ID);
+        TermsEnum termsEnum = terms.iterator(null);
+        BytesRef ref = null;
+        while ((ref = termsEnum.next()) != null) {
+          byte[] rowIdInBytes = ref.bytes;
+          key.set(rowIdInBytes, 0, rowIdInBytes.length);
+          int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+          int shardId = BlurUtil.getShardIndex(shard);
+          if (shardId != partition) {
+            return false;
+          }
+          if (emitDeletes) {
+            Term term = new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref));
+            indexWriter.deleteDocuments(term);
+          }
+        }
+      }
+    } finally {
+      reader.close();
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
new file mode 100644
index 0000000..f5668e8
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -0,0 +1,90 @@
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeScheduler;
+
+/**
+ * {@link MergeScheduler} that can be shared by several index writers.
+ * <p>
+ * {@link #merge(IndexWriter)} only puts the writer into a queue. The merges
+ * themselves are executed by three worker loops that the constructor submits to
+ * a thread pool; each loop takes a writer from the queue, runs its next pending
+ * merge and re-queues the writer afterwards so that further merges are picked
+ * up.
+ */
+public class SharedMergeScheduler extends MergeScheduler implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(SharedMergeScheduler.class);
+
+  private static final long ONE_SECOND = 1000;
+
+  private final BlockingQueue<IndexWriter> _writers = new LinkedBlockingQueue<IndexWriter>();
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private final ExecutorService service;
+
+  /**
+   * Creates the thread pool and starts one worker loop per thread.
+   */
+  public SharedMergeScheduler() {
+    int threads = 3;
+    service = Executors.newThreadPool("sharedMergeScheduler", threads, false);
+    for (int i = 0; i < threads; i++) {
+      service.submit(this);
+    }
+  }
+
+  /**
+   * Queues the writer for merging unless it is already queued.
+   */
+  @Override
+  public void merge(IndexWriter writer) throws IOException {
+    // contains + add has to be atomic to keep a writer from being queued twice.
+    synchronized (_writers) {
+      if (!_writers.contains(writer)) {
+        LOG.debug("Adding writer to merge [{0}]", writer);
+        _writers.add(writer);
+      }
+    }
+  }
+
+  /**
+   * Stops the worker loops and shuts the thread pool down.
+   */
+  @Override
+  public void close() throws IOException {
+    _running.set(false);
+    service.shutdownNow();
+  }
+
+  /**
+   * Worker loop: polls the queue, sleeping up to one second when it is empty,
+   * and runs one merge per iteration. A failing merge is logged and the loop
+   * continues; only an interrupt or {@link #close()} ends it.
+   */
+  @Override
+  public void run() {
+    while (_running.get()) {
+      try {
+        IndexWriter writer;
+        synchronized (_writers) {
+          writer = _writers.poll();
+        }
+        if (writer == null) {
+          synchronized (this) {
+            wait(ONE_SECOND);
+          }
+        } else if (mergeWriter(writer)) {
+          // there seems to be more merges to do
+          merge(writer);
+        }
+      } catch (InterruptedException e) {
+        LOG.debug("Merging interrupted, exiting.");
+        // Restore the flag for the pool before leaving the loop.
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException e) {
+        LOG.error("Unknown IOException", e);
+      } catch (RuntimeException e) {
+        // Without this a single unchecked failure would permanently remove
+        // one of the fixed number of workers.
+        LOG.error("Unknown error while merging", e);
+      }
+    }
+  }
+
+  /**
+   * Runs the next pending merge of the writer, if there is one.
+   *
+   * @return true if a merge was executed, false if the writer had no pending
+   *         merge.
+   */
+  private boolean mergeWriter(IndexWriter writer) throws IOException {
+    MergePolicy.OneMerge merge = writer.getNextMerge();
+    if (merge == null) {
+      LOG.debug("No merges to run for [{0}]", writer);
+      return false;
+    }
+    long s = System.currentTimeMillis();
+    writer.merge(merge);
+    long e = System.currentTimeMillis();
+    double time = (e - s) / 1000.0;
+    // Floating point division so that merges smaller than 1 MB do not report a
+    // size of zero; the elapsed time is clamped to 1 ms to avoid dividing by 0.
+    double megabytes = merge.totalBytesSize() / (1024.0 * 1024.0);
+    double rate = megabytes / Math.max(time, 0.001);
+    LOG.debug("Merge took [{0} s] to complete at rate of [{1} MB/s]", time, rate);
+    return true;
+  }
+
+}


Mime
View raw message