incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [11/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java
new file mode 100644
index 0000000..f0d4479
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/MergerBlurResultIterable.java
@@ -0,0 +1,65 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that folds the {@link BlurResultIterable}s delivered by a
+ * {@link BlurExecutorCompletionService} into a single
+ * {@link BlurResultIterableMultiple}.
+ * <p>
+ * Merging stops early, cancelling whatever is still outstanding, as soon as the
+ * combined total reaches the query's {@code minimumNumberOfResults}. Each wait
+ * for the next completed task is bounded by the query's {@code maxQueryTime}
+ * (milliseconds); when a wait yields no result the merge fails with a
+ * {@link BlurException}.
+ */
+public class MergerBlurResultIterable implements Merger<BlurResultIterable> {
+
+  private static final Log LOG = LogFactory.getLog(MergerBlurResultIterable.class);
+
+  private final long _minimumNumberOfResults;
+  private final long _maxQueryTime;
+  private final BlurQuery _blurQuery;
+
+  /**
+   * @param blurQuery
+   *          the query being executed; its {@code minimumNumberOfResults} and
+   *          {@code maxQueryTime} are captured at construction time.
+   */
+  public MergerBlurResultIterable(BlurQuery blurQuery) {
+    _blurQuery = blurQuery;
+    _minimumNumberOfResults = blurQuery.minimumNumberOfResults;
+    _maxQueryTime = blurQuery.maxQueryTime;
+  }
+
+  /**
+   * Collects results until every task has reported or enough results have
+   * been gathered.
+   *
+   * @param service
+   *          completion service whose tasks produce the partial results.
+   * @return the merged iterable (possibly holding fewer than the minimum
+   *         number of results if every task finished first).
+   * @throws BlurException
+   *           if polling yields no completed task within the max query time,
+   *           or if the service reports a failure for a task.
+   */
+  @Override
+  public BlurResultIterable merge(BlurExecutorCompletionService<BlurResultIterable> service) throws BlurException {
+    BlurResultIterableMultiple iterable = new BlurResultIterableMultiple();
+    while (service.getRemainingCount() > 0) {
+      Future<BlurResultIterable> future = service.poll(_maxQueryTime, TimeUnit.MILLISECONDS, true, _blurQuery);
+      if (future == null) {
+        // Placeholders are zero-based so that both supplied arguments are
+        // rendered. NOTE(review): assumes Log formats with
+        // java.text.MessageFormat-style indexes; the previous {2}/{1} pair
+        // could not render the max query time correctly with two arguments.
+        LOG.info("Query timeout with max query time of [{0}] for query [{1}].", _maxQueryTime, _blurQuery);
+        throw new BlurException("Query timeout with max query time of [" + _maxQueryTime + "] for query [" + _blurQuery + "].", null);
+      }
+      iterable.addBlurResultIterable(service.getResultThrowException(future, _blurQuery));
+      if (iterable.getTotalResults() >= _minimumNumberOfResults) {
+        // Enough results gathered: stop execution of any other running
+        // queries.
+        service.cancelAll();
+        return iterable;
+      }
+    }
+    return iterable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java
new file mode 100644
index 0000000..4b5b5f8
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/PeekableIterator.java
@@ -0,0 +1,66 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+/**
+ * Iterator decorator that reads one element ahead of the wrapped iterator so
+ * that the upcoming element can be inspected with {@link #peek()} without
+ * consuming it.
+ * <p>
+ * Whether an element is pending is tracked with an explicit flag rather than
+ * by comparing the buffered element with {@code null}, so {@code null}
+ * elements produced by the wrapped iterator are reported like any other.
+ *
+ * @param <E>
+ *          the element type.
+ */
+public class PeekableIterator<E> implements Iterator<E> {
+
+  private final Iterator<E> iterator;
+  private E current;
+  private boolean hasCurrent;
+
+  /**
+   * Wraps the given iterator and immediately buffers its first element, if
+   * any.
+   *
+   * @param iterator
+   *          the iterator to read from.
+   */
+  public PeekableIterator(Iterator<E> iterator) {
+    this.iterator = iterator;
+    advance();
+  }
+
+  /**
+   * Returns the element the next call to {@link #next()} will return, without
+   * consuming it. Only meaningful if {@link #hasNext()} is true; if
+   * {@link #hasNext()} is false, peek returns null.
+   *
+   * @return the pending element, or null when the iterator is exhausted.
+   */
+  public E peek() {
+    return current;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return hasCurrent;
+  }
+
+  /**
+   * Returns the pending element and buffers the following one.
+   *
+   * @return the pending element, or null when the iterator is exhausted (this
+   *         lenient behavior is kept for existing callers).
+   */
+  @Override
+  public E next() {
+    E result = current;
+    advance();
+    return result;
+  }
+
+  /**
+   * Not supported: the wrapped iterator has already been advanced past the
+   * element last returned, so delegating would remove the wrong element.
+   *
+   * @throws UnsupportedOperationException
+   *           always.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("remove is not supported by PeekableIterator");
+  }
+
+  /** Buffers the next element of the wrapped iterator, or clears the buffer. */
+  private void advance() {
+    hasCurrent = iterator.hasNext();
+    current = hasCurrent ? iterator.next() : null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactor.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactor.java b/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactor.java
new file mode 100644
index 0000000..c1d2cba
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactor.java
@@ -0,0 +1,139 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+
+/**
+ * Registry of named {@link Sampler}s whose values are sampled by a daemon
+ * {@link Timer} (first run after one second, then every second once
+ * {@link #init()} has been called) and exposed as one, five and fifteen
+ * minute averages.
+ */
+public class LoadFactor {
+
+  private static final Log LOG = LogFactory.getLog(LoadFactor.class);
+
+  private Map<String, LoadFactorProcessor> _processors = new ConcurrentHashMap<String, LoadFactorProcessor>();
+  private Timer _timer;
+  private long _delay = TimeUnit.SECONDS.toMillis(1);
+  private long _period = TimeUnit.SECONDS.toMillis(1);
+
+  /**
+   * Manual demo: samples the used heap while a background thread generates
+   * garbage, printing the three averages every five seconds. Never returns.
+   */
+  public static void main(String[] args) throws InterruptedException {
+    LoadFactor loadFactor = new LoadFactor();
+    loadFactor.init();
+
+    final MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
+    loadFactor.add("heapUsed", new Sampler() {
+      @Override
+      public long sample() {
+        return memory.getHeapMemoryUsage().getUsed();
+      }
+    });
+
+    Thread worker = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        long total = 0;
+        for (;;) {
+          total += doWork();
+        }
+      }
+    });
+    worker.start();
+
+    for (;;) {
+      System.out.println("one     = " + (long) loadFactor.getOneMinuteLoadFactor("heapUsed"));
+      System.out.println("five    = " + (long) loadFactor.getFiveMinuteLoadFactor("heapUsed"));
+      System.out.println("fifteen = " + (long) loadFactor.getFifteenMinuteLoadFactor("heapUsed"));
+      Thread.sleep(5000);
+    }
+  }
+
+  /**
+   * Builds a ten million character string, sleeping one millisecond after
+   * every thousand characters.
+   *
+   * @return the hash code of the string, or 0 if the thread was interrupted
+   *         while sleeping.
+   */
+  protected static int doWork() {
+    StringBuilder text = new StringBuilder();
+    for (int i = 0; i < 10000000; i++) {
+      // Pause once a full batch of 1000 characters has been appended.
+      if (i != 0 && i % 1000 == 0) {
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          return 0;
+        }
+      }
+      text.append('m');
+    }
+    return text.toString().hashCode();
+  }
+
+  /** Starts the daemon timer that samples every registered sampler. */
+  public void init() {
+    TimerTask samplingTask = new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          sampleAll();
+        } catch (Throwable e) {
+          // Caught here so the failure is logged instead of escaping the
+          // timer task.
+          LOG.error("Unknown error", e);
+        }
+      }
+    };
+    _timer = new Timer("LoadFactor-Daemon", true);
+    _timer.schedule(samplingTask, _delay, _period);
+  }
+
+  /** Takes one sample from every registered processor. */
+  private void sampleAll() {
+    for (LoadFactorProcessor processor : _processors.values()) {
+      processor.sample();
+    }
+  }
+
+  /**
+   * Registers a sampler under the given name, replacing any sampler (and its
+   * accumulated averages) previously registered under that name.
+   */
+  public void add(String name, Sampler sampler) {
+    _processors.put(name, new LoadFactorProcessor(sampler));
+  }
+
+  /** @return the one minute average for {@code name}, or 0 if unknown. */
+  public double getOneMinuteLoadFactor(String name) {
+    LoadFactorProcessor processor = _processors.get(name);
+    return processor == null ? 0 : processor.oneMinuteLoadFactor();
+  }
+
+  /** @return the five minute average for {@code name}, or 0 if unknown. */
+  public double getFiveMinuteLoadFactor(String name) {
+    LoadFactorProcessor processor = _processors.get(name);
+    return processor == null ? 0 : processor.fiveMinuteLoadFactor();
+  }
+
+  /** @return the fifteen minute average for {@code name}, or 0 if unknown. */
+  public double getFifteenMinuteLoadFactor(String name) {
+    LoadFactorProcessor processor = _processors.get(name);
+    return processor == null ? 0 : processor.fifteenMinuteLoadFactor();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactorProcessor.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactorProcessor.java b/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactorProcessor.java
new file mode 100644
index 0000000..6294fce
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/stats/LoadFactorProcessor.java
@@ -0,0 +1,70 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Feeds the values of one {@link Sampler} into three {@link WeightedAvg}
+ * windows sized at 60, 300 and 900 samples (the number of seconds in one, five
+ * and fifteen minutes), so the windows span those durations when
+ * {@link #sample()} is invoked once per second.
+ */
+public class LoadFactorProcessor {
+
+  private final Sampler _sampler;
+  private final WeightedAvg _one;
+  private final WeightedAvg _five;
+  private final WeightedAvg _fifteen;
+
+  /**
+   * @param sampler
+   *          the source of the values to average.
+   */
+  public LoadFactorProcessor(Sampler sampler) {
+    _sampler = sampler;
+    _one = windowOfMinutes(1);
+    _five = windowOfMinutes(5);
+    _fifteen = windowOfMinutes(15);
+  }
+
+  /** Creates a window holding one value per second of the given duration. */
+  private static WeightedAvg windowOfMinutes(long minutes) {
+    return new WeightedAvg((int) TimeUnit.MINUTES.toSeconds(minutes));
+  }
+
+  /** Reads the sampler once and records the value in all three windows. */
+  public void sample() {
+    final long value = _sampler.sample();
+    _one.add(value);
+    _five.add(value);
+    _fifteen.add(value);
+  }
+
+  /** @return the current average of the one minute window. */
+  public double oneMinuteLoadFactor() {
+    return getOne().getAvg();
+  }
+
+  /** @return the current average of the five minute window. */
+  public double fiveMinuteLoadFactor() {
+    return getFive().getAvg();
+  }
+
+  /** @return the current average of the fifteen minute window. */
+  public double fifteenMinuteLoadFactor() {
+    return getFifteen().getAvg();
+  }
+
+  public Sampler getSampler() {
+    return _sampler;
+  }
+
+  public WeightedAvg getOne() {
+    return _one;
+  }
+
+  public WeightedAvg getFive() {
+    return _five;
+  }
+
+  public WeightedAvg getFifteen() {
+    return _fifteen;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java b/src/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
new file mode 100644
index 0000000..99771d2
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
@@ -0,0 +1,56 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that combines the {@link TableStats} reported by each task
+ * of a {@link BlurExecutorCompletionService} into one aggregate.
+ */
+public class MergerTableStats implements Merger<TableStats> {
+
+  private long _timeout;
+
+  /**
+   * @param timeout
+   *          how long to wait for each completed task, in milliseconds.
+   */
+  public MergerTableStats(long timeout) {
+    _timeout = timeout;
+  }
+
+  /**
+   * Drains the service, folding every result into a fresh {@link TableStats}.
+   *
+   * @param service
+   *          completion service whose tasks produce the partial stats.
+   * @return the combined stats.
+   * @throws BlurException
+   *           if the service reports one while polling or fetching a result.
+   */
+  @Override
+  public TableStats merge(BlurExecutorCompletionService<TableStats> service) throws BlurException {
+    TableStats combined = new TableStats();
+    while (service.getRemainingCount() > 0) {
+      Future<TableStats> completed = service.poll(_timeout, TimeUnit.MILLISECONDS, true);
+      combined = merge(combined, service.getResultThrowException(completed));
+    }
+    return combined;
+  }
+
+  /**
+   * Folds {@code addition} into {@code target} in place: the table name is
+   * taken from the addition, record and row counts are summed, and bytes and
+   * queries keep the larger of the two values.
+   */
+  private TableStats merge(TableStats target, TableStats addition) {
+    target.tableName = addition.tableName;
+    target.recordCount += addition.recordCount;
+    target.rowCount += addition.rowCount;
+    target.bytes = Math.max(target.bytes, addition.bytes);
+    target.queries = Math.max(target.queries, addition.queries);
+    return target;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/stats/Sampler.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/stats/Sampler.java b/src/blur-core/src/main/java/org/apache/blur/manager/stats/Sampler.java
new file mode 100644
index 0000000..8cd84fe
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/stats/Sampler.java
@@ -0,0 +1,23 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Source of a single numeric measurement.
+ */
+public interface Sampler {
+
+  /**
+   * Takes one measurement.
+   *
+   * @return the measured value.
+   */
+  long sample();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/stats/WeightedAvg.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/stats/WeightedAvg.java b/src/blur-core/src/main/java/org/apache/blur/manager/stats/WeightedAvg.java
new file mode 100644
index 0000000..c662939
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/stats/WeightedAvg.java
@@ -0,0 +1,73 @@
+package org.apache.blur.manager.stats;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Fixed-size sliding window over the most recently added values that reports
+ * their arithmetic mean. Despite the name, every value inside the window
+ * carries the same weight.
+ * <p>
+ * All methods are synchronized: LoadFactor adds values from its timer thread
+ * while other threads read the average, and the running total, write position
+ * and add counter have to be observed together.
+ */
+public class WeightedAvg {
+
+  private final int _maxSize;
+  private final long[] _values;
+  private int _numberOfAdds;
+  private int _currentPosition;
+  private long _totalValue = 0;
+
+  /**
+   * @param maxSize
+   *          number of values the window holds; must be positive.
+   * @throws IllegalArgumentException
+   *           if {@code maxSize} is zero or negative (a zero-sized window
+   *           would otherwise fail on the first {@link #add(long)}).
+   */
+  public WeightedAvg(int maxSize) {
+    if (maxSize <= 0) {
+      throw new IllegalArgumentException("maxSize must be positive but was [" + maxSize + "]");
+    }
+    _maxSize = maxSize;
+    _values = new long[_maxSize];
+    _numberOfAdds = 0;
+    _currentPosition = 0;
+  }
+
+  /**
+   * Adds a value, evicting the oldest one once the window is full.
+   *
+   * @param value
+   *          the value to record.
+   */
+  public synchronized void add(long value) {
+    if (_currentPosition >= _maxSize) {
+      _currentPosition = 0;
+    }
+    // Swap the slot's previous content (0 until the window has wrapped) for
+    // the new value in the running total.
+    _totalValue += value - _values[_currentPosition];
+    _values[_currentPosition] = value;
+    _currentPosition++;
+    // Saturate instead of wrapping negative, which would corrupt the divisor
+    // used by getAvg().
+    if (_numberOfAdds < Integer.MAX_VALUE) {
+      _numberOfAdds++;
+    }
+  }
+
+  /**
+   * @return the mean of the values currently in the window, or 0 when the
+   *         running total is 0 (which includes the case of no values yet).
+   */
+  public synchronized double getAvg() {
+    if (_totalValue == 0) {
+      return 0;
+    }
+    return (double) _totalValue / (double) Math.min(_numberOfAdds, _maxSize);
+  }
+
+  public int getMaxSize() {
+    return _maxSize;
+  }
+
+  /**
+   * @return the live backing array (not a copy); slots are in write order,
+   *         not age order.
+   */
+  public long[] getValues() {
+    return _values;
+  }
+
+  /** @return how many values have been added, saturating at Integer.MAX_VALUE. */
+  public synchronized int getNumberOfAdds() {
+    return _numberOfAdds;
+  }
+
+  /** @return the index one past the slot written by the last add. */
+  public synchronized int getCurrentPosition() {
+    return _currentPosition;
+  }
+
+  /** @return the sum of the values currently in the window. */
+  public synchronized long getTotalValue() {
+    return _totalValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java
new file mode 100644
index 0000000..1d7197d
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatus.java
@@ -0,0 +1,85 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.CpuTime;
+import org.apache.blur.thrift.generated.QueryState;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that combines the {@link BlurQueryStatus} lists reported by
+ * each task of a {@link BlurExecutorCompletionService}, keeping a single
+ * merged entry per query uuid.
+ */
+public class MergerQueryStatus implements Merger<List<BlurQueryStatus>> {
+
+  private long _timeout;
+
+  /**
+   * @param timeout
+   *          how long to wait for each completed task, in milliseconds.
+   */
+  public MergerQueryStatus(long timeout) {
+    _timeout = timeout;
+  }
+
+  /**
+   * Drains the service and merges all reported statuses by uuid.
+   *
+   * @param service
+   *          completion service whose tasks produce the status lists.
+   * @return one status per distinct uuid, in no particular order.
+   * @throws BlurException
+   *           if the service reports one while polling or fetching a result.
+   */
+  @Override
+  public List<BlurQueryStatus> merge(BlurExecutorCompletionService<List<BlurQueryStatus>> service) throws BlurException {
+    Map<Long, BlurQueryStatus> byUuid = new HashMap<Long, BlurQueryStatus>();
+    while (service.getRemainingCount() > 0) {
+      Future<List<BlurQueryStatus>> completed = service.poll(_timeout, TimeUnit.MILLISECONDS, true);
+      addToMap(byUuid, service.getResultThrowException(completed));
+    }
+    return new ArrayList<BlurQueryStatus>(byUuid.values());
+  }
+
+  /** Adds each status to the map, merging it into any entry with the same uuid. */
+  private void addToMap(Map<Long, BlurQueryStatus> statusMap, List<BlurQueryStatus> list) {
+    for (BlurQueryStatus incoming : list) {
+      BlurQueryStatus existing = statusMap.get(incoming.uuid);
+      statusMap.put(incoming.uuid, existing == null ? incoming : merge(existing, incoming));
+    }
+  }
+
+  /**
+   * Merges {@code s2} into {@code s1} in place: shard counters are summed, the
+   * state is reconciled and the cpu times of {@code s2} are copied over
+   * (replacing entries with the same key).
+   *
+   * @return {@code s1}.
+   */
+  public static BlurQueryStatus merge(BlurQueryStatus s1, BlurQueryStatus s2) {
+    s1.totalShards = s1.totalShards + s2.totalShards;
+    s1.completeShards = s1.completeShards + s2.completeShards;
+    if (s1.state != s2.state) {
+      s1.state = dominantState(s1.state, s2.state);
+    }
+    if (s1.cpuTimes == null) {
+      s1.cpuTimes = new HashMap<String, CpuTime>();
+    }
+    if (s2.cpuTimes != null) {
+      s1.cpuTimes.putAll(s2.cpuTimes);
+    }
+    return s1;
+  }
+
+  /**
+   * Picks the state for two differing states: INTERRUPTED wins over RUNNING,
+   * which wins over everything else; otherwise COMPLETE.
+   */
+  private static QueryState dominantState(QueryState first, QueryState second) {
+    if (first == QueryState.INTERRUPTED || second == QueryState.INTERRUPTED) {
+      return QueryState.INTERRUPTED;
+    }
+    if (first == QueryState.RUNNING || second == QueryState.RUNNING) {
+      return QueryState.RUNNING;
+    }
+    return QueryState.COMPLETE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java b/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java
new file mode 100644
index 0000000..f3d922d
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/status/MergerQueryStatusSingle.java
@@ -0,0 +1,50 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+
+
+/**
+ * {@link Merger} that reduces the {@link BlurQueryStatus} reported by each
+ * task of a {@link BlurExecutorCompletionService} to a single status using
+ * {@link MergerQueryStatus#merge(BlurQueryStatus, BlurQueryStatus)}.
+ */
+public class MergerQueryStatusSingle implements Merger<BlurQueryStatus> {
+
+  private long _timeout;
+
+  /**
+   * @param timeout
+   *          how long to wait for each completed task, in milliseconds.
+   */
+  public MergerQueryStatusSingle(long timeout) {
+    _timeout = timeout;
+  }
+
+  /**
+   * @param service
+   *          completion service whose tasks produce the statuses.
+   * @return the merged status, or null when the service had no remaining
+   *         tasks.
+   * @throws BlurException
+   *           if the service reports one while polling or fetching a result.
+   */
+  @Override
+  public BlurQueryStatus merge(BlurExecutorCompletionService<BlurQueryStatus> service) throws BlurException {
+    BlurQueryStatus merged = null;
+    while (service.getRemainingCount() > 0) {
+      Future<BlurQueryStatus> completed = service.poll(_timeout, TimeUnit.MILLISECONDS, true);
+      BlurQueryStatus next = service.getResultThrowException(completed);
+      // The first status seeds the result; later ones are folded into it.
+      merged = (merged == null) ? next : MergerQueryStatus.merge(merged, next);
+    }
+    return merged;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java b/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
new file mode 100644
index 0000000..f1fe931
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
@@ -0,0 +1,143 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+import org.apache.blur.thrift.generated.QueryState;
+
+
+/**
+ * Tracks one running {@link BlurQuery} against a table: how many threads have
+ * attached and detached, the cpu time consumed by detached threads, whether
+ * the query was cancelled or has finished, and when it may be cleaned up.
+ * <p>
+ * Counters are atomic and the finished flag and timestamp are volatile, as
+ * instances are updated by the threads executing the query while being read
+ * by others.
+ */
+public class QueryStatus implements Comparable<QueryStatus> {
+
+  private final static boolean CPU_TIME_SUPPORTED = ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported();
+
+  private final BlurQuery _blurQuery;
+  private final String _table;
+  private final long _startingTime;
+  private volatile boolean _finished = false;
+  private volatile long _finishedTime;
+  private final AtomicLong _cpuTimeOfFinishedThreads = new AtomicLong();
+  private final ThreadMXBean _bean = ManagementFactory.getThreadMXBean();
+  private final long _ttl;
+  private final ThreadLocal<Long> _cpuTimes = new ThreadLocal<Long>();
+  private final AtomicBoolean _interrupted = new AtomicBoolean(false);
+  private final AtomicInteger _totalShards = new AtomicInteger();
+  private final AtomicInteger _completeShards = new AtomicInteger();
+  private final AtomicBoolean _running;
+
+  /**
+   * @param ttl
+   *          milliseconds a finished status stays around before it is valid
+   *          for clean up.
+   * @param table
+   *          the table the query runs against.
+   * @param blurQuery
+   *          the query being tracked.
+   * @param running
+   *          flag that is set to false by {@link #cancelQuery()}.
+   */
+  public QueryStatus(long ttl, String table, BlurQuery blurQuery, AtomicBoolean running) {
+    _ttl = ttl;
+    _table = table;
+    _blurQuery = blurQuery;
+    _startingTime = System.currentTimeMillis();
+    _running = running;
+  }
+
+  /**
+   * Registers the calling thread as working on this query: remembers its
+   * current cpu time (or -1 when cpu timing is unsupported) and increments
+   * the total shard count.
+   *
+   * @return this.
+   */
+  public QueryStatus attachThread() {
+    if (CPU_TIME_SUPPORTED) {
+      _cpuTimes.set(_bean.getCurrentThreadCpuTime());
+    } else {
+      _cpuTimes.set(-1L);
+    }
+    _totalShards.incrementAndGet();
+    return this;
+  }
+
+  /**
+   * Marks the calling thread as done with this query: increments the complete
+   * shard count and adds the cpu time the thread used since
+   * {@link #attachThread()} to the running total.
+   *
+   * @return this.
+   */
+  public QueryStatus deattachThread() {
+    _completeShards.incrementAndGet();
+    Long startingThreadCpuTime = _cpuTimes.get();
+    // Drop the per-thread entry so pooled threads do not keep it alive.
+    _cpuTimes.remove();
+    // A thread that never attached has no starting time; skip the cpu
+    // accounting instead of failing on unboxing null.
+    if (CPU_TIME_SUPPORTED && startingThreadCpuTime != null) {
+      long currentThreadCpuTime = _bean.getCurrentThreadCpuTime();
+      _cpuTimeOfFinishedThreads.addAndGet(currentThreadCpuTime - startingThreadCpuTime);
+    }
+    return this;
+  }
+
+  public long getUserUuid() {
+    return _blurQuery.uuid;
+  }
+
+  /** Flags the query as interrupted and clears the shared running flag. */
+  public void cancelQuery() {
+    _interrupted.set(true);
+    _running.set(false);
+  }
+
+  /**
+   * @return a new {@link BlurQueryStatus} snapshot carrying the query, shard
+   *         counters, state and (when the query is not null) its uuid.
+   */
+  public BlurQueryStatus getQueryStatus() {
+    BlurQueryStatus queryStatus = new BlurQueryStatus();
+    queryStatus.query = _blurQuery;
+    queryStatus.totalShards = _totalShards.get();
+    queryStatus.completeShards = _completeShards.get();
+    queryStatus.state = getQueryState();
+    if (queryStatus.query != null) {
+      queryStatus.uuid = queryStatus.query.uuid;
+    }
+    return queryStatus;
+  }
+
+  /** Interrupted takes precedence over finished, which precedes running. */
+  private QueryState getQueryState() {
+    if (_interrupted.get()) {
+      return QueryState.INTERRUPTED;
+    } else if (_finished) {
+      return QueryState.COMPLETE;
+    } else {
+      return QueryState.RUNNING;
+    }
+  }
+
+  public String getTable() {
+    return _table;
+  }
+
+  public boolean isFinished() {
+    return _finished;
+  }
+
+  /**
+   * Sets the finished flag and stamps the finished time with the current
+   * time (the time is stamped whatever the value of {@code finished}).
+   */
+  public void setFinished(boolean finished) {
+    // Publish the timestamp before the flag so that a reader that sees the
+    // query as finished never sees the default time of 0, which
+    // isValidForCleanUp() would treat as long expired.
+    _finishedTime = System.currentTimeMillis();
+    this._finished = finished;
+  }
+
+  public long getFinishedTime() {
+    return _finishedTime;
+  }
+
+  /**
+   * @return true once the query has finished and more than the ttl has
+   *         elapsed since the finished time.
+   */
+  public boolean isValidForCleanUp() {
+    if (!isFinished()) {
+      return false;
+    }
+    return getFinishedTime() + _ttl < System.currentTimeMillis();
+  }
+
+  /**
+   * Orders by starting time, breaking ties with the hash code so that two
+   * distinct statuses started in the same millisecond do not normally compare
+   * as equal. An instance always compares equal to itself.
+   */
+  @Override
+  public int compareTo(QueryStatus o) {
+    if (this == o) {
+      return 0;
+    }
+    long startingTime2 = o._startingTime;
+    if (_startingTime == startingTime2) {
+      int hashCode2 = o.hashCode();
+      return hashCode() < hashCode2 ? -1 : 1;
+    }
+    return _startingTime < startingTime2 ? -1 : 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java b/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
new file mode 100644
index 0000000..17c333f
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
@@ -0,0 +1,129 @@
+package org.apache.blur.manager.status;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurQueryStatus;
+
+
+/**
+ * Keeps track of the {@link QueryStatus} objects of queries and periodically
+ * discards the ones that report themselves as valid for clean up.
+ *
+ * Statuses are held as keys of a {@link ConcurrentHashMap} (used as a set), so
+ * the lookup methods can iterate while other threads add statuses or while
+ * the clean-up timer removes them.
+ */
+public class QueryStatusManager {
+
+  private static final Log LOG = LogFactory.getLog(QueryStatusManager.class);
+  // Placeholder value; only the keys of the map carry information.
+  private static final Object CONSTANT_VALUE = new Object();
+
+  private Timer statusCleanupTimer;
+  // Used both as the timer delay/period and as the ttl handed to new statuses.
+  private long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
+  private ConcurrentHashMap<QueryStatus, Object> currentQueryStatusCollection = new ConcurrentHashMap<QueryStatus, Object>();
+
+  /**
+   * Starts the daemon timer that runs the clean-up pass. The delay configured
+   * at the time of this call is used for both the initial delay and the
+   * period; changing it afterwards does not reschedule the timer.
+   */
+  public void init() {
+    statusCleanupTimer = new Timer("Query-Status-Cleanup", true);
+    statusCleanupTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          cleanupFinishedQueryStatuses();
+        } catch (Throwable e) {
+          // An exception escaping a TimerTask cancels the whole Timer, so
+          // everything is caught and logged here.
+          LOG.error("Unknown error while trying to cleanup finished queries.", e);
+        }
+      }
+    }, statusCleanupTimerDelay, statusCleanupTimerDelay);
+  }
+
+  /**
+   * Stops the clean-up timer. Does nothing if {@link #init()} was never
+   * called.
+   */
+  public void close() {
+    if (statusCleanupTimer == null) {
+      return;
+    }
+    statusCleanupTimer.cancel();
+    statusCleanupTimer.purge();
+  }
+
+  /**
+   * Creates a status for a new query and starts tracking it.
+   *
+   * @param table the table the query runs against.
+   * @param blurQuery the query being executed.
+   * @param maxNumberOfThreads currently unused; kept for callers.
+   * @param running flag passed through to the {@link QueryStatus}.
+   * @return the newly created and registered status.
+   */
+  public QueryStatus newQueryStatus(String table, BlurQuery blurQuery, int maxNumberOfThreads, AtomicBoolean running) {
+    QueryStatus queryStatus = new QueryStatus(statusCleanupTimerDelay, table, blurQuery, running);
+    currentQueryStatusCollection.put(queryStatus, CONSTANT_VALUE);
+    return queryStatus;
+  }
+
+  /**
+   * Marks the status as finished. The status is not removed immediately; the
+   * clean-up timer removes it once it reports itself valid for clean up.
+   */
+  public void removeStatus(QueryStatus status) {
+    status.setFinished(true);
+  }
+
+  /** Removes every tracked status that reports itself valid for clean up. */
+  private void cleanupFinishedQueryStatuses() {
+    LOG.debug("QueryStatus Start count [{0}].", currentQueryStatusCollection.size());
+    Iterator<QueryStatus> iterator = currentQueryStatusCollection.keySet().iterator();
+    while (iterator.hasNext()) {
+      QueryStatus status = iterator.next();
+      if (status.isValidForCleanUp()) {
+        currentQueryStatusCollection.remove(status);
+      }
+    }
+    LOG.debug("QueryStatus Finish count [{0}].", currentQueryStatusCollection.size());
+  }
+
+  public long getStatusCleanupTimerDelay() {
+    return statusCleanupTimerDelay;
+  }
+
+  /**
+   * Sets the delay in milliseconds. Only affects a timer started by a later
+   * {@link #init()} call and statuses created afterwards.
+   */
+  public void setStatusCleanupTimerDelay(long statusCleanupTimerDelay) {
+    this.statusCleanupTimerDelay = statusCleanupTimerDelay;
+  }
+
+  /**
+   * Cancels every tracked query of the given table whose user uuid matches.
+   *
+   * @param table the table name; must not be null.
+   * @param uuid the user supplied query id.
+   */
+  public void cancelQuery(String table, long uuid) {
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getUserUuid() == uuid && status.getTable().equals(table)) {
+        status.cancelQuery();
+      }
+    }
+  }
+
+  /**
+   * @param table the table name to filter on.
+   * @return a status snapshot for every tracked query of the given table;
+   *         empty if there are none.
+   */
+  public List<BlurQueryStatus> currentQueries(String table) {
+    List<BlurQueryStatus> result = new ArrayList<BlurQueryStatus>();
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getTable().equals(table)) {
+        result.add(status.getQueryStatus());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Looks up the status of a single query.
+   *
+   * @param table the table name.
+   * @param uuid the user supplied query id.
+   * @return the status snapshot of the first matching tracked query.
+   * @throws BlurException if no tracked query matches.
+   */
+  public BlurQueryStatus queryStatus(String table, long uuid) throws BlurException {
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      if (status.getUserUuid() == uuid && status.getTable().equals(table)) {
+        return status.getQueryStatus();
+      }
+    }
+    throw new BlurException("Query status for table [" + table + "] and uuid [" + uuid + "] not found", null);
+  }
+
+  /**
+   * @param table the table name to filter on.
+   * @return the distinct user uuids of the tracked queries of the given
+   *         table, in no particular order.
+   */
+  public List<Long> queryStatusIdList(String table) {
+    Set<Long> ids = new HashSet<Long>();
+    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+      // Only report ids that belong to the requested table, matching the
+      // filtering done by currentQueries(String) and queryStatus(String, long).
+      if (status.getTable().equals(table)) {
+        ids.add(status.getUserUuid());
+      }
+    }
+    return new ArrayList<Long>(ids);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
new file mode 100644
index 0000000..5138bd2
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/AbstractBlurIndex.java
@@ -0,0 +1,164 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+
+/**
+ * Base class for {@link BlurIndex} implementations that serve searches from
+ * an {@link IndexReader} held in an atomic reference. It carries the
+ * externally configured collaborators (analyzer, directory, closer,
+ * refresher, ...) and implements reader refresh, reader hand-out and close.
+ */
+public abstract class AbstractBlurIndex extends BlurIndex {
+
+  private BlurAnalyzer _analyzer;
+  private BlurIndexCloser _closer;
+  private Directory _directory;
+  private IndexDeletionPolicy _indexDeletionPolicy = new KeepOnlyLastCommitDeletionPolicy();
+  private AtomicReference<IndexReader> _indexReaderRef = new AtomicReference<IndexReader>();
+  private AtomicBoolean _isClosed = new AtomicBoolean(false);
+  private AtomicBoolean _open = new AtomicBoolean();
+  private BlurIndexRefresher _refresher;
+  private String _shard;
+  private Similarity _similarity;
+  private String _table;
+
+  /**
+   * Builds the writer configuration from the configured analyzer, deletion
+   * policy and similarity, and marks this index as open.
+   *
+   * @return the configuration; compound files are disabled on its merge
+   *         policy and the write lock timeout is five minutes.
+   */
+  protected IndexWriterConfig initIndexWriterConfig() {
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    conf.setIndexDeletionPolicy(_indexDeletionPolicy);
+    conf.setSimilarity(_similarity);
+    // The cast relies on the default merge policy being a TieredMergePolicy.
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    _open.set(true);
+    return conf;
+  }
+
+  /**
+   * Installs the initial reader and registers this index with the refresher.
+   *
+   * @param reader the reader to serve searches from.
+   */
+  protected void initIndexReader(IndexReader reader) throws IOException {
+    _indexReaderRef.set(reader);
+    _refresher.register(this);
+  }
+
+  /**
+   * Reopens the reader if the index has changed. The previous reader is handed
+   * to the closer instead of being closed directly. Does nothing when the
+   * index is not open or the reader is current.
+   */
+  @Override
+  public void refresh() throws IOException {
+    if (!_open.get()) {
+      return;
+    }
+    IndexReader oldReader = _indexReaderRef.get();
+    if (oldReader.isCurrent()) {
+      return;
+    }
+    IndexReader reader = IndexReader.openIfChanged(oldReader);
+    if (reader != null && oldReader != reader) {
+      _indexReaderRef.set(reader);
+      _closer.close(oldReader);
+    }
+  }
+
+  /**
+   * Returns the current reader with its reference count incremented.
+   *
+   * {@link #refresh()} may replace the reader and hand the old one to the
+   * closer between the moment the reference is read and the moment the count
+   * is incremented. To avoid failing on such a stale reader, the increment is
+   * attempted with {@code tryIncRef()} and retried against the newest reader,
+   * the same way {@code BlurNRTIndex.getIndexReader()} does.
+   *
+   * @return the current reader, with one additional reference held on behalf
+   *         of the caller.
+   */
+  @Override
+  public IndexReader getIndexReader() throws IOException {
+    IndexReader indexReader = _indexReaderRef.get();
+    while (!indexReader.tryIncRef()) {
+      IndexReader current = _indexReaderRef.get();
+      if (current == indexReader) {
+        // No newer reader was installed, so retrying cannot succeed. Fall back
+        // to incRef() so the failure surfaces instead of spinning forever.
+        indexReader.incRef();
+        return indexReader;
+      }
+      indexReader = current;
+    }
+    return indexReader;
+  }
+
+  /**
+   * Closes this index without an inner close action and then closes the
+   * directory.
+   */
+  @Override
+  public void close() throws IOException {
+    close(null);
+    _directory.close();
+  }
+
+  /**
+   * Marks the index as no longer open, unregisters it from the refresher, runs
+   * the optional inner close action and finally flags the index as closed.
+   *
+   * @param innerClose optional action to run as part of closing; may be null.
+   * @throws IOException wrapping any exception thrown by {@code innerClose};
+   *         in that case the closed flag is not set.
+   */
+  public void close(Callable<Void> innerClose) throws IOException {
+    _open.set(false);
+    _refresher.unregister(this);
+    if (innerClose != null) {
+      try {
+        innerClose.call();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    _isClosed.set(true);
+  }
+
+  /**
+   * @return the live closed flag itself, not a copy.
+   */
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    _analyzer = analyzer;
+  }
+
+  public void setCloser(BlurIndexCloser closer) {
+    _closer = closer;
+  }
+
+  public void setDirectory(Directory directory) {
+    _directory = directory;
+  }
+
+  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    _indexDeletionPolicy = indexDeletionPolicy;
+  }
+
+  public void setRefresher(BlurIndexRefresher refresher) {
+    _refresher = refresher;
+  }
+
+  public void setShard(String shard) {
+    this._shard = shard;
+  }
+
+  public void setSimilarity(Similarity similarity) {
+    _similarity = similarity;
+  }
+
+  public void setTable(String table) {
+    this._table = table;
+  }
+
+  protected BlurAnalyzer getAnalyzer() {
+    return _analyzer;
+  }
+
+  protected Directory getDirectory() {
+    return _directory;
+  }
+
+  protected String getShard() {
+    return _shard;
+  }
+
+  protected String getTable() {
+    return _table;
+  }
+
+  /**
+   * @return true between {@link #initIndexWriterConfig()} and
+   *         {@link #close(Callable)}.
+   */
+  protected boolean isOpen() {
+    return _open.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
new file mode 100644
index 0000000..79ba2b8
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -0,0 +1,42 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.thrift.generated.Row;
+import org.apache.lucene.index.IndexReader;
+
+
+/**
+ * Contract for a single index: row mutation, access to an
+ * {@link IndexReader} for searching, and lifecycle operations.
+ */
+public abstract class BlurIndex {
+
+  // ---- mutation ----
+
+  /**
+   * Replaces the given row.
+   *
+   * @param waitToBeVisible presumably whether the call should block until the
+   *          change is searchable - confirm against the implementation in use.
+   * @param wal presumably whether the change goes through the write-ahead
+   *          log - confirm against the implementation in use.
+   * @param row the new content of the row.
+   */
+  public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException;
+
+  /**
+   * Deletes the row with the given id. The two flags have the same meaning as
+   * in {@link #replaceRow(boolean, boolean, Row)}.
+   */
+  public abstract void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException;
+
+  /**
+   * Merges the index down towards the requested number of segments; the exact
+   * behaviour is left to the implementation.
+   */
+  public abstract void optimize(int numberOfSegmentsPerShard) throws IOException;
+
+  // ---- reading ----
+
+  /**
+   * @return a reader for searching this index.
+   */
+  public abstract IndexReader getIndexReader() throws IOException;
+
+  /** Makes recent changes available to readers, as far as the implementation supports it. */
+  public abstract void refresh() throws IOException;
+
+  // ---- lifecycle ----
+
+  /** Releases the resources held by this index. */
+  public abstract void close() throws IOException;
+
+  /**
+   * @return the flag object that tells whether this index has been closed.
+   */
+  public abstract AtomicBoolean isClosed();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
new file mode 100644
index 0000000..e096818
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexCloser.java
@@ -0,0 +1,108 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexReader;
+
+
+/**
+ * Closes {@link IndexReader}s that have been replaced, once nobody else is
+ * using them any more.
+ *
+ * Readers are queued through {@link #close(IndexReader)}. A daemon thread
+ * scans the queue about once per second; a reader whose reference count has
+ * dropped to 1 (only the owner's reference remains) is removed from the queue
+ * and closed on a thread pool. Readers with a higher count stay queued for
+ * the next pass.
+ */
+public class BlurIndexCloser implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexCloser.class);
+  private static final long PAUSE_TIME = TimeUnit.SECONDS.toMillis(1);
+  private Thread daemon;
+  private Collection<IndexReader> readers = new LinkedBlockingQueue<IndexReader>();
+  private AtomicBoolean running = new AtomicBoolean();
+  private ExecutorService executorService;
+
+  /** Creates the closing thread pool and starts the scanning daemon. */
+  public void init() {
+    running.set(true);
+    // The pool must exist before the daemon starts: the daemon submits close
+    // tasks to it from its first pass onwards.
+    executorService = Executors.newThreadPool("Blur Index Closer Pool", 10);
+    daemon = new Thread(this);
+    daemon.setDaemon(true);
+    daemon.setName(getClass().getName() + "-Daemon");
+    daemon.start();
+    LOG.info("Init Complete");
+  }
+
+  /** Stops the scanning daemon and shuts the thread pool down. */
+  public void close() {
+    running.set(false);
+    daemon.interrupt();
+    executorService.shutdownNow();
+  }
+
+  /**
+   * Queues a reader to be closed once its reference count drops to 1.
+   *
+   * @param reader the reader to close eventually.
+   */
+  public void close(IndexReader reader) {
+    readers.add(reader);
+  }
+
+  /** Daemon loop: scan the queue, pause, repeat until stopped or interrupted. */
+  @Override
+  public void run() {
+    while (running.get()) {
+      try {
+        tryToCloseReaders();
+      } catch (Throwable t) {
+        // Keep the daemon alive whatever a single pass throws.
+        LOG.error("Unknown error", t);
+      }
+      try {
+        Thread.sleep(PAUSE_TIME);
+      } catch (InterruptedException e) {
+        return;
+      }
+    }
+  }
+
+  /**
+   * One pass over the queue. Only readers whose reference count is exactly 1
+   * are removed and closed; all others are left queued untouched, so a reader
+   * that is still in use is never closed underneath its users and no reader is
+   * submitted for closing twice.
+   */
+  private void tryToCloseReaders() {
+    LOG.debug("Trying to close [{0}] readers", readers.size());
+    Iterator<IndexReader> it = readers.iterator();
+    while (it.hasNext()) {
+      IndexReader reader = it.next();
+      if (reader.getRefCount() == 1) {
+        it.remove();
+        closeInternal(reader);
+      } else {
+        LOG.debug("Could not close indexreader [" + reader + "] because of ref count [" + reader.getRefCount() + "].");
+      }
+    }
+  }
+
+  /**
+   * Closes the reader on the thread pool. If closing fails the reader is put
+   * back on the queue so that a later pass retries it.
+   */
+  private void closeInternal(final IndexReader reader) {
+    executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          long s = System.currentTimeMillis();
+          reader.close();
+          long e = System.currentTimeMillis();
+          LOG.debug("Size [{0}] time to close [{1}] Closing indexreader [{2}].", readers.size(), (e - s), reader);
+        } catch (Exception e) {
+          readers.add(reader);
+          LOG.error("Error while trying to close indexreader [" + reader + "].", e);
+        }
+      }
+    });
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
new file mode 100644
index 0000000..48fe298
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
@@ -0,0 +1,72 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Version;
+
+
+public class BlurIndexReader extends AbstractBlurIndex {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexReader.class);
+
+  public void init() throws IOException {
+    initIndexWriterConfig();
+    Directory directory = getDirectory();
+    if (!IndexReader.indexExists(directory)) {
+      IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, new KeywordAnalyzer());
+      new IndexWriter(directory, conf).close();
+    }
+    initIndexReader(IndexReader.open(directory));
+  }
+
+  @Override
+  public synchronized void refresh() throws IOException {
+    // Override so that we can call within synchronized method
+    super.refresh();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    LOG.info("Reader for table [{0}] shard [{1}] closed.", getTable(), getShard());
+  }
+
+  @Override
+  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
+    throw new RuntimeException("Read-only shard");
+  }
+
+  @Override
+  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
+    throw new RuntimeException("Read-only shard");
+  }
+
+  @Override
+  public void optimize(int numberOfSegmentsPerShard) throws IOException {
+    // Do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java
new file mode 100644
index 0000000..7f669b8
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexRefresher.java
@@ -0,0 +1,85 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+
+public class BlurIndexRefresher extends TimerTask {
+
+  private static final Log LOG = LogFactory.getLog(BlurIndexRefresher.class);
+
+  private Timer _timer;
+  private long _period = TimeUnit.MINUTES.toMillis(1);
+  private long _delay = _period;
+  private Collection<BlurIndex> _indexes = new LinkedBlockingQueue<BlurIndex>();
+
+  public void register(BlurIndex blurIndex) {
+    _indexes.add(blurIndex);
+  }
+
+  public void unregister(BlurIndex blurIndex) {
+    _indexes.remove(blurIndex);
+  }
+
+  public void close() {
+    _timer.purge();
+    _timer.cancel();
+  }
+
+  public void init() {
+    _timer = new Timer("IndexReader-Refresher", true);
+    _timer.schedule(this, _delay, _period);
+    LOG.info("Init Complete");
+  }
+
+  @Override
+  public void run() {
+    try {
+      refreshInternal();
+    } catch (Throwable e) {
+      LOG.error("Unknown error", e);
+    }
+  }
+
+  private void refreshInternal() {
+    for (BlurIndex index : _indexes) {
+      try {
+        index.refresh();
+      } catch (IOException e) {
+        LOG.error("Unknown error while refreshing index of writer [{0}]", e, index);
+      }
+    }
+  }
+
+  public void setPeriod(long period) {
+    _period = period;
+  }
+
+  public void setDelay(long delay) {
+    _delay = delay;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
new file mode 100644
index 0000000..e0e6d48
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -0,0 +1,294 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.NRTManager;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.search.NRTManagerReopenThread;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+
+
+/**
+ * {@link BlurIndex} backed by a Lucene {@link NRTManager}. Mutations are
+ * applied through a {@link TransactionRecorder}, a background thread commits
+ * periodically, and an {@link NRTManagerReopenThread} reopens the searcher.
+ * The reader handed out to callers is kept in an atomic reference and is
+ * replaced by {@link #swap()}.
+ */
+public class BlurNRTIndex extends BlurIndex {
+
+  private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
+  private static final boolean APPLY_ALL_DELETES = true;
+
+  private NRTManager _nrtManager;
+  private AtomicBoolean _isClosed = new AtomicBoolean();
+  private IndexWriter _writer;
+  private Thread _committer;
+
+  // externally set
+  private BlurAnalyzer _analyzer;
+  private Directory _directory;
+  private String _table;
+  private String _shard;
+  private Similarity _similarity;
+  private NRTManagerReopenThread _refresher;
+  private TransactionRecorder _recorder;
+  private Configuration _configuration;
+  private Path _walPath;
+  private IndexDeletionPolicy _indexDeletionPolicy;
+  private BlurIndexCloser _closer;
+  // Reader currently handed out by getIndexReader(); replaced in swap().
+  private AtomicReference<IndexReader> _indexRef = new AtomicReference<IndexReader>();
+  private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
+  private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(5000);
+  private DirectoryReferenceFileGC _gc;
+  private TrackingIndexWriter _trackingWriter;
+  private SearcherFactory _searcherFactory = new SearcherFactory();
+  // System.nanoTime() of the last refresh triggered from waitToBeVisible().
+  // NOTE(review): read and written without synchronization - confirm that
+  // concurrent writers are acceptable here.
+  private long _lastRefresh;
+  // _timeBetweenRefreshs converted to nanoseconds; computed in init().
+  private long _timeBetweenRefreshsNanos;
+
+  // private SearcherWarmer _warmer = new SearcherWarmer() {
+  // @Override
+  // public void warm(IndexSearcher s) throws IOException {
+  // IndexReader indexReader = s.getIndexReader();
+  // IndexReader[] subReaders = indexReader.getSequentialSubReaders();
+  // if (subReaders == null) {
+  // PrimeDocCache.getPrimeDocBitSet(indexReader);
+  // } else {
+  // for (IndexReader reader : subReaders) {
+  // PrimeDocCache.getPrimeDocBitSet(reader);
+  // }
+  // }
+  // }
+  // };
+
+  /**
+   * Opens the index: creates the writer on a reference-counting wrapper of the
+   * directory, replays the write-ahead log found under
+   * {@code walPath/table/shard} into the writer, creates the NRT manager,
+   * publishes the first reader and starts the commit and refresh threads.
+   * All setters must have been called before.
+   */
+  public void init() throws IOException {
+    Path walTablePath = new Path(_walPath, _table);
+    Path walShardPath = new Path(walTablePath, _shard);
+
+    _timeBetweenRefreshsNanos = TimeUnit.MILLISECONDS.toNanos(_timeBetweenRefreshs);
+
+    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
+    conf.setSimilarity(_similarity);
+    conf.setIndexDeletionPolicy(_indexDeletionPolicy);
+    // The cast relies on the default merge policy being a TieredMergePolicy.
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(_directory, _gc);
+    _writer = new IndexWriter(referenceCounter, conf);
+    _recorder = new TransactionRecorder();
+    _recorder.setAnalyzer(_analyzer);
+    _recorder.setConfiguration(_configuration);
+    _recorder.setWalPath(walShardPath);
+    _recorder.init();
+    _recorder.replay(_writer);
+
+    _trackingWriter = new TrackingIndexWriter(_writer);
+    _nrtManager = new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES);
+    // NOTE(review): the acquired searcher is not released here; its reader is
+    // later handed to the closer by swap()/close() - confirm this is the
+    // intended way of giving the reference back.
+    IndexSearcher searcher = _nrtManager.acquire();
+    _indexRef.set(searcher.getIndexReader());
+    _lastRefresh = System.nanoTime();
+    startCommiter();
+    startRefresher();
+  }
+
+  /**
+   * Starts the daemon reopen thread. The minimum staleness is the configured
+   * refresh interval in seconds, the maximum staleness ten times that.
+   */
+  private void startRefresher() {
+    double targetMinStaleSec = _timeBetweenRefreshs / 1000.0;
+    _refresher = new NRTManagerReopenThread(_nrtManager, targetMinStaleSec * 10, targetMinStaleSec);
+    _refresher.setName("Refresh Thread [" + _table + "/" + _shard + "]");
+    _refresher.setDaemon(true);
+    _refresher.start();
+  }
+
+  /**
+   * Starts the daemon thread that commits through the recorder and then sleeps
+   * for the configured time between commits, until the index is closed.
+   * Commit failures are logged and the loop continues.
+   */
+  private void startCommiter() {
+    _committer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!_isClosed.get()) {
+          try {
+            LOG.info("Committing of [{0}/{1}].", _table, _shard);
+            _recorder.commit(_writer);
+          } catch (CorruptIndexException e) {
+            // NOTE(review): the log message below misspells "Corruption".
+            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _table, _shard);
+          } catch (IOException e) {
+            LOG.error("IO Error during commit of [{0}/{1}].", e, _table, _shard);
+          }
+          try {
+            Thread.sleep(_timeBetweenCommits);
+          } catch (InterruptedException e) {
+            // close() interrupts this thread after setting the closed flag, so
+            // an interrupt with the flag set is the normal way to stop.
+            if (_isClosed.get()) {
+              return;
+            }
+            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _table, _shard);
+          }
+        }
+      }
+    });
+    _committer.setDaemon(true);
+    _committer.setName("Commit Thread [" + _table + "/" + _shard + "]");
+    _committer.start();
+  }
+
+  /**
+   * Replaces the row through the recorder. A row without records (null or
+   * empty list) is treated as a delete of that row id.
+   */
+  @Override
+  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
+    List<Record> records = row.records;
+    if (records == null || records.isEmpty()) {
+      deleteRow(waitToBeVisible, wal, row.id);
+      return;
+    }
+    long generation = _recorder.replaceRow(wal, row, _trackingWriter);
+    waitToBeVisible(waitToBeVisible, generation);
+  }
+
+  /** Deletes the row through the recorder and applies the visibility policy. */
+  @Override
+  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
+    long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
+    waitToBeVisible(waitToBeVisible, generation);
+  }
+
+  /**
+   * Returns the current reader with its reference count incremented. If the
+   * increment fails the reference is re-read and the attempt repeated, which
+   * covers the case where swap() replaced the reader in between.
+   */
+  @Override
+  public IndexReader getIndexReader() throws IOException {
+    IndexReader indexReader = _indexRef.get();
+    while (!indexReader.tryIncRef()) {
+      indexReader = _indexRef.get();
+    }
+    LOG.debug("Index fetched with ref of [{0}] [{1}]", indexReader.getRefCount(), indexReader);
+    return indexReader;
+  }
+
+  /**
+   * Closes the index: flags it closed, interrupts the committer, stops the
+   * reopen thread, then closes the recorder, the writer and the NRT manager
+   * and queues the current reader on the closer. The directory is closed even
+   * if one of those steps throws.
+   */
+  @Override
+  public void close() throws IOException {
+    // @TODO make sure that locks are cleaned up.
+    _isClosed.set(true);
+    _committer.interrupt();
+    _refresher.close();
+    try {
+      _recorder.close();
+      _writer.close();
+      _closer.close(_indexRef.get());
+      _nrtManager.close();
+    } finally {
+      _directory.close();
+    }
+  }
+
+  /** Asks the NRT manager to refresh if needed and publishes its current reader. */
+  @Override
+  public void refresh() throws IOException {
+    _nrtManager.maybeRefresh();
+    swap();
+  }
+
+  /** Returns the live closed flag itself, not a copy. */
+  @Override
+  public AtomicBoolean isClosed() {
+    return _isClosed;
+  }
+
+  /** Force-merges the index down to the given number of segments. */
+  @Override
+  public void optimize(int numberOfSegmentsPerShard) throws IOException {
+    _writer.forceMerge(numberOfSegmentsPerShard);
+  }
+
+  /**
+   * Applies the visibility policy after a mutation. When visibility is
+   * requested and the searcher is behind the given generation, blocks until
+   * that generation is searchable and publishes the new reader. Otherwise it
+   * refreshes only if the configured refresh interval has passed since the
+   * last refresh triggered from here.
+   */
+  private void waitToBeVisible(boolean waitToBeVisible, long generation) throws IOException {
+    if (waitToBeVisible && _nrtManager.getCurrentSearchingGen() < generation) {
+      // if visibility is required then reopen.
+      _nrtManager.waitForGeneration(generation);
+      swap();
+    } else {
+      long now = System.nanoTime();
+      if (_lastRefresh + _timeBetweenRefreshsNanos < now) {
+        refresh();
+        _lastRefresh = now;
+      }
+    }
+  }
+
+  /**
+   * Publishes the NRT manager's current reader and queues the previously
+   * published one on the closer.
+   * NOTE(review): the searcher is acquired on every call, even when the reader
+   * has not changed, so the same reader can be queued on the closer while it
+   * is still the published one - confirm the reference counts balance.
+   */
+  private void swap() {
+    IndexSearcher searcher = _nrtManager.acquire();
+    IndexReader indexReader = searcher.getIndexReader();
+    IndexReader oldIndexReader = _indexRef.getAndSet(indexReader);
+    _closer.close(oldIndexReader);
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    _analyzer = analyzer;
+  }
+
+  public void setDirectory(Directory directory) {
+    _directory = directory;
+  }
+
+  public void setTable(String table) {
+    _table = table;
+  }
+
+  public void setShard(String shard) {
+    _shard = shard;
+  }
+
+  public void setSimilarity(Similarity similarity) {
+    _similarity = similarity;
+  }
+
+  /** Time in milliseconds the committer sleeps between commits. */
+  public void setTimeBetweenCommits(long timeBetweenCommits) {
+    _timeBetweenCommits = timeBetweenCommits;
+  }
+
+  /** Refresh interval in milliseconds; read when init() runs. */
+  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
+    _timeBetweenRefreshs = timeBetweenRefreshs;
+  }
+
+  /** Base path of the write-ahead log; table and shard are appended in init(). */
+  public void setWalPath(Path walPath) {
+    _walPath = walPath;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    _configuration = configuration;
+  }
+
+  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    _indexDeletionPolicy = indexDeletionPolicy;
+  }
+
+  public void setCloser(BlurIndexCloser closer) {
+    _closer = closer;
+  }
+
+  public DirectoryReferenceFileGC getGc() {
+    return _gc;
+  }
+
+  public void setGc(DirectoryReferenceFileGC gc) {
+    _gc = gc;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java
new file mode 100644
index 0000000..124b06d
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceCounter.java
@@ -0,0 +1,279 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+
+/**
+ * A {@link Directory} wrapper that keeps a per-file count of the
+ * {@link IndexInput}s it has handed out. A delete request for a file that
+ * still has open inputs is handed to the {@link DirectoryReferenceFileGC}
+ * instead of being executed immediately. All other operations are delegated
+ * unchanged to the wrapped directory. The {@code segments.gen} file is never
+ * reference counted.
+ */
+public class DirectoryReferenceCounter extends Directory {
+
+  private final static Log LOG = LogFactory.getLog(DirectoryReferenceCounter.class);
+  private Directory directory;
+  // Declared with the concrete concurrent type so the atomic putIfAbsent is
+  // available; it is still passed to the GC as a plain Map.
+  private ConcurrentHashMap<String, AtomicInteger> refs = new ConcurrentHashMap<String, AtomicInteger>();
+  private DirectoryReferenceFileGC gc;
+
+  /**
+   * @param directory the directory every call is delegated to.
+   * @param gc receives files whose deletion has to be deferred; may be null, in
+   *          which case a deferred delete request is dropped.
+   */
+  public DirectoryReferenceCounter(Directory directory, DirectoryReferenceFileGC gc) {
+    this.directory = directory;
+    this.gc = gc;
+  }
+
+  /**
+   * Deletes the file right away when no input is open on it, otherwise queues
+   * it with the GC.
+   */
+  public void deleteFile(String name) throws IOException {
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      // Must go to the wrapped directory; calling this method again would
+      // recurse until the stack overflows.
+      directory.deleteFile(name);
+      return;
+    }
+    AtomicInteger counter = refs.get(name);
+    if (counter != null && counter.get() > 0) {
+      addToFileGC(name);
+    } else {
+      LOG.debug("Delete file [{0}]", name);
+      directory.deleteFile(name);
+      // The file is gone, drop its counter so the map does not grow with
+      // every file the index has ever written.
+      refs.remove(name);
+    }
+  }
+
+  private void addToFileGC(String name) {
+    if (gc != null) {
+      LOG.debug("Add file [{0}] to be GCed once refs are closed.", name);
+      gc.add(directory, name, refs);
+    }
+  }
+
+  /**
+   * Creates the file in the wrapped directory and registers a zeroed counter
+   * for it.
+   *
+   * @throws IOException if a counter is already registered under that name.
+   */
+  public IndexOutput createOutput(String name) throws IOException {
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      return directory.createOutput(name);
+    }
+    LOG.debug("Create file [{0}]", name);
+    // putIfAbsent makes the "check then register" step atomic.
+    if (refs.putIfAbsent(name, new AtomicInteger(0)) != null) {
+      LOG.error("Unknown error while trying to create ref counter for [{0}] reference exists.", name);
+      throw new IOException("Reference exists [" + name + "]");
+    }
+    return directory.createOutput(name);
+  }
+
+  public IndexInput openInput(String name) throws IOException {
+    IndexInput input = directory.openInput(name);
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      return input;
+    }
+    return wrap(name, input);
+  }
+
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    IndexInput input = directory.openInput(name, bufferSize);
+    if (name.equals(IndexFileNames.SEGMENTS_GEN)) {
+      return input;
+    }
+    return wrap(name, input);
+  }
+
+  /**
+   * Wraps the input so that opening, cloning and closing it is tracked on the
+   * counter registered for the file, creating that counter when missing.
+   */
+  private IndexInput wrap(String name, IndexInput input) {
+    AtomicInteger counter = refs.get(name);
+    if (counter == null) {
+      AtomicInteger fresh = new AtomicInteger();
+      // Another thread may have registered a counter in the meantime; in that
+      // case its counter wins so all inputs of a file share one count.
+      counter = refs.putIfAbsent(name, fresh);
+      if (counter == null) {
+        counter = fresh;
+      }
+    }
+    return new RefIndexInput(input, counter);
+  }
+
+  /**
+   * An {@link IndexInput} that increments the shared counter when it is created
+   * or cloned and decrements it once when it is closed. Every read operation is
+   * delegated to the wrapped input.
+   */
+  @SuppressWarnings("deprecation")
+  public static class RefIndexInput extends IndexInput {
+
+    private IndexInput input;
+    private AtomicInteger ref;
+    private boolean closed = false;
+
+    public RefIndexInput(IndexInput input, AtomicInteger ref) {
+      this.input = input;
+      this.ref = ref;
+      ref.incrementAndGet();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+      // Seems like not all the clones are being closed...
+      try {
+        close();
+      } finally {
+        super.finalize();
+      }
+    }
+
+    public Object clone() {
+      RefIndexInput copy = (RefIndexInput) super.clone();
+      copy.input = (IndexInput) input.clone();
+      // The clone holds its own reference, so it must be able to release it
+      // even when it was cloned from an input that is already closed.
+      copy.closed = false;
+      copy.ref.incrementAndGet();
+      return copy;
+    }
+
+    public void skipChars(int length) throws IOException {
+      input.skipChars(length);
+    }
+
+    public void setModifiedUTF8StringsMode() {
+      input.setModifiedUTF8StringsMode();
+    }
+
+    /**
+     * Closes the wrapped input and releases this input's reference. Calls after
+     * the first one do nothing.
+     */
+    public void close() throws IOException {
+      if (!closed) {
+        closed = true;
+        try {
+          input.close();
+        } finally {
+          // Release the reference even when the underlying close fails,
+          // otherwise the file could never be garbage collected.
+          ref.decrementAndGet();
+        }
+      }
+    }
+
+    public short readShort() throws IOException {
+      return input.readShort();
+    }
+
+    public int readInt() throws IOException {
+      return input.readInt();
+    }
+
+    public void seek(long pos) throws IOException {
+      input.seek(pos);
+    }
+
+    public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+      input.copyBytes(out, numBytes);
+    }
+
+    public int readVInt() throws IOException {
+      return input.readVInt();
+    }
+
+    public String toString() {
+      return input.toString();
+    }
+
+    public long readLong() throws IOException {
+      return input.readLong();
+    }
+
+    public long readVLong() throws IOException {
+      return input.readVLong();
+    }
+
+    public String readString() throws IOException {
+      return input.readString();
+    }
+
+    public long getFilePointer() {
+      return input.getFilePointer();
+    }
+
+    public byte readByte() throws IOException {
+      return input.readByte();
+    }
+
+    public void readBytes(byte[] b, int offset, int len) throws IOException {
+      input.readBytes(b, offset, len);
+    }
+
+    public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+      input.readBytes(b, offset, len, useBuffer);
+    }
+
+    public long length() {
+      return input.length();
+    }
+
+    public void readChars(char[] buffer, int start, int length) throws IOException {
+      input.readChars(buffer, start, length);
+    }
+
+    public Map<String, String> readStringStringMap() throws IOException {
+      return input.readStringStringMap();
+    }
+
+  }
+
+  // Everything below is a straight delegation to the wrapped directory.
+
+  @SuppressWarnings("deprecation")
+  public void touchFile(String name) throws IOException {
+    directory.touchFile(name);
+  }
+
+  @SuppressWarnings("deprecation")
+  public void sync(String name) throws IOException {
+    directory.sync(name);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    directory.sync(names);
+  }
+
+  public void clearLock(String name) throws IOException {
+    directory.clearLock(name);
+  }
+
+  public void close() throws IOException {
+    directory.close();
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    directory.setLockFactory(lockFactory);
+  }
+
+  public String getLockID() {
+    return directory.getLockID();
+  }
+
+  public void copy(Directory to, String src, String dest) throws IOException {
+    directory.copy(to, src, dest);
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return directory.fileExists(name);
+  }
+
+  @SuppressWarnings("deprecation")
+  public long fileModified(String name) throws IOException {
+    return directory.fileModified(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return directory.fileLength(name);
+  }
+
+  public LockFactory getLockFactory() {
+    return directory.getLockFactory();
+  }
+
+  public String[] listAll() throws IOException {
+    return directory.listAll();
+  }
+
+  public Lock makeLock(String name) {
+    return directory.makeLock(name);
+  }
+
+  public String toString() {
+    return directory.toString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java
new file mode 100644
index 0000000..0aa64f4
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/DirectoryReferenceFileGC.java
@@ -0,0 +1,98 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.lucene.store.Directory;
+
+
+/**
+ * A {@link TimerTask} that periodically retries the deletion of files that
+ * could not be deleted because inputs were still open on them. Entries are
+ * queued through {@link #add(Directory, String, Map)} and stay in the queue
+ * until {@link Value#tryToDelete()} reports success.
+ */
+public class DirectoryReferenceFileGC extends TimerTask {
+
+  private static final Log LOG = LogFactory.getLog(DirectoryReferenceFileGC.class);
+
+  private Timer _timer;
+  private long _delay = 5000;
+  // Created eagerly so add() works before init() and the timer thread can
+  // never observe a missing queue.
+  private final LinkedBlockingQueue<Value> _queue = new LinkedBlockingQueue<Value>();
+
+  /** A file waiting for deletion together with the counter map that guards it. */
+  public static class Value {
+    public Value(Directory directory, String name, Map<String, AtomicInteger> refs) {
+      this.directory = directory;
+      this.name = name;
+      this.refs = refs;
+    }
+
+    Directory directory;
+    String name;
+    Map<String, AtomicInteger> refs;
+
+    /**
+     * Deletes the file when its counter shows no open references.
+     *
+     * @return true when this entry is finished and can leave the queue, false
+     *         when the file still has open references.
+     */
+    public boolean tryToDelete() throws IOException {
+      AtomicInteger counter = refs.get(name);
+      if (counter == null) {
+        // The counter was already removed, e.g. by an earlier queue entry for
+        // the same file. Only delete if the file is somehow still present.
+        if (directory.fileExists(name)) {
+          LOG.debug("Removing file [{0}]", name);
+          directory.deleteFile(name);
+        }
+        return true;
+      }
+      if (counter.get() <= 0) {
+        refs.remove(name);
+        LOG.debug("Removing file [{0}]", name);
+        directory.deleteFile(name);
+        return true;
+      } else {
+        LOG.debug("File [{0}] had too many refs [{1}]", name, counter.get());
+      }
+      return false;
+    }
+  }
+
+  /** Starts the daemon timer that runs this task every {@code _delay} milliseconds. */
+  public void init() {
+    _timer = new Timer("Blur-File-GC", true);
+    _timer.scheduleAtFixedRate(this, _delay, _delay);
+  }
+
+  /**
+   * Queues a file for deferred deletion.
+   *
+   * @throws RuntimeException wrapping the InterruptedException when the calling
+   *           thread is interrupted; the interrupt flag is restored first.
+   */
+  public void add(Directory directory, String name, Map<String, AtomicInteger> refs) {
+    try {
+      _queue.put(new Value(directory, name, refs));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Stops the timer; does nothing when {@link #init()} was never called. */
+  public void close() {
+    if (_timer != null) {
+      _timer.purge();
+      _timer.cancel();
+    }
+  }
+
+  /** Walks the queue once and drops every entry whose file could be deleted. */
+  @Override
+  public void run() {
+    Iterator<Value> iterator = _queue.iterator();
+    while (iterator.hasNext()) {
+      Value value = iterator.next();
+      try {
+        if (value.tryToDelete()) {
+          iterator.remove();
+        }
+      } catch (IOException e) {
+        LOG.error("Unknown error", e);
+      } catch (RuntimeException e) {
+        // An exception escaping run() terminates the Timer thread, which
+        // would silently stop all further file collection.
+        LOG.error("Unknown error", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
new file mode 100644
index 0000000..acb5592
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
@@ -0,0 +1,344 @@
+package org.apache.blur.manager.writer;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.analysis.FieldConverterUtil;
+import org.apache.blur.index.IndexWriter;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowIndexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.record.Utils;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+
+
+/**
+ * Write-ahead log for row updates and deletes. Each mutation is serialized as
+ * a length-prefixed record into the file at {@code walPath} before it is
+ * applied to the index writer. {@link #replay(IndexWriter)} re-applies the
+ * records of an existing log, and {@link #commit(IndexWriter)} commits the
+ * writer and then replaces the log with an empty one.
+ *
+ * Record layout: 4-byte length, 1-byte {@link TYPE}, then the payload written
+ * by {@code writeRow} or {@code writeDelete}. Strings are stored as a VInt byte
+ * count followed by UTF-8 bytes.
+ */
+public class TransactionRecorder {
+
+  enum TYPE {
+    DELETE((byte) 0), ROW((byte) 1);
+    private byte b;
+
+    private TYPE(byte b) {
+      this.b = b;
+    }
+
+    public byte value() {
+      return b;
+    }
+
+    public static TYPE lookup(byte b) {
+      switch (b) {
+      case 0:
+        return DELETE;
+      case 1:
+        return ROW;
+      default:
+        throw new RuntimeException("Type not found [" + b + "]");
+      }
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(TransactionRecorder.class);
+  private static final Term ROW_ID = new Term(BlurConstants.ROW_ID);
+  // Explicit charset so the log does not depend on the platform default.
+  private static final String CHARSET = "UTF-8";
+  // Also serves as the monitor guarding writes to, and rolling of, the log.
+  private AtomicBoolean running = new AtomicBoolean(true);
+  private Path walPath;
+  private Configuration configuration;
+  private FileSystem fileSystem;
+  private AtomicReference<FSDataOutputStream> outputStream = new AtomicReference<FSDataOutputStream>();
+  private AtomicLong lastSync = new AtomicLong();
+  private long timeBetweenSyncs = TimeUnit.MILLISECONDS.toNanos(10);
+  private BlurAnalyzer analyzer;
+
+  /** Resolves the file system of the WAL path; call after the setters. */
+  public void init() throws IOException {
+    fileSystem = walPath.getFileSystem(configuration);
+  }
+
+  /**
+   * Creates a new, empty log file.
+   *
+   * @throws IOException if a log file already exists at the WAL path.
+   */
+  public void open() throws IOException {
+    if (fileSystem.exists(walPath)) {
+      throw new IOException("WAL path [" + walPath + "] still exists, replay must have not worked.");
+    }
+    FSDataOutputStream os = fileSystem.create(walPath);
+    // Check the stream itself; the AtomicReference holding it is never null.
+    if (os == null) {
+      throw new RuntimeException("Could not create WAL path [" + walPath + "]");
+    }
+    outputStream.set(os);
+    lastSync.set(System.nanoTime());
+  }
+
+  /**
+   * Re-applies an existing log to the writer and commits, or opens a fresh log
+   * when none exists.
+   */
+  public void replay(IndexWriter writer) throws IOException {
+    if (fileSystem.exists(walPath)) {
+      FSDataInputStream inputStream = fileSystem.open(walPath);
+      try {
+        replay(writer, inputStream);
+      } finally {
+        inputStream.close();
+      }
+      commit(writer);
+    } else {
+      open();
+    }
+  }
+
+  private void replay(IndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
+    long updateCount = 0;
+    long deleteCount = 0;
+    byte[] buffer;
+    while ((buffer = readBuffer(inputStream)) != null) {
+      DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer));
+      TYPE lookup = TYPE.lookup(dataInputStream.readByte());
+      switch (lookup) {
+      case ROW:
+        Row row = readRow(dataInputStream);
+        writer.updateDocuments(ROW_ID.createTerm(row.id), getDocs(row, analyzer));
+        updateCount++;
+        continue;
+      case DELETE:
+        String deleteRowId = readString(dataInputStream);
+        writer.deleteDocuments(ROW_ID.createTerm(deleteRowId));
+        deleteCount++;
+        continue;
+      default:
+        LOG.error("Unknown type [{0}]", lookup);
+        throw new IOException("Unknown type [" + lookup + "]");
+      }
+    }
+    LOG.info("Rows reclaimed form the WAL [{0}]", updateCount);
+    LOG.info("Deletes reclaimed form the WAL [{0}]", deleteCount);
+  }
+
+  /**
+   * Reads the next length-prefixed record.
+   *
+   * @return the record bytes, or null once the end of the log is reached
+   *         (including a record cut short at the end of the file).
+   * @throws IOException on any read failure other than end of file. Such a
+   *           failure must not look like a clean end of log, because the caller
+   *           commits and deletes the log after a replay that appears complete.
+   */
+  private byte[] readBuffer(DataInputStream inputStream) throws IOException {
+    try {
+      int length = inputStream.readInt();
+      if (length < 0) {
+        throw new IOException("Corrupt WAL entry, negative length [" + length + "]");
+      }
+      byte[] buffer = new byte[length];
+      inputStream.readFully(buffer);
+      return buffer;
+    } catch (EOFException e) {
+      return null;
+    }
+  }
+
+  /** Closes and deletes the current log file, then opens an empty one. */
+  private void rollLog() throws IOException {
+    LOG.info("Rolling WAL path [" + walPath + "]");
+    FSDataOutputStream os = outputStream.get();
+    if (os != null) {
+      os.close();
+    }
+    fileSystem.delete(walPath, false);
+    open();
+  }
+
+  /** Marks the recorder as stopped and closes the log stream if one is open. */
+  public void close() throws IOException {
+    synchronized (running) {
+      running.set(false);
+      // Closed under the same monitor the writers hold, so a record that is
+      // being written is not cut off half way.
+      FSDataOutputStream os = outputStream.get();
+      if (os != null) {
+        os.close();
+      }
+    }
+  }
+
+  private static void writeRow(DataOutputStream outputStream, Row row) throws IOException {
+    writeString(outputStream, row.id);
+    List<Record> records = row.records;
+    int size = records.size();
+    outputStream.writeInt(size);
+    for (int i = 0; i < size; i++) {
+      Record record = records.get(i);
+      writeRecord(outputStream, record);
+    }
+  }
+
+  private static Row readRow(DataInputStream inputStream) throws IOException {
+    Row row = new Row();
+    row.id = readString(inputStream);
+    int size = inputStream.readInt();
+    for (int i = 0; i < size; i++) {
+      row.addToRecords(readRecord(inputStream));
+    }
+    return row;
+  }
+
+  private static void writeRecord(DataOutputStream outputStream, Record record) throws IOException {
+    writeString(outputStream, record.recordId);
+    writeString(outputStream, record.family);
+    List<Column> columns = record.columns;
+    int size = columns.size();
+    outputStream.writeInt(size);
+    for (int i = 0; i < size; i++) {
+      writeColumn(outputStream, columns.get(i));
+    }
+  }
+
+  private static Record readRecord(DataInputStream inputStream) throws IOException {
+    Record record = new Record();
+    record.recordId = readString(inputStream);
+    record.family = readString(inputStream);
+    int size = inputStream.readInt();
+    for (int i = 0; i < size; i++) {
+      record.addToColumns(readColumn(inputStream));
+    }
+    return record;
+  }
+
+  private static void writeColumn(DataOutputStream outputStream, Column column) throws IOException {
+    writeString(outputStream, column.name);
+    writeString(outputStream, column.value);
+  }
+
+  private static Column readColumn(DataInputStream inputStream) throws IOException {
+    Column column = new Column();
+    column.name = readString(inputStream);
+    column.value = readString(inputStream);
+    return column;
+  }
+
+  private static void writeDelete(DataOutputStream outputStream, String deleteRowId) throws IOException {
+    writeString(outputStream, deleteRowId);
+  }
+
+  /** Writes the byte count as a VInt followed by the UTF-8 bytes of the string. */
+  private static void writeString(DataOutputStream outputStream, String s) throws IOException {
+    byte[] bs = s.getBytes(CHARSET);
+    Utils.writeVInt(outputStream, bs.length);
+    outputStream.write(bs);
+  }
+
+  /** Reads a string written by {@code writeString}. */
+  private static String readString(DataInputStream inputStream) throws IOException {
+    int length = Utils.readVInt(inputStream);
+    byte[] buffer = new byte[length];
+    inputStream.readFully(buffer);
+    return new String(buffer, CHARSET);
+  }
+
+  /**
+   * Appends one length-prefixed record to the log and syncs the stream when
+   * more than {@code timeBetweenSyncs} nanoseconds passed since the last sync.
+   */
+  private void sync(byte[] bs) throws IOException {
+    FSDataOutputStream os = outputStream.get();
+    if (bs == null || os == null) {
+      throw new RuntimeException("bs [" + bs + "] outputStream [" + os + "]");
+    }
+    os.writeInt(bs.length);
+    os.write(bs);
+    long now = System.nanoTime();
+    // nanoTime values may only be compared through their difference.
+    if (now - lastSync.get() > timeBetweenSyncs) {
+      os.sync();
+      lastSync.set(now);
+    }
+  }
+
+  /**
+   * Replaces all documents of the row in the index, logging the row first when
+   * {@code wal} is true.
+   *
+   * @return the value returned by the writer's updateDocuments call.
+   */
+  public long replaceRow(boolean wal, Row row, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      synchronized (running) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream recordStream = new DataOutputStream(baos);
+        recordStream.writeByte(TYPE.ROW.value());
+        writeRow(recordStream, row);
+        recordStream.close();
+        sync(baos.toByteArray());
+      }
+    }
+    Term term = ROW_ID.createTerm(row.id);
+    List<Document> docs = getDocs(row, analyzer);
+    return writer.updateDocuments(term, docs);
+  }
+
+  /**
+   * Deletes all documents of the row from the index, logging the delete first
+   * when {@code wal} is true.
+   *
+   * @return the value returned by the writer's deleteDocuments call.
+   */
+  public long deleteRow(boolean wal, String rowId, TrackingIndexWriter writer) throws IOException {
+    if (wal) {
+      synchronized (running) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream recordStream = new DataOutputStream(baos);
+        recordStream.writeByte(TYPE.DELETE.value());
+        writeDelete(recordStream, rowId);
+        recordStream.close();
+        sync(baos.toByteArray());
+      }
+    }
+    return writer.deleteDocuments(ROW_ID.createTerm(rowId));
+  }
+
+  public void setWalPath(Path walPath) {
+    this.walPath = walPath;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   * Commits the writer and then rolls the log, both under the write monitor so
+   * no record can be logged between the commit and the roll.
+   */
+  public void commit(IndexWriter writer) throws CorruptIndexException, IOException {
+    synchronized (running) {
+      long s = System.nanoTime();
+      writer.commit();
+      long m = System.nanoTime();
+      LOG.info("Commit took [{0}] for [{1}]", (m - s) / 1000000.0, writer);
+      rollLog();
+      long e = System.nanoTime();
+      LOG.info("Log roller took [{0}] for [{1}]", (e - m) / 1000000.0, writer);
+    }
+  }
+
+  /**
+   * Converts every record of the row into a document; the first document
+   * additionally receives {@code BlurConstants.PRIME_DOC_FIELD}.
+   */
+  public static List<Document> getDocs(Row row, BlurAnalyzer analyzer) {
+    List<Record> records = row.records;
+    int size = records.size();
+    final String rowId = row.id;
+    final StringBuilder builder = new StringBuilder();
+    List<Document> docs = new ArrayList<Document>(size);
+    for (int i = 0; i < size; i++) {
+      Document document = convert(rowId, records.get(i), builder, analyzer);
+      if (i == 0) {
+        document.add(BlurConstants.PRIME_DOC_FIELD);
+      }
+      docs.add(document);
+    }
+    return docs;
+  }
+
+  /**
+   * Builds the document for one record: stored row id and record id fields plus
+   * the columns added by {@code RowIndexWriter.addColumns}, followed by
+   * {@code FieldConverterUtil.convert}.
+   */
+  public static Document convert(String rowId, Record record, StringBuilder builder, BlurAnalyzer analyzer) {
+    Document document = new Document();
+    document.add(new Field(BlurConstants.ROW_ID, rowId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    document.add(new Field(BlurConstants.RECORD_ID, record.recordId, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    RowIndexWriter.addColumns(document, analyzer, builder, record.family, record.columns);
+    FieldConverterUtil.convert(document, analyzer);
+    return document;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+}


Mime
View raw message