incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [10/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
new file mode 100644
index 0000000..03344af
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
@@ -0,0 +1,136 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.TermDocs;
+
+
+public abstract class AbstractIndexServer implements IndexServer {
+
+  private Map<String, IndexCounts> _recordsTableCounts = new ConcurrentHashMap<String, IndexCounts>();
+  private Map<String, IndexCounts> _rowTableCounts = new ConcurrentHashMap<String, IndexCounts>();
+
+  private static class IndexCounts {
+    Map<String, IndexCount> counts = new ConcurrentHashMap<String, IndexCount>();
+  }
+
+  private static class IndexCount {
+    long version;
+    long count = -1;
+  }
+
+  public long getRecordCount(String table) throws IOException {
+    IndexCounts indexCounts;
+    synchronized (_recordsTableCounts) {
+      indexCounts = _recordsTableCounts.get(table);
+      if (indexCounts == null) {
+        indexCounts = new IndexCounts();
+        _recordsTableCounts.put(table, indexCounts);
+      }
+    }
+    synchronized (indexCounts) {
+      long recordCount = 0;
+      Map<String, BlurIndex> indexes = getIndexes(table);
+      for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
+        IndexReader indexReader = null;
+        try {
+          String shard = index.getKey();
+          IndexCount indexCount = indexCounts.counts.get(shard);
+          if (indexCount == null) {
+            indexCount = new IndexCount();
+            indexCounts.counts.put(shard, indexCount);
+          }
+          indexReader = index.getValue().getIndexReader();
+          if (!isValid(indexCount, indexReader)) {
+            indexCount.count = indexReader.numDocs();
+            indexCount.version = indexReader.getVersion();
+          }
+          recordCount += indexCount.count;
+        } finally {
+          if (indexReader != null) {
+            indexReader.decRef();
+          }
+        }
+      }
+      return recordCount;
+    }
+  }
+
+  private boolean isValid(IndexCount indexCount, IndexReader indexReader) {
+    if (indexCount.version == indexReader.getVersion() && indexCount.count != -1L) {
+      return true;
+    }
+    return false;
+  }
+
+  public long getRowCount(String table) throws IOException {
+    IndexCounts indexCounts;
+    synchronized (_rowTableCounts) {
+      indexCounts = _rowTableCounts.get(table);
+      if (indexCounts == null) {
+        indexCounts = new IndexCounts();
+        _rowTableCounts.put(table, indexCounts);
+      }
+    }
+    synchronized (indexCounts) {
+      long rowCount = 0;
+      Map<String, BlurIndex> indexes = getIndexes(table);
+      for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
+        IndexReader indexReader = null;
+        try {
+          String shard = index.getKey();
+          IndexCount indexCount = indexCounts.counts.get(shard);
+          if (indexCount == null) {
+            indexCount = new IndexCount();
+            indexCounts.counts.put(shard, indexCount);
+          }
+          indexReader = index.getValue().getIndexReader();
+          if (!isValid(indexCount, indexReader)) {
+            indexCount.count = getRowCount(indexReader);
+            indexCount.version = indexReader.getVersion();
+          }
+          rowCount += indexCount.count;
+        } finally {
+          if (indexReader != null) {
+            indexReader.decRef();
+          }
+        }
+      }
+      return rowCount;
+    }
+  }
+
+  private long getRowCount(IndexReader indexReader) throws IOException {
+    long rowCount = 0;
+    TermDocs termDocs = indexReader.termDocs(BlurConstants.PRIME_DOC_TERM);
+    while (termDocs.next()) {
+      if (!indexReader.isDeleted(termDocs.doc())) {
+        rowCount++;
+      }
+    }
+    termDocs.close();
+    return rowCount;
+  }
+}

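The record and row counts above are cached per shard and only recomputed when that shard's IndexReader reports a new version, so repeated count calls against an unchanged index do not rescan it. A minimal standalone sketch of that version-keyed caching pattern, using only the Lucene 3.x reader calls seen in this file (the class and method names below are illustrative, not part of this commit):

import org.apache.lucene.index.IndexReader;

// Illustrative sketch of the per-shard cache used by AbstractIndexServer.
class VersionedCountSketch {
  private long version = -1;
  private long count = -1;

  // Recompute only when the shard's reader has advanced to a new version.
  long recordCount(IndexReader reader) {
    long current = reader.getVersion();
    if (current != version || count == -1) {
      count = reader.numDocs();
      version = current;
    }
    return count;
  }
}
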
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
new file mode 100644
index 0000000..8265cc6
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
@@ -0,0 +1,71 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.IndexReader;
+
+
+public abstract class BlurIndexWarmup {
+
+  /**
+   * Once the reader has been warmed up, release() must be called on the
+   * ReleaseReader even if an exception occurs.
+   * 
+   * @param table
+   *          the table name.
+   * @param shard
+   *          the shard name.
+   * @param reader
+   *          the reader itself.
+   * @param isClosed
+   *          to check if the shard has been migrated to another node.
+   * @param releaseReader
+   *          to release the handle on the reader.
+   * @throws IOException
+   * 
+   * @deprecated
+   */
+  public void warmBlurIndex(String table, String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+
+  }
+
+  /**
+   * Once the reader has been warmed up, release() must be called on the
+   * ReleaseReader even if an exception occurs.
+   * 
+   * @param table
+   *          the table descriptor.
+   * @param shard
+   *          the shard name.
+   * @param reader
+   *          the reader itself.
+   * @param isClosed
+   *          to check if the shard has been migrated to another node.
+   * @param releaseReader
+   *          to release the handle on the reader.
+   * @throws IOException
+   */
+  public void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+    warmBlurIndex(table.name, shard, reader, isClosed, releaseReader);
+  }
+
+}

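The contract documented above is that release() must always be called on the ReleaseReader, even when warming fails, because the reader's reference count gates closing of the index. A hedged sketch of a custom warmup honoring that contract (the subclass name and empty body are illustrative only, not part of this commit):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.blur.manager.indexserver.BlurIndexWarmup;
import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.lucene.index.IndexReader;

// Illustrative subclass only.
public class NoOpIndexWarmup extends BlurIndexWarmup {
  @Override
  public void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader,
      AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
    try {
      // A real warmup would touch terms and fields here, checking isClosed
      // periodically and stopping early if the shard migrated to another node.
    } finally {
      releaseReader.release(); // always release, even when warming throws
    }
  }
}
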
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java
new file mode 100644
index 0000000..e41fdef
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java
@@ -0,0 +1,72 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+
+public class BlurServerShutDown implements Watcher {
+
+  private static final Log LOG = LogFactory.getLog(BlurServerShutDown.class);
+
+  public interface BlurShutdown {
+    void shutdown();
+  }
+
+  private BlurShutdown shutdown;
+  private ZooKeeper zooKeeper;
+
+  public BlurServerShutDown() {
+    Runtime runtime = Runtime.getRuntime();
+    runtime.addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          LOG.info("Closing zookeeper.");
+          zooKeeper.close();
+        } catch (InterruptedException e) {
+          LOG.error("Unknown error while closing zookeeper.", e);
+        }
+      }
+    }));
+  }
+
+  public void register(final BlurShutdown shutdown, ZooKeeper zooKeeper) {
+    this.shutdown = shutdown;
+    this.zooKeeper = zooKeeper;
+    zooKeeper.register(new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        KeeperState state = event.getState();
+        if (state == KeeperState.Expired) {
+          LOG.fatal("Zookeeper session has [" + state + "] server process shutting down.");
+          shutdown.shutdown();
+        }
+      }
+    });
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    register(shutdown, zooKeeper);
+  }
+}

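BlurServerShutDown installs a watcher that calls shutdown() when the ZooKeeper session expires, and its own process() re-registers that watcher; a JVM shutdown hook closes the ZooKeeper handle on exit. A hedged wiring sketch (the connect string, session timeout, and shutdown body are assumptions, not values from this commit):

import org.apache.blur.manager.indexserver.BlurServerShutDown;
import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ShutDownWiringSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeper zooKeeper = new ZooKeeper("zk-host:2181", 30000, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        // connection-state events during startup; nothing to do here
      }
    });
    BlurServerShutDown shutDown = new BlurServerShutDown();
    shutDown.register(new BlurShutdown() {
      @Override
      public void shutdown() {
        // close indexes and stop services, then exit
        System.exit(0);
      }
    }, zooKeeper);
  }
}
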
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
new file mode 100644
index 0000000..f828229
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -0,0 +1,89 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
+import org.apache.blur.thrift.generated.ColumnPreCache;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.WarmUpByFieldBounds;
+import org.apache.lucene.index.WarmUpByFieldBoundsStatus;
+import org.apache.lucene.util.ReaderUtil;
+
+
+public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
+
+  private static final Log LOG = LogFactory.getLog(DefaultBlurIndexWarmup.class);
+
+  @Override
+  public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+    try {
+      ColumnPreCache columnPreCache = table.columnPreCache;
+      List<String> preCacheCols = null;
+      if (columnPreCache != null) {
+        preCacheCols = columnPreCache.preCacheCols;
+      }
+      if (preCacheCols == null) {
+        LOG.info("No pre cache defined, precache all fields.");
+        FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
+        preCacheCols = new ArrayList<String>();
+        for (FieldInfo fieldInfo : fieldInfos) {
+          if (fieldInfo.isIndexed) {
+            preCacheCols.add(fieldInfo.name);
+          }
+        }
+        preCacheCols.remove(BlurConstants.ROW_ID);
+        preCacheCols.remove(BlurConstants.RECORD_ID);
+        preCacheCols.remove(BlurConstants.PRIME_DOC);
+        preCacheCols.remove(BlurConstants.SUPER);
+      }
+
+      WarmUpByFieldBounds warmUpByFieldBounds = new WarmUpByFieldBounds();
+      WarmUpByFieldBoundsStatus status = new WarmUpByFieldBoundsStatus() {
+        @Override
+        public void complete(String name, Term start, Term end, long startPosition, long endPosition, long totalBytesRead, long nanoTime, AtomicBoolean isClosed) {
+          double bytesPerNano = totalBytesRead / (double) nanoTime;
+          double mBytesPerNano = bytesPerNano / 1024 / 1024;
+          double mBytesPerSecond = mBytesPerNano * 1000000000.0;
+          if (totalBytesRead > 0) {
+            LOG.info("Precached field [{0}] in table [{1}] shard [{2}] file [{3}], [{4}] bytes cached at [{5} MB/s]", start.field(), table.name, shard, name, totalBytesRead,
+                mBytesPerSecond);
+          }
+        }
+      };
+      if (preCacheCols != null) {
+        for (String field : preCacheCols) {
+          warmUpByFieldBounds.warmUpByField(isClosed, new Term(field), reader, status);
+        }
+      }
+    } finally {
+      releaseReader.release();
+    }
+  }
+
+}

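Which fields get warmed is driven by the table's ColumnPreCache: when preCacheCols is set, only those fields are warmed; when it is null, every indexed field except the Blur system fields (row id, record id, prime doc, super) is warmed. A hedged example of restricting warmup through the table descriptor (the table and column names are illustrative):

import java.util.Arrays;

import org.apache.blur.thrift.generated.ColumnPreCache;
import org.apache.blur.thrift.generated.TableDescriptor;

public class PreCacheConfigSketch {
  public static void main(String[] args) {
    TableDescriptor descriptor = new TableDescriptor();
    descriptor.name = "test_table"; // illustrative table name
    ColumnPreCache columnPreCache = new ColumnPreCache();
    // Warm only these columns; leaving preCacheCols null warms all indexed fields.
    columnPreCache.preCacheCols = Arrays.asList("fam1.col1", "fam1.col2");
    descriptor.columnPreCache = columnPreCache;
  }
}
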
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
new file mode 100644
index 0000000..2d60131
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -0,0 +1,843 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.utils.BlurConstants.SHARD_PREFIX;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.manager.BlurFilterCache;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.BlurIndexCloser;
+import org.apache.blur.manager.writer.BlurIndexReader;
+import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.DirectoryReferenceFileGC;
+import org.apache.blur.metrics.BlurMetrics;
+import org.apache.blur.store.blockcache.BlockDirectory;
+import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.store.lock.BlurLockFactory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.WatchChildren;
+import org.apache.blur.zookeeper.WatchChildren.OnChange;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TermPositions;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.ReaderUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+public class DistributedIndexServer extends AbstractIndexServer {
+
+  private static final String LOGS = "logs";
+  private static final Log LOG = LogFactory.getLog(DistributedIndexServer.class);
+  private static final long _delay = TimeUnit.SECONDS.toMillis(5);
+
+  private Map<String, BlurAnalyzer> _tableAnalyzers = new ConcurrentHashMap<String, BlurAnalyzer>();
+  private Map<String, TableDescriptor> _tableDescriptors = new ConcurrentHashMap<String, TableDescriptor>();
+  private Map<String, Similarity> _tableSimilarity = new ConcurrentHashMap<String, Similarity>();
+  private Map<String, DistributedLayoutManager> _layoutManagers = new ConcurrentHashMap<String, DistributedLayoutManager>();
+  private Map<String, Set<String>> _layoutCache = new ConcurrentHashMap<String, Set<String>>();
+  private ConcurrentHashMap<String, Map<String, BlurIndex>> _indexes = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
+
+  // set externally
+  private ClusterStatus _clusterStatus;
+  private Configuration _configuration;
+  private String _nodeName;
+  private int _shardOpenerThreadCount;
+  private BlurIndexRefresher _refresher;
+  private Cache _cache;
+  private BlurMetrics _blurMetrics;
+  private ZooKeeper _zookeeper;
+
+  // set internally
+  private Timer _timerCacheFlush;
+  private ExecutorService _openerService;
+  private BlurIndexCloser _closer;
+  private Timer _timerTableWarmer;
+  private BlurFilterCache _filterCache;
+  private AtomicBoolean _running = new AtomicBoolean();
+  private long _safeModeDelay;
+  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
+  private IndexDeletionPolicy _indexDeletionPolicy;
+  private String cluster = BlurConstants.BLUR_CLUSTER;
+  private DirectoryReferenceFileGC _gc;
+  private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
+  private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(500);
+  private WatchChildren _watchOnlineShards;
+
+  public static interface ReleaseReader {
+    void release() throws IOException;
+  }
+
+  public void init() throws KeeperException, InterruptedException, IOException {
+    BlurUtil.setupZookeeper(_zookeeper, cluster);
+    _openerService = Executors.newThreadPool("shard-opener", _shardOpenerThreadCount);
+    _closer = new BlurIndexCloser();
+    _closer.init();
+    _gc = new DirectoryReferenceFileGC();
+    _gc.init();
+    setupFlushCacheTimer();
+    String lockPath = BlurUtil.lockForSafeMode(_zookeeper, getNodeName(), cluster);
+    try {
+      registerMyself();
+      setupSafeMode();
+    } finally {
+      BlurUtil.unlockForSafeMode(_zookeeper, lockPath);
+    }
+    waitInSafeModeIfNeeded();
+    _running.set(true);
+    setupTableWarmer();
+    watchForShardServerChanges();
+  }
+
+  private void watchForShardServerChanges() {
+    ZookeeperPathConstants.getOnlineShardsPath(cluster);
+    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(cluster)).watch(new OnChange() {
+      private List<String> _prevOnlineShards = new ArrayList<String>();
+
+      @Override
+      public void action(List<String> onlineShards) {
+        List<String> oldOnlineShards = _prevOnlineShards;
+        _prevOnlineShards = onlineShards;
+        _layoutManagers.clear();
+        _layoutCache.clear();
+        LOG.info("Online shard servers changed, clearing layout managers and cache.");
+        if (oldOnlineShards == null) {
+          oldOnlineShards = new ArrayList<String>();
+        }
+        for (String oldOnlineShard : oldOnlineShards) {
+          if (!onlineShards.contains(oldOnlineShard)) {
+            LOG.info("Node went offline [{0}]", oldOnlineShard);
+          }
+        }
+        for (String onlineShard : onlineShards) {
+          if (!oldOnlineShards.contains(onlineShard)) {
+            LOG.info("Node came online [{0}]", onlineShard);
+          }
+        }
+      }
+    });
+  }
+
+  private void waitInSafeModeIfNeeded() throws KeeperException, InterruptedException {
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+    Stat stat = _zookeeper.exists(blurSafemodePath, false);
+    if (stat == null) {
+      throw new RuntimeException("Safemode path missing [" + blurSafemodePath + "]");
+    }
+    byte[] data = _zookeeper.getData(blurSafemodePath, false, stat);
+    if (data == null) {
+      throw new RuntimeException("Safemode data missing [" + blurSafemodePath + "]");
+    }
+    long timestamp = Long.parseLong(new String(data));
+    long waitTime = timestamp - System.currentTimeMillis();
+    if (waitTime > 0) {
+      LOG.info("Waiting in safe mode for [{0}] seconds", waitTime / 1000.0);
+      Thread.sleep(waitTime);
+    }
+  }
+
+  private void setupSafeMode() throws KeeperException, InterruptedException {
+    String shardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster);
+    List<String> children = _zookeeper.getChildren(shardsPath, false);
+    if (children.size() == 0) {
+      throw new RuntimeException("No shards registered!");
+    }
+    if (children.size() != 1) {
+      return;
+    }
+    LOG.info("First node online, setting up safe mode.");
+    long timestamp = System.currentTimeMillis() + _safeModeDelay;
+    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
+    Stat stat = _zookeeper.exists(blurSafemodePath, false);
+    if (stat == null) {
+      _zookeeper.create(blurSafemodePath, Long.toString(timestamp).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    } else {
+      _zookeeper.setData(blurSafemodePath, Long.toString(timestamp).getBytes(), -1);
+    }
+    setupAnyMissingPaths();
+  }
+
+  private void setupAnyMissingPaths() throws KeeperException, InterruptedException {
+    String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
+    List<String> tables = _zookeeper.getChildren(tablesPath, false);
+    for (String table : tables) {
+      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getLockPath(cluster, table));
+      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getTableFieldNamesPath(cluster, table));
+    }
+  }
+
+  private void setupTableWarmer() {
+    _timerTableWarmer = new Timer("Table-Warmer", true);
+    _timerTableWarmer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          warmup();
+        } catch (Throwable t) {
+          if (_running.get()) {
+            LOG.error("Unknown error", t);
+          } else {
+            LOG.debug("Unknown error", t);
+          }
+        }
+      }
+
+      private void warmup() {
+        if (_running.get()) {
+          List<String> tableList = _clusterStatus.getTableList(false, cluster);
+          _blurMetrics.tableCount.set(tableList.size());
+          long indexCount = 0;
+          AtomicLong segmentCount = new AtomicLong();
+          AtomicLong indexMemoryUsage = new AtomicLong();
+          for (String table : tableList) {
+            try {
+              Map<String, BlurIndex> indexes = getIndexes(table);
+              int count = indexes.size();
+              indexCount += count;
+              updateMetrics(_blurMetrics, indexes, segmentCount, indexMemoryUsage);
+              LOG.debug("Table [{0}] has [{1}] number of shards online in this node.", table, count);
+            } catch (IOException e) {
+              LOG.error("Unknown error trying to warm table [{0}]", e, table);
+            }
+          }
+          _blurMetrics.indexCount.set(indexCount);
+          _blurMetrics.segmentCount.set(segmentCount.get());
+          _blurMetrics.indexMemoryUsage.set(indexMemoryUsage.get());
+        }
+      }
+
+      private void updateMetrics(BlurMetrics blurMetrics, Map<String, BlurIndex> indexes, AtomicLong segmentCount, AtomicLong indexMemoryUsage) throws IOException {
+        for (BlurIndex index : indexes.values()) {
+          IndexReader reader = index.getIndexReader();
+          try {
+            IndexReader[] readers = reader.getSequentialSubReaders();
+            if (readers != null) {
+              segmentCount.addAndGet(readers.length);
+            }
+            indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
+          } finally {
+            reader.decRef();
+          }
+        }
+      }
+    }, _delay, _delay);
+  }
+
+  private void registerMyself() {
+    String nodeName = getNodeName();
+    String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(cluster) + "/" + nodeName;
+    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster) + "/" + nodeName;
+    try {
+      if (_zookeeper.exists(registeredShardsPath, false) == null) {
+        _zookeeper.create(registeredShardsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+      while (_zookeeper.exists(onlineShardsPath, false) != null) {
+        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", nodeName, onlineShardsPath);
+        Thread.sleep(3000);
+      }
+      String version = BlurUtil.getVersion();
+      _zookeeper.create(onlineShardsPath, version.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void setupFlushCacheTimer() {
+    _timerCacheFlush = new Timer("Flush-IndexServer-Caches", true);
+    _timerCacheFlush.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          cleanup();
+        } catch (Throwable t) {
+          LOG.error("Unknown error", t);
+        }
+      }
+
+      private void cleanup() {
+        clearMapOfOldTables(_tableAnalyzers);
+        clearMapOfOldTables(_tableDescriptors);
+        clearMapOfOldTables(_layoutManagers);
+        clearMapOfOldTables(_layoutCache);
+        clearMapOfOldTables(_tableSimilarity);
+        Map<String, Map<String, BlurIndex>> oldIndexesThatNeedToBeClosed = clearMapOfOldTables(_indexes);
+        for (String table : oldIndexesThatNeedToBeClosed.keySet()) {
+          Map<String, BlurIndex> indexes = oldIndexesThatNeedToBeClosed.get(table);
+          if (indexes == null) {
+            continue;
+          }
+          for (String shard : indexes.keySet()) {
+            BlurIndex index = indexes.get(shard);
+            if (index == null) {
+              continue;
+            }
+            close(index, table, shard);
+          }
+        }
+        for (String table : _indexes.keySet()) {
+          Map<String, BlurIndex> shardMap = _indexes.get(table);
+          if (shardMap != null) {
+            Set<String> shards = new HashSet<String>(shardMap.keySet());
+            Set<String> shardsToServe = getShardsToServe(table);
+            shards.removeAll(shardsToServe);
+            if (!shards.isEmpty()) {
+              LOG.info("Need to close indexes for table [{0}] indexes [{1}]", table, shards);
+            }
+            for (String shard : shards) {
+              LOG.info("Closing index for table [{0}] shard [{1}]", table, shard);
+              BlurIndex index = shardMap.remove(shard);
+              close(index, table, shard);
+            }
+          }
+        }
+      }
+    }, _delay, _delay);
+  }
+
+  protected void close(BlurIndex index, String table, String shard) {
+    LOG.info("Closing index [{0}] from table [{1}] shard [{2}]", index, table, shard);
+    try {
+      _filterCache.closing(table, shard, index);
+      index.close();
+    } catch (Throwable e) {
+      LOG.error("Error while closing index [{0}] from table [{1}] shard [{2}]", e, index, table, shard);
+    }
+  }
+
+  protected <T> Map<String, T> clearMapOfOldTables(Map<String, T> map) {
+    List<String> tables = new ArrayList<String>(map.keySet());
+    Map<String, T> removed = new HashMap<String, T>();
+    for (String table : tables) {
+      if (!_clusterStatus.exists(true, cluster, table)) {
+        removed.put(table, map.remove(table));
+      }
+    }
+    for (String table : tables) {
+      if (!_clusterStatus.isEnabled(true, cluster, table)) {
+        removed.put(table, map.remove(table));
+      }
+    }
+    return removed;
+  }
+
+  @Override
+  public void close() {
+    if (_running.get()) {
+      _running.set(false);
+      closeAllIndexes();
+      _watchOnlineShards.close();
+      _timerCacheFlush.purge();
+      _timerCacheFlush.cancel();
+
+      _timerTableWarmer.purge();
+      _timerTableWarmer.cancel();
+      _closer.close();
+      _gc.close();
+      _openerService.shutdownNow();
+    }
+  }
+
+  private void closeAllIndexes() {
+    for (Entry<String, Map<String, BlurIndex>> tableToShards : _indexes.entrySet()) {
+      for (Entry<String, BlurIndex> shard : tableToShards.getValue().entrySet()) {
+        BlurIndex index = shard.getValue();
+        try {
+          index.close();
+          LOG.info("Closed [{0}] [{1}] [{2}]", tableToShards.getKey(), shard.getKey(), index);
+        } catch (IOException e) {
+          LOG.info("Error during closing of [{0}] [{1}] [{2}]", tableToShards.getKey(), shard.getKey(), index);
+        }
+      }
+    }
+  }
+
+  @Override
+  public BlurAnalyzer getAnalyzer(String table) {
+    checkTable(table);
+    BlurAnalyzer blurAnalyzer = _tableAnalyzers.get(table);
+    if (blurAnalyzer == null) {
+      TableDescriptor descriptor = getTableDescriptor(table);
+      blurAnalyzer = new BlurAnalyzer(descriptor.analyzerDefinition);
+      _tableAnalyzers.put(table, blurAnalyzer);
+    }
+    return blurAnalyzer;
+  }
+
+  @Override
+  public int getCompressionBlockSize(String table) {
+    checkTable(table);
+    TableDescriptor descriptor = getTableDescriptor(table);
+    return descriptor.compressionBlockSize;
+  }
+
+  @Override
+  public CompressionCodec getCompressionCodec(String table) {
+    checkTable(table);
+    TableDescriptor descriptor = getTableDescriptor(table);
+    return getInstance(descriptor.compressionClass, CompressionCodec.class);
+  }
+
+  @Override
+  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
+    return new TreeSet<String>(getShardsToServe(table));
+  }
+
+  @Override
+  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
+    checkTable(table);
+
+    Set<String> shardsToServe = getShardsToServe(table);
+    synchronized (_indexes) {
+      if (!_indexes.containsKey(table)) {
+        _indexes.putIfAbsent(table, new ConcurrentHashMap<String, BlurIndex>());
+      }
+    }
+    Map<String, BlurIndex> tableIndexes = _indexes.get(table);
+    Set<String> shardsBeingServed = new HashSet<String>(tableIndexes.keySet());
+    if (shardsBeingServed.containsAll(shardsToServe)) {
+      Map<String, BlurIndex> result = new HashMap<String, BlurIndex>(tableIndexes);
+      shardsBeingServed.removeAll(shardsToServe);
+      for (String shardNotToServe : shardsBeingServed) {
+        result.remove(shardNotToServe);
+      }
+      return result;
+    } else {
+      return openMissingShards(table, shardsToServe, tableIndexes);
+    }
+  }
+
+  private BlurIndex openShard(String table, String shard) throws IOException {
+    LOG.info("Opening shard [{0}] for table [{1}]", shard, table);
+    Path tablePath = new Path(getTableDescriptor(table).tableUri);
+    Path walTablePath = new Path(tablePath, LOGS);
+    Path hdfsDirPath = new Path(tablePath, shard);
+
+    BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurConstants.getPid());
+
+    Directory directory = new HdfsDirectory(hdfsDirPath);
+    directory.setLockFactory(lockFactory);
+
+    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+    String compressionClass = descriptor.compressionClass;
+    int compressionBlockSize = descriptor.compressionBlockSize;
+    if (compressionClass != null) {
+      CompressionCodec compressionCodec;
+      try {
+        compressionCodec = BlurUtil.getInstance(compressionClass, CompressionCodec.class);
+        directory = new CompressedFieldDataDirectory(directory, compressionCodec, compressionBlockSize);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    Directory dir;
+    boolean blockCacheEnabled = _clusterStatus.isBlockCacheEnabled(cluster, table);
+    if (blockCacheEnabled) {
+      Set<String> blockCacheFileTypes = _clusterStatus.getBlockCacheFileTypes(cluster, table);
+      dir = new BlockDirectory(table + "_" + shard, directory, _cache, blockCacheFileTypes);
+    } else {
+      dir = directory;
+    }
+
+    BlurIndex index;
+    if (_clusterStatus.isReadOnly(true, cluster, table)) {
+      BlurIndexReader reader = new BlurIndexReader();
+      reader.setCloser(_closer);
+      reader.setAnalyzer(getAnalyzer(table));
+      reader.setDirectory(dir);
+      reader.setRefresher(_refresher);
+      reader.setShard(shard);
+      reader.setTable(table);
+      reader.setIndexDeletionPolicy(_indexDeletionPolicy);
+      reader.setSimilarity(getSimilarity(table));
+      reader.init();
+      index = reader;
+    } else {
+      BlurNRTIndex writer = new BlurNRTIndex();
+      writer.setAnalyzer(getAnalyzer(table));
+      writer.setDirectory(dir);
+      writer.setShard(shard);
+      writer.setTable(table);
+      writer.setSimilarity(getSimilarity(table));
+      writer.setTimeBetweenCommits(_timeBetweenCommits);
+      writer.setTimeBetweenRefreshs(_timeBetweenRefreshs);
+      writer.setWalPath(walTablePath);
+      writer.setConfiguration(_configuration);
+      writer.setIndexDeletionPolicy(_indexDeletionPolicy);
+      writer.setCloser(_closer);
+      writer.setGc(_gc);
+      writer.init();
+      index = writer;
+    }
+    _filterCache.opening(table, shard, index);
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+    return warmUp(index, tableDescriptor, shard);
+  }
+
+  private BlurIndex warmUp(BlurIndex index, TableDescriptor table, String shard) throws IOException {
+    final IndexReader reader = index.getIndexReader();
+    warmUpAllSegments(reader);
+    _warmup.warmBlurIndex(table, shard, reader, index.isClosed(), new ReleaseReader() {
+      @Override
+      public void release() throws IOException {
+        // this will allow for closing of index
+        reader.decRef();
+      }
+    });
+
+    return index;
+  }
+
+  private void warmUpAllSegments(IndexReader reader) throws IOException {
+    IndexReader[] indexReaders = reader.getSequentialSubReaders();
+    if (indexReaders != null) {
+      for (IndexReader r : indexReaders) {
+        warmUpAllSegments(r);
+      }
+    }
+    int maxDoc = reader.maxDoc();
+    int numDocs = reader.numDocs();
+    FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
+    Collection<String> fieldNames = new ArrayList<String>();
+    for (FieldInfo fieldInfo : fieldInfos) {
+      if (fieldInfo.isIndexed) {
+        fieldNames.add(fieldInfo.name);
+      }
+    }
+    int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
+    TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
+    termDocs.next();
+    termDocs.close();
+
+    TermPositions termPositions = reader.termPositions(BlurConstants.PRIME_DOC_TERM);
+    if (termPositions.next()) {
+      if (termPositions.freq() > 0) {
+        termPositions.nextPosition();
+      }
+    }
+    termPositions.close();
+    LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" + maxDoc + "], numDocs [" + numDocs + "], primeDocumentCount [" + primeDocCount + "], fieldCount ["
+        + fieldNames.size() + "]");
+  }
+
+  private synchronized Map<String, BlurIndex> openMissingShards(final String table, Set<String> shardsToServe, final Map<String, BlurIndex> tableIndexes) {
+    Map<String, Future<BlurIndex>> opening = new HashMap<String, Future<BlurIndex>>();
+    for (String s : shardsToServe) {
+      final String shard = s;
+      BlurIndex blurIndex = tableIndexes.get(shard);
+      if (blurIndex == null) {
+        LOG.info("Opening missing shard [{0}] from table [{1}]", shard, table);
+        Future<BlurIndex> submit = _openerService.submit(new Callable<BlurIndex>() {
+          @Override
+          public BlurIndex call() throws Exception {
+            return openShard(table, shard);
+          }
+        });
+        opening.put(shard, submit);
+      }
+    }
+
+    for (Entry<String, Future<BlurIndex>> entry : opening.entrySet()) {
+      String shard = entry.getKey();
+      Future<BlurIndex> future = entry.getValue();
+      try {
+        BlurIndex blurIndex = future.get();
+        tableIndexes.put(shard, blurIndex);
+      } catch (Exception e) {
+        e.printStackTrace();
+        LOG.error("Unknown error while opening shard [{0}] for table [{1}].", e.getCause(), shard, table);
+      }
+    }
+
+    Map<String, BlurIndex> result = new HashMap<String, BlurIndex>();
+    for (String shard : shardsToServe) {
+      BlurIndex blurIndex = tableIndexes.get(shard);
+      if (blurIndex == null) {
+        LOG.error("Missing shard [{0}] for table [{1}].", shard, table);
+      } else {
+        result.put(shard, blurIndex);
+      }
+    }
+    return result;
+  }
+
+  private Set<String> getShardsToServe(String table) {
+    TABLE_STATUS tableStatus = getTableStatus(table);
+    if (tableStatus == TABLE_STATUS.DISABLED) {
+      return new HashSet<String>();
+    }
+    DistributedLayoutManager layoutManager = _layoutManagers.get(table);
+    if (layoutManager == null) {
+      return setupLayoutManager(table);
+    } else {
+      return _layoutCache.get(table);
+    }
+  }
+
+  private synchronized Set<String> setupLayoutManager(String table) {
+    DistributedLayoutManager layoutManager = new DistributedLayoutManager();
+
+    String cluster = _clusterStatus.getCluster(false, table);
+    if (cluster == null) {
+      throw new RuntimeException("Table [" + table + "] is not found.");
+    }
+
+    List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
+    List<String> offlineShardServers = new ArrayList<String>(_clusterStatus.getOfflineShardServers(false, cluster));
+    List<String> shardList = getShardList(table);
+
+    layoutManager.setNodes(shardServerList);
+    layoutManager.setNodesOffline(offlineShardServers);
+    layoutManager.setShards(shardList);
+    layoutManager.init();
+
+    Map<String, String> layout = layoutManager.getLayout();
+    String nodeName = getNodeName();
+    Set<String> shardsToServeCache = new TreeSet<String>();
+    for (Entry<String, String> entry : layout.entrySet()) {
+      if (entry.getValue().equals(nodeName)) {
+        shardsToServeCache.add(entry.getKey());
+      }
+    }
+    _layoutCache.put(table, shardsToServeCache);
+    _layoutManagers.put(table, layoutManager);
+    return shardsToServeCache;
+  }
+
+  @Override
+  public String getNodeName() {
+    return _nodeName;
+  }
+
+  @Override
+  public int getShardCount(String table) {
+    checkTable(table);
+    TableDescriptor descriptor = getTableDescriptor(table);
+    return descriptor.shardCount;
+  }
+
+  @Override
+  public List<String> getShardList(String table) {
+    checkTable(table);
+    List<String> result = new ArrayList<String>();
+    try {
+      TableDescriptor descriptor = getTableDescriptor(table);
+      Path tablePath = new Path(descriptor.tableUri);
+      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
+      if (!fileSystem.exists(tablePath)) {
+        LOG.error("Table [{0}] is missing, defined location [{1}]", table, tablePath.toUri());
+        throw new RuntimeException("Table [" + table + "] is missing, defined location [" + tablePath.toUri() + "]");
+      }
+      FileStatus[] listStatus = fileSystem.listStatus(tablePath);
+      for (FileStatus status : listStatus) {
+        if (status.isDir()) {
+          String name = status.getPath().getName();
+          if (name.startsWith(SHARD_PREFIX)) {
+            result.add(name);
+          }
+        }
+      }
+      return result;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Similarity getSimilarity(String table) {
+    checkTable(table);
+    Similarity similarity = _tableSimilarity.get(table);
+    if (similarity == null) {
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+      String similarityClass = tableDescriptor.similarityClass;
+      if (similarityClass == null) {
+        similarity = new FairSimilarity();
+      } else {
+        similarity = getInstance(similarityClass, Similarity.class);
+      }
+      _tableSimilarity.put(table, similarity);
+    }
+    return similarity;
+  }
+
+  @Override
+  public long getTableSize(String table) throws IOException {
+    checkTable(table);
+    Path tablePath = new Path(getTableUri(table));
+    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
+    ContentSummary contentSummary = fileSystem.getContentSummary(tablePath);
+    return contentSummary.getLength();
+  }
+
+  @Override
+  public TABLE_STATUS getTableStatus(String table) {
+    checkTable(table);
+    boolean enabled = _clusterStatus.isEnabled(true, cluster, table);
+    if (enabled) {
+      return TABLE_STATUS.ENABLED;
+    }
+    return TABLE_STATUS.DISABLED;
+  }
+
+  private void checkTable(String table) {
+    if (_clusterStatus.exists(true, cluster, table)) {
+      return;
+    }
+    throw new RuntimeException("Table [" + table + "] does not exist.");
+  }
+
+  @Override
+  public String getTableUri(String table) {
+    checkTable(table);
+    TableDescriptor descriptor = getTableDescriptor(table);
+    return descriptor.tableUri;
+  }
+
+  private TableDescriptor getTableDescriptor(String table) {
+    TableDescriptor tableDescriptor = _tableDescriptors.get(table);
+    if (tableDescriptor == null) {
+      tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
+      _tableDescriptors.put(table, tableDescriptor);
+    }
+    return tableDescriptor;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> T getInstance(String className, Class<T> c) {
+    try {
+      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+      Object object = clazz.newInstance();
+      if (object instanceof Configurable) {
+        Configurable configurable = (Configurable) object;
+        configurable.setConf(_configuration);
+      }
+      return (T) object;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void setClusterStatus(ClusterStatus clusterStatus) {
+    _clusterStatus = clusterStatus;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    _configuration = configuration;
+  }
+
+  public void setNodeName(String nodeName) {
+    _nodeName = nodeName;
+  }
+
+  public void setShardOpenerThreadCount(int shardOpenerThreadCount) {
+    _shardOpenerThreadCount = shardOpenerThreadCount;
+  }
+
+  public void setRefresher(BlurIndexRefresher refresher) {
+    _refresher = refresher;
+  }
+
+  public void setCache(Cache cache) {
+    _cache = cache;
+  }
+
+  public void setBlurMetrics(BlurMetrics blurMetrics) {
+    _blurMetrics = blurMetrics;
+  }
+
+  public void setCloser(BlurIndexCloser closer) {
+    _closer = closer;
+  }
+
+  public void setZookeeper(ZooKeeper zookeeper) {
+    _zookeeper = zookeeper;
+  }
+
+  public void setFilterCache(BlurFilterCache filterCache) {
+    _filterCache = filterCache;
+  }
+
+  public void setSafeModeDelay(long safeModeDelay) {
+    _safeModeDelay = safeModeDelay;
+  }
+
+  public void setWarmup(BlurIndexWarmup warmup) {
+    _warmup = warmup;
+  }
+
+  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
+    _indexDeletionPolicy = indexDeletionPolicy;
+  }
+
+  public void setTimeBetweenCommits(long timeBetweenCommits) {
+    _timeBetweenCommits = timeBetweenCommits;
+  }
+
+  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
+    _timeBetweenRefreshs = timeBetweenRefreshs;
+  }
+}

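Safe mode in init() works on wall-clock time: the first shard server to come online writes now + _safeModeDelay to the cluster's safemode znode, and every server (including that first one) sleeps until the stored timestamp so peers have a chance to register before shard layouts are computed. A minimal standalone sketch of that wait, with the delay value assumed rather than taken from Blur's configuration:

public class SafeModeWaitSketch {
  public static void main(String[] args) throws InterruptedException {
    long safeModeDelay = 60000; // assumed delay in milliseconds
    // The first node online stores this future timestamp in ZooKeeper.
    long storedTimestamp = System.currentTimeMillis() + safeModeDelay;
    // Every node then reads it back and waits until that wall-clock time.
    long waitTime = storedTimestamp - System.currentTimeMillis();
    if (waitTime > 0) {
      System.out.println("Waiting in safe mode for " + (waitTime / 1000.0) + " seconds");
      Thread.sleep(waitTime);
    }
  }
}
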
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java
new file mode 100644
index 0000000..293f957
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java
@@ -0,0 +1,245 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+public class DistributedLayoutManager {
+
+  private static final SortedSet<String> EMPTY_SORTED_SET = new TreeSet<String>();
+
+  private SortedSet<String> nodes = EMPTY_SORTED_SET;
+  private Set<String> nodesOffline = EMPTY_SORTED_SET;
+  private SortedSet<String> shards = EMPTY_SORTED_SET;
+  private List<String> nodeList = new ArrayList<String>();
+  private Map<String, String> cache = new TreeMap<String, String>();
+
+  public DistributedLayoutManager init() {
+    if (nodesOffline.equals(nodes) || nodes.isEmpty()) {
+      cache = getLockedMap(new TreeMap<String, String>());
+      return this;
+    }
+    Map<String, String> mappings = new TreeMap<String, String>();
+    SortedSet<String> moveBecauseOfDownNodes = new TreeSet<String>();
+    int nodeListSize = nodeList.size();
+    int nodeCount = getStartingPoint();
+    for (String shard : shards) {
+      String node = nodeList.get(nodeCount);
+      mappings.put(shard, node);
+      if (nodesOffline.contains(node)) {
+        moveBecauseOfDownNodes.add(shard);
+      }
+      nodeCount++;
+      if (nodeCount >= nodeListSize) {
+        nodeCount = 0;
+      }
+    }
+    for (String shard : moveBecauseOfDownNodes) {
+      String node = nodeList.get(nodeCount);
+      while (isOffline(node)) {
+        nodeCount++;
+        if (nodeCount >= nodeListSize) {
+          nodeCount = 0;
+        }
+        node = nodeList.get(nodeCount);
+      }
+      mappings.put(shard, node);
+      nodeCount++;
+      if (nodeCount >= nodeListSize) {
+        nodeCount = 0;
+      }
+    }
+    cache = getLockedMap(mappings);
+    return this;
+  }
+
+  private int getStartingPoint() {
+    int size = nodes.size();
+    int hash = 37;
+    for (String node : nodes) {
+      hash += node.hashCode() * 17;
+    }
+    return Math.abs(hash % size);
+  }
+
+  public Map<String, String> getLayout() {
+    return cache;
+  }
+
+  private boolean isOffline(String node) {
+    return nodesOffline.contains(node);
+  }
+
+  public Collection<String> getNodes() {
+    return new TreeSet<String>(nodes);
+  }
+
+  public void setNodes(Collection<String> v) {
+    this.nodes = new TreeSet<String>(v);
+    this.nodeList = new ArrayList<String>(nodes);
+  }
+
+  public Collection<String> getShards() {
+    return new TreeSet<String>(shards);
+  }
+
+  public void setShards(Collection<String> v) {
+    this.shards = new TreeSet<String>(v);
+  }
+
+  public Collection<String> getNodesOffline() {
+    return new TreeSet<String>(nodesOffline);
+  }
+
+  public void setNodesOffline(Collection<String> v) {
+    this.nodesOffline = new HashSet<String>(v);
+  }
+
+  private Map<String, String> getLockedMap(final Map<String, String> map) {
+    final Set<Entry<String, String>> entrySet = wrapEntries(map.entrySet());
+    return new Map<String, String>() {
+
+      @Override
+      public boolean containsKey(Object key) {
+        return map.containsKey(key);
+      }
+
+      @Override
+      public boolean containsValue(Object value) {
+        return map.containsValue(value);
+      }
+
+      @Override
+      public Set<java.util.Map.Entry<String, String>> entrySet() {
+        return entrySet;
+      }
+
+      @Override
+      public String get(Object key) {
+        return map.get(key);
+      }
+
+      @Override
+      public boolean isEmpty() {
+        return map.isEmpty();
+      }
+
+      @Override
+      public Set<String> keySet() {
+        return new TreeSet<String>(map.keySet());
+      }
+
+      @Override
+      public int size() {
+        return map.size();
+      }
+
+      @Override
+      public Collection<String> values() {
+        return new TreeSet<String>(map.values());
+      }
+
+      @Override
+      public void clear() {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String put(String key, String value) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public void putAll(Map<? extends String, ? extends String> m) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String remove(Object key) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String toString() {
+        return map.toString();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        return map.equals(obj);
+      }
+
+      @Override
+      public int hashCode() {
+        return map.hashCode();
+      }
+    };
+  }
+
+  private Set<Entry<String, String>> wrapEntries(Set<Entry<String, String>> entrySet) {
+    Set<Entry<String, String>> result = new HashSet<Entry<String, String>>();
+    for (Entry<String, String> entry : entrySet) {
+      result.add(wrapEntry(entry));
+    }
+    return result;
+  }
+
+  private Entry<String, String> wrapEntry(final Entry<String, String> entry) {
+    return new Entry<String, String>() {
+
+      @Override
+      public String setValue(String value) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String getValue() {
+        return entry.getValue();
+      }
+
+      @Override
+      public String getKey() {
+        return entry.getKey();
+      }
+
+      @Override
+      public String toString() {
+        return entry.toString();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        return entry.equals(obj);
+      }
+
+      @Override
+      public int hashCode() {
+        return entry.hashCode();
+      }
+    };
+  }
+
+}

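DistributedLayoutManager deterministically assigns shards to nodes round-robin from a hashed starting point, moves any shard that lands on an offline node to the next online one, and exposes the result as a read-only map. A hedged usage sketch (the node and shard names are illustrative):

import java.util.Arrays;
import java.util.Map;

import org.apache.blur.manager.indexserver.DistributedLayoutManager;

public class LayoutSketch {
  public static void main(String[] args) {
    DistributedLayoutManager manager = new DistributedLayoutManager();
    manager.setNodes(Arrays.asList("node1:40020", "node2:40020", "node3:40020"));
    manager.setNodesOffline(Arrays.asList("node3:40020")); // its shards are reassigned
    manager.setShards(Arrays.asList("shard-00000000", "shard-00000001", "shard-00000002"));
    manager.init();
    Map<String, String> layout = manager.getLayout(); // read-only shard -> node mapping
    System.out.println(layout);
  }
}
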
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
new file mode 100644
index 0000000..d5ee79f
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -0,0 +1,229 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.BlurIndexCloser;
+import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MMapDirectory;
+
+
+public class LocalIndexServer extends AbstractIndexServer {
+
+  private final static Log LOG = LogFactory.getLog(LocalIndexServer.class);
+
+  private Map<String, Map<String, BlurIndex>> _readersMap = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
+  private File _localDir;
+  private BlurIndexCloser _closer;
+  private int _blockSize = 65536;
+  private CompressionCodec _compression = CompressedFieldDataDirectory.DEFAULT_COMPRESSION;
+  private Path _walPath;
+  private Configuration _configuration = new Configuration();
+
+  public LocalIndexServer(File file, Path walPath) {
+    _localDir = file;
+    _localDir.mkdirs();
+    _closer = new BlurIndexCloser();
+    _closer.init();
+    _walPath = walPath;
+  }
+
+  @Override
+  public BlurAnalyzer getAnalyzer(String table) {
+    return new BlurAnalyzer(new StandardAnalyzer(LUCENE_VERSION, new HashSet<String>()));
+  }
+
+  @Override
+  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
+    Map<String, BlurIndex> tableMap = _readersMap.get(table);
+    Set<String> shardsSet;
+    if (tableMap == null) {
+      shardsSet = getIndexes(table).keySet();
+    } else {
+      shardsSet = tableMap.keySet();
+    }
+    return new TreeSet<String>(shardsSet);
+  }
+
+  @Override
+  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
+    Map<String, BlurIndex> tableMap = _readersMap.get(table);
+    if (tableMap == null) {
+      tableMap = openFromDisk(table);
+      _readersMap.put(table, tableMap);
+    }
+    return tableMap;
+  }
+
+  @Override
+  public Similarity getSimilarity(String table) {
+    return new FairSimilarity();
+  }
+
+  @Override
+  public void close() {
+    _closer.close();
+    for (String table : _readersMap.keySet()) {
+      close(_readersMap.get(table));
+    }
+  }
+
+  private void close(Map<String, BlurIndex> map) {
+    for (BlurIndex index : map.values()) {
+      try {
+        index.close();
+      } catch (Exception e) {
+        LOG.error("Error while trying to close index.", e);
+      }
+    }
+  }
+
+  private Map<String, BlurIndex> openFromDisk(String table) throws IOException {
+    File tableFile = new File(_localDir, table);
+    if (tableFile.isDirectory()) {
+      Map<String, BlurIndex> shards = new ConcurrentHashMap<String, BlurIndex>();
+      for (File f : tableFile.listFiles()) {
+        if (f.isDirectory()) {
+          MMapDirectory directory = new MMapDirectory(f);
+          if (!IndexReader.indexExists(directory)) {
+            new IndexWriter(directory, new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer())).close();
+          }
+          String shardName = f.getName();
+          shards.put(shardName, openIndex(table, shardName, directory));
+        }
+      }
+      return shards;
+    }
+    throw new IOException("Table [" + table + "] not found.");
+  }
+
+  private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException, IOException {
+    BlurNRTIndex index = new BlurNRTIndex();
+    index.setAnalyzer(getAnalyzer(table));
+    index.setDirectory(dir);
+    index.setShard(shard);
+    index.setSimilarity(getSimilarity(table));
+    index.setTable(table);
+    index.setWalPath(new Path(new Path(_walPath, table), shard));
+    index.setConfiguration(_configuration);
+    index.setCloser(_closer);
+    index.setTimeBetweenRefreshs(25);
+    index.init();
+    return index;
+  }
+
+  @Override
+  public TABLE_STATUS getTableStatus(String table) {
+    return TABLE_STATUS.ENABLED;
+  }
+
+  @Override
+  public List<String> getShardList(String table) {
+    try {
+      List<String> result = new ArrayList<String>();
+      File tableFile = new File(_localDir, table);
+      if (tableFile.isDirectory()) {
+        for (File f : tableFile.listFiles()) {
+          if (f.isDirectory()) {
+            result.add(f.getName());
+          }
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getNodeName() {
+    return "localhost";
+  }
+
+  @Override
+  public String getTableUri(String table) {
+    return new File(_localDir, table).toURI().toString();
+  }
+
+  @Override
+  public int getShardCount(String table) {
+    return getShardList(table).size();
+  }
+
+  @Override
+  public int getCompressionBlockSize(String table) {
+    return _blockSize;
+  }
+
+  @Override
+  public CompressionCodec getCompressionCodec(String table) {
+    return _compression;
+  }
+
+  @Override
+  public long getTableSize(String table) throws IOException {
+    try {
+      File file = new File(new URI(getTableUri(table)));
+      return getFolderSize(file);
+    } catch (URISyntaxException e) {
+      throw new IOException("bad URI", e);
+    }
+  }
+
+  private long getFolderSize(File file) {
+    long size = 0;
+    if (file.isDirectory()) {
+      for (File sub : file.listFiles()) {
+        size += getFolderSize(sub);
+      }
+    } else {
+      size += file.length();
+    }
+    return size;
+  }
+}
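
A minimal usage sketch for LocalIndexServer. The directory paths and table name are hypothetical; getIndexes() expects <localDir>/<table>/<shard> directories to already exist on disk, otherwise it throws IOException.

import java.io.File;
import java.util.Map;

import org.apache.blur.manager.indexserver.LocalIndexServer;
import org.apache.blur.manager.writer.BlurIndex;
import org.apache.hadoop.fs.Path;

public class LocalIndexServerSketch {
  public static void main(String[] args) throws Exception {
    LocalIndexServer server = new LocalIndexServer(new File("/tmp/blur-indexes"),
        new Path("/tmp/blur-wal"));
    try {
      System.out.println(server.getShardList("test-table"));            // shard directory names
      Map<String, BlurIndex> indexes = server.getIndexes("test-table"); // lazily opens one NRT index per shard
      System.out.println(indexes.keySet());
      System.out.println(server.getTableSize("test-table") + " bytes on disk");
    } finally {
      server.close();
    }
  }
}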

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java
new file mode 100644
index 0000000..39f2f92
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java
@@ -0,0 +1,35 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Comparator;
+
+import org.apache.blur.thrift.generated.BlurResult;
+
+
+public class BlurResultComparator implements Comparator<BlurResult> {
+
+  @Override
+  public int compare(BlurResult o1, BlurResult o2) {
+    int compare = Double.compare(o2.score, o1.score);
+    if (compare == 0) {
+      return o2.locationId.compareTo(o1.locationId);
+    }
+    return compare;
+  }
+
+}
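
A small sketch of the ordering the comparator produces: highest score first, ties broken by locationId in descending order. The scores and location ids below are made up, and no fetched rows are attached.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.blur.manager.results.BlurResultComparator;
import org.apache.blur.thrift.generated.BlurResult;

public class ComparatorSketch {
  public static void main(String[] args) {
    List<BlurResult> results = new ArrayList<BlurResult>();
    results.add(new BlurResult("shard-0/12", 0.8, null)); // (locationId, score, no fetched row)
    results.add(new BlurResult("shard-1/7", 1.5, null));
    results.add(new BlurResult("shard-0/3", 1.5, null));

    Collections.sort(results, new BlurResultComparator());
    for (BlurResult r : results) {
      System.out.println(r.score + " " + r.locationId);
    }
    // Expected order: 1.5 shard-1/7, 1.5 shard-0/3, 0.8 shard-0/12
  }
}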

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
new file mode 100644
index 0000000..8514ae2
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
@@ -0,0 +1,35 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.blur.thrift.generated.BlurResult;
+
+
+public interface BlurResultIterable extends Iterable<BlurResult> {
+
+  void skipTo(long skipTo);
+
+  long getTotalResults();
+
+  Map<String, Long> getShardInfo();
+
+  void close() throws IOException;
+
+}
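
A sketch of how a caller is expected to consume any BlurResultIterable: call skipTo() before iterating, since the implementations below apply the skip while building the iterator, and always close() to release any underlying resources. The "iterable" argument stands for any of the implementations that follow.

import java.io.IOException;

import org.apache.blur.manager.results.BlurResultIterable;
import org.apache.blur.thrift.generated.BlurResult;

public class ConsumeSketch {
  static void printFrom(BlurResultIterable iterable, long offset) throws IOException {
    try {
      iterable.skipTo(offset);                  // must be set before iterator() is called
      for (BlurResult result : iterable) {
        System.out.println(result.locationId + " " + result.score);
      }
    } finally {
      iterable.close();                         // releases readers or other resources
    }
  }
}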

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
new file mode 100644
index 0000000..d6f62a0
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
@@ -0,0 +1,149 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.Blur.Client;
+
+
+public class BlurResultIterableClient implements BlurResultIterable {
+
+  private static final Log LOG = LogFactory.getLog(BlurResultIterableClient.class);
+
+  private Map<String, Long> _shardInfo = new TreeMap<String, Long>();
+  private Client _client;
+  private String _table;
+  private BlurResults _results;
+  private int _remoteFetchCount;
+  private int _batch = 0;
+  private long _totalResults;
+  private long _skipTo;
+  private AtomicLongArray _facetCounts;
+  private boolean _alreadyProcessed;
+  private BlurQuery _originalQuery;
+
+  public BlurResultIterableClient(Blur.Client client, String table, BlurQuery query, AtomicLongArray facetCounts, int remoteFetchCount) {
+    _client = client;
+    _table = table;
+    _facetCounts = facetCounts;
+    _originalQuery = query;
+    _remoteFetchCount = remoteFetchCount;
+    performSearch();
+  }
+
+  private void performSearch() {
+    try {
+      long cursor = _remoteFetchCount * _batch;
+      BlurQuery blurQuery = new BlurQuery(_originalQuery.simpleQuery, _originalQuery.expertQuery, _originalQuery.facets, null, false, _originalQuery.useCacheIfPresent, cursor,
+          _remoteFetchCount, _originalQuery.minimumNumberOfResults, _originalQuery.maxQueryTime, _originalQuery.uuid, _originalQuery.userContext, _originalQuery.cacheResult,
+          _originalQuery.startTime, _originalQuery.modifyFileCaches);
+      _results = _client.query(_table, blurQuery);
+      addFacets();
+      _totalResults = _results.totalResults;
+      _shardInfo.putAll(_results.shardInfo);
+      _batch++;
+    } catch (Exception e) {
+      LOG.error("Error during search for [{0}]", e, _originalQuery);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void addFacets() {
+    if (!_alreadyProcessed) {
+      List<Long> counts = _results.facetCounts;
+      if (counts != null) {
+        int size = counts.size();
+        for (int i = 0; i < size; i++) {
+          _facetCounts.addAndGet(i, counts.get(i));
+        }
+      }
+      _alreadyProcessed = true;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return _shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return _totalResults;
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    this._skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    SearchIterator iterator = new SearchIterator();
+    long start = 0;
+    while (iterator.hasNext() && start < _skipTo) {
+      iterator.next();
+      start++;
+    }
+    return iterator;
+  }
+
+  public class SearchIterator implements Iterator<BlurResult> {
+
+    private int position = 0;
+    private int relposition = 0;
+
+    @Override
+    public boolean hasNext() {
+      if (position < _originalQuery.minimumNumberOfResults && position < _totalResults) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public BlurResult next() {
+      if (relposition >= _results.results.size()) {
+        performSearch();
+        relposition = 0;
+      }
+      position++;
+      return _results.results.get(relposition++);
+    }
+
+    @Override
+    public void remove() {
+
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // nothing
+  }
+}
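
A usage sketch, assuming "client" is an already-connected Thrift Blur.Client and "query" is a populated BlurQuery with no facets; the table name and fetch size of 100 are made up.

import java.util.concurrent.atomic.AtomicLongArray;

import org.apache.blur.manager.results.BlurResultIterableClient;
import org.apache.blur.thrift.generated.Blur;
import org.apache.blur.thrift.generated.BlurQuery;
import org.apache.blur.thrift.generated.BlurResult;

public class ClientIterableSketch {
  static void page(Blur.Client client, BlurQuery query) throws Exception {
    AtomicLongArray facetCounts = new AtomicLongArray(0); // no facets requested in this sketch
    BlurResultIterableClient results =
        new BlurResultIterableClient(client, "test-table", query, facetCounts, 100);
    try {
      // Re-queries the server 100 results at a time, up to query.minimumNumberOfResults.
      for (BlurResult result : results) {
        System.out.println(result.locationId);
      }
    } finally {
      results.close();
    }
  }
}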

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
new file mode 100644
index 0000000..625c663
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableMultiple.java
@@ -0,0 +1,114 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+
+
+public class BlurResultIterableMultiple implements BlurResultIterable {
+
+  private long totalResults;
+  private Map<String, Long> shardInfo = new TreeMap<String, Long>();
+  private long skipTo;
+  private List<BlurResultIterable> results = new ArrayList<BlurResultIterable>();
+
+  public void addBlurResultIterable(BlurResultIterable iterable) {
+    totalResults += iterable.getTotalResults();
+    shardInfo.putAll(iterable.getShardInfo());
+    results.add(iterable);
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return totalResults;
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    this.skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    MultipleHitsIterator iterator = new MultipleHitsIterator(results);
+    long start = 0;
+    while (iterator.hasNext() && start < skipTo) {
+      iterator.next();
+      start++;
+    }
+    return iterator;
+  }
+
+  public static class MultipleHitsIterator implements Iterator<BlurResult> {
+
+    private List<PeekableIterator<BlurResult>> iterators = new ArrayList<PeekableIterator<BlurResult>>();
+    private int length;
+
+    public MultipleHitsIterator(List<BlurResultIterable> hits) {
+      for (BlurResultIterable hitsIterable : hits) {
+        iterators.add(new PeekableIterator<BlurResult>(hitsIterable.iterator()));
+      }
+      length = iterators.size();
+    }
+
+    @Override
+    public boolean hasNext() {
+      for (int i = 0; i < length; i++) {
+        if (iterators.get(i).hasNext()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public BlurResult next() {
+      Collections.sort(iterators, BlurConstants.HITS_PEEKABLE_ITERATOR_COMPARATOR);
+      return fetchResult(iterators.get(0).next());
+    }
+
+    public BlurResult fetchResult(BlurResult next) {
+      return next;
+    }
+
+    @Override
+    public void remove() {
+
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (BlurResultIterable it : results) {
+      it.close();
+    }
+  }
+}
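
A merge sketch: shardA and shardB stand for per-shard iterables (for example BlurResultIterableSearcher instances). The merged view re-sorts its peekable iterators on every next(), so results come back in global score order.

import org.apache.blur.manager.results.BlurResultIterable;
import org.apache.blur.manager.results.BlurResultIterableMultiple;
import org.apache.blur.thrift.generated.BlurResult;

public class MergeSketch {
  static void merge(BlurResultIterable shardA, BlurResultIterable shardB) throws Exception {
    BlurResultIterableMultiple merged = new BlurResultIterableMultiple();
    merged.addBlurResultIterable(shardA);  // accumulates totals and shard info
    merged.addBlurResultIterable(shardB);
    try {
      System.out.println("total=" + merged.getTotalResults());
      for (BlurResult result : merged) {
        System.out.println(result.score + " " + result.locationId);
      }
    } finally {
      merged.close();                      // closes every wrapped iterable
    }
  }
}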

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
new file mode 100644
index 0000000..0a0786d
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSearcher.java
@@ -0,0 +1,135 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.IterablePaging;
+import org.apache.blur.lucene.search.IterablePaging.ProgressRef;
+import org.apache.blur.lucene.search.IterablePaging.TotalHitsRef;
+import org.apache.blur.manager.IndexManager;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.utils.Converter;
+import org.apache.blur.utils.IteratorConverter;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+
+
+public class BlurResultIterableSearcher implements BlurResultIterable {
+
+  private static final Log LOG = LogFactory.getLog(BlurResultIterableSearcher.class);
+
+  private Map<String, Long> _shardInfo = new TreeMap<String, Long>();
+  private String _shard;
+  private long _skipTo;
+  private String _table;
+  private int _fetchCount = 1000;
+
+  private IteratorConverter<ScoreDoc, BlurResult> _iterator;
+  private Selector _selector;
+  private Query _query;
+  private IndexSearcher _searcher;
+  private TotalHitsRef _totalHitsRef = new TotalHitsRef();
+  private ProgressRef _progressRef = new ProgressRef();
+  private AtomicBoolean _running;
+  private IndexReader _reader;
+
+  public BlurResultIterableSearcher(AtomicBoolean running, Query query, String table, String shard, IndexSearcher searcher, Selector selector, IndexReader reader)
+      throws IOException {
+    _running = running;
+    _table = table;
+    _query = query;
+    _shard = shard;
+    _searcher = searcher;
+    _selector = selector;
+    _reader = reader;
+    performSearch();
+  }
+
+  private void performSearch() throws IOException {
+    IterablePaging iterablePaging = new IterablePaging(_running, _searcher, _query, _fetchCount, _totalHitsRef, _progressRef);
+    _iterator = new IteratorConverter<ScoreDoc, BlurResult>(iterablePaging.iterator(), new Converter<ScoreDoc, BlurResult>() {
+      @Override
+      public BlurResult convert(ScoreDoc scoreDoc) throws Exception {
+        String resolveId = resolveId(scoreDoc.doc);
+        return new BlurResult(resolveId, scoreDoc.score, getFetchResult(resolveId));
+      }
+    });
+    _shardInfo.put(_shard, (long) _totalHitsRef.totalHits());
+  }
+
+  private FetchResult getFetchResult(String resolveId) throws IOException, BlurException {
+    if (_selector == null) {
+      return null;
+    }
+    FetchResult fetchResult = new FetchResult();
+    _selector.setLocationId(resolveId);
+    IndexManager.validSelector(_selector);
+    IndexManager.fetchRow(_searcher.getIndexReader(), _table, _selector, fetchResult);
+    return fetchResult;
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return _shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return _totalHitsRef.totalHits();
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    _skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    long start = 0;
+    while (_iterator.hasNext() && start < _skipTo) {
+      _iterator.next();
+      start++;
+    }
+    return _iterator;
+  }
+
+  private String resolveId(int docId) {
+    return _shard + "/" + docId;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (_reader != null) {
+      int refCount = _reader.getRefCount();
+      _reader.decRef();
+      LOG.debug("Decrementing reader old ref [{0}] new ref count [{1}]", refCount, _reader.getRefCount());
+      _reader = null;
+    }
+  }
+}
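
A sketch only: the reader, searcher and query must come from an already-opened shard index, and the table/shard names are made up. Passing a null Selector skips row fetching, so each BlurResult carries just a "<shard>/<docId>" location id and a score; close() decRef()s the reader, so the reader reference is effectively handed off to the iterable.

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.blur.manager.results.BlurResultIterableSearcher;
import org.apache.blur.thrift.generated.BlurResult;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;

public class SearcherIterableSketch {
  static void search(IndexReader reader, IndexSearcher searcher, Query query) throws Exception {
    BlurResultIterableSearcher results = new BlurResultIterableSearcher(
        new AtomicBoolean(true), query, "test-table", "shard-00000000", searcher, null, reader);
    try {
      for (BlurResult result : results) {
        System.out.println(result.locationId + " " + result.score);
      }
    } finally {
      results.close();  // decrements the reader's ref count
    }
  }
}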

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java
new file mode 100644
index 0000000..3158576
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableSimple.java
@@ -0,0 +1,74 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.utils.BlurConstants;
+
+
+public class BlurResultIterableSimple implements BlurResultIterable {
+
+  private List<BlurResult> results;
+  private Map<String, Long> shardInfo;
+  private long skipTo;
+
+  public BlurResultIterableSimple(String shard, List<BlurResult> hits) {
+    Collections.sort(hits, BlurConstants.HITS_COMPARATOR);
+    this.results = hits;
+    this.shardInfo = new TreeMap<String, Long>();
+    this.shardInfo.put(shard, (long) hits.size());
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return results.size();
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    this.skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    long start = 0;
+    Iterator<BlurResult> iterator = results.iterator();
+    while (iterator.hasNext() && start < skipTo) {
+      iterator.next();
+      start++;
+    }
+    return iterator;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing
+  }
+
+}
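
A sketch with made-up hits: the constructor sorts them with BlurConstants.HITS_COMPARATOR and records the shard's hit count, so skipTo() skips from the front of the sorted list.

import java.util.ArrayList;
import java.util.List;

import org.apache.blur.manager.results.BlurResultIterableSimple;
import org.apache.blur.thrift.generated.BlurResult;

public class SimpleIterableSketch {
  public static void main(String[] args) throws Exception {
    List<BlurResult> hits = new ArrayList<BlurResult>();
    hits.add(new BlurResult("shard-0/5", 0.4, null));
    hits.add(new BlurResult("shard-0/9", 2.1, null));

    BlurResultIterableSimple iterable = new BlurResultIterableSimple("shard-0", hits);
    System.out.println(iterable.getShardInfo());  // {shard-0=2}
    iterable.skipTo(1);                           // skip the first hit after sorting
    for (BlurResult result : iterable) {
      System.out.println(result.locationId);
    }
    iterable.close();
  }
}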

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java
new file mode 100644
index 0000000..7eddb0a
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultPeekableIteratorComparator.java
@@ -0,0 +1,44 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Comparator;
+
+import org.apache.blur.thrift.generated.BlurResult;
+
+
+public class BlurResultPeekableIteratorComparator implements Comparator<PeekableIterator<BlurResult>> {
+
+  @Override
+  public int compare(PeekableIterator<BlurResult> o1, PeekableIterator<BlurResult> o2) {
+    BlurResult result1 = o1.peek();
+    BlurResult result2 = o2.peek();
+    if (result1 == null && result2 == null) {
+      return 0;
+    } else if (result1 == null) {
+      return 1;
+    } else if (result2 == null) {
+      return -1;
+    }
+    int compare = Double.compare(result2.score, result1.score);
+    if (compare == 0) {
+      return result2.locationId.compareTo(result1.locationId);
+    }
+    return compare;
+  }
+
+}
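
A sketch with made-up results showing why this comparator exists: it orders peekable iterators by the score of the element each would return next, so MultipleHitsIterator can always pull from the iterator holding the best remaining result; exhausted iterators (peek() == null) sort last.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.blur.manager.results.BlurResultPeekableIteratorComparator;
import org.apache.blur.manager.results.PeekableIterator;
import org.apache.blur.thrift.generated.BlurResult;

public class PeekableComparatorSketch {
  public static void main(String[] args) {
    List<PeekableIterator<BlurResult>> iterators = new ArrayList<PeekableIterator<BlurResult>>();
    iterators.add(new PeekableIterator<BlurResult>(
        Arrays.asList(new BlurResult("shard-0/1", 0.5, null)).iterator()));
    iterators.add(new PeekableIterator<BlurResult>(
        Arrays.asList(new BlurResult("shard-1/4", 1.2, null)).iterator()));

    Collections.sort(iterators, new BlurResultPeekableIteratorComparator());
    System.out.println(iterators.get(0).peek().locationId); // shard-1/4 - higher score sorts first
  }
}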

