incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [48/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:58:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
new file mode 100644
index 0000000..f2eee4f
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperPathConstants.java
@@ -0,0 +1,110 @@
+package org.apache.blur.manager.clusterstatus;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ZookeeperPathConstants {
+
+  public static String getBasePath() {
+    return "/blur";
+  }
+
+  public static String getClusterPath(String cluster) {
+    return getClustersPath() + "/" + cluster;
+  }
+
+  public static String getClustersPath() {
+    return getBasePath() + "/clusters";
+  }
+
+  public static String getOnlineControllersPath() {
+    return getBasePath() + "/online-controller-nodes";
+  }
+
+  public static String getTableEnabledPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/enabled";
+  }
+
+  public static String getTableUriPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/uri";
+  }
+
+  public static String getTableShardCountPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/shard-count";
+  }
+
+  public static String getOnlineShardsPath(String cluster) {
+    return getClusterPath(cluster) + "/online-nodes";
+  }
+
+  public static String getTablesPath(String cluster) {
+    return getClusterPath(cluster) + "/tables";
+  }
+
+  public static String getTablePath(String cluster, String table) {
+    return getTablesPath(cluster) + "/" + table;
+  }
+
+  public static String getSafemodePath(String cluster) {
+    return getClusterPath(cluster) + "/safemode";
+  }
+
+  public static String getRegisteredShardsPath(String cluster) {
+    return getClusterPath(cluster) + "/registered-nodes";
+  }
+
+  public static String getTableCompressionCodecPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/compression-codec";
+  }
+
+  public static String getTableCompressionBlockSizePath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/compression-blocksize";
+  }
+
+  public static String getLockPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/locks";
+  }
+
+  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/blockcachingfiletypes";
+  }
+
+  public static String getTableBlockCachingPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/blockcaching";
+  }
+
+  public static String getTableSimilarityPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/similarity";
+  }
+
+  public static String getTableFieldNamesPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/fieldnames";
+  }
+
+  public static String getTableFieldNamesPath(String cluster, String table, String fieldName) {
+    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
+  }
+
+  public static String getTableReadOnlyPath(String cluster, String table) {
+    return getTablePath(cluster, table) + "/readonly";
+  }
+
+  public static String getTableColumnsToPreCache(String cluster, String table) {
+    return getTablePath(cluster, table) + "/precache";
+  }
+
+}

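A minimal usage sketch (not part of the commit) showing how the helpers above compose into the ZooKeeper node layout; the cluster and table names are hypothetical.

import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;

public class ZkPathLayoutExample {
  public static void main(String[] args) {
    String cluster = "default"; // hypothetical cluster name
    String table = "table1";    // hypothetical table name
    System.out.println(ZookeeperPathConstants.getClusterPath(cluster));             // /blur/clusters/default
    System.out.println(ZookeeperPathConstants.getOnlineShardsPath(cluster));        // /blur/clusters/default/online-nodes
    System.out.println(ZookeeperPathConstants.getTablePath(cluster, table));        // /blur/clusters/default/tables/table1
    System.out.println(ZookeeperPathConstants.getTableEnabledPath(cluster, table)); // /blur/clusters/default/tables/table1/enabled
  }
}
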
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
new file mode 100644
index 0000000..3644d96
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractIndexServer.java
@@ -0,0 +1,72 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+
+public abstract class AbstractIndexServer implements IndexServer {
+
+  public long getRecordCount(String table) throws IOException {
+    long recordCount = 0;
+    Map<String, BlurIndex> indexes = getIndexes(table);
+    for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
+      IndexSearcherClosable searcher = null;
+      try {
+        searcher = index.getValue().getIndexReader();
+        recordCount += searcher.getIndexReader().numDocs();
+      } finally {
+        if (searcher != null) {
+          searcher.close();
+        }
+      }
+    }
+    return recordCount;
+  }
+
+  public long getRowCount(String table) throws IOException {
+    long rowCount = 0;
+    Map<String, BlurIndex> indexes = getIndexes(table);
+    for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
+      IndexSearcherClosable searcher = null;
+      try {
+        searcher = index.getValue().getIndexReader();
+        rowCount += getRowCount(searcher.getIndexReader());
+      } finally {
+        if (searcher != null) {
+          searcher.close();
+        }
+      }
+    }
+    return rowCount;
+  }
+
+  private long getRowCount(IndexReader indexReader) throws IOException {
+    IndexSearcher searcher = new IndexSearcher(indexReader);
+    TopDocs topDocs = searcher.search(new TermQuery(BlurConstants.PRIME_DOC_TERM), 1);
+    return topDocs.totalHits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
new file mode 100644
index 0000000..8265cc6
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurIndexWarmup.java
@@ -0,0 +1,71 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.IndexReader;
+
+
+public abstract class BlurIndexWarmup {
+
+  /**
+   * Once the reader has been warmed up, release() must be called on the
+   * ReleaseReader even if an exception occurs.
+   * 
+   * @param table
+   *          the table name.
+   * @param shard
+   *          the shard name.
+   * @param reader
+   *          the reader itself.
+   * @param isClosed
+   *          to check if the shard has been migrated to another node.
+   * @param releaseReader
+   *          to release the handle on the reader.
+   * @throws IOException
+   * 
+   * @deprecated
+   */
+  public void warmBlurIndex(String table, String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+
+  }
+
+  /**
+   * Once the reader has been warmed up, release() must be called on the
+   * ReleaseReader even if an exception occurs.
+   * 
+   * @param table
+   *          the table descriptor.
+   * @param shard
+   *          the shard name.
+   * @param reader
+   *          the reader itself.
+   * @param isClosed
+   *          to check if the shard has been migrated to another node.
+   * @param releaseReader
+   *          to release the handle on the reader.
+   * @throws IOException
+   */
+  public void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+    warmBlurIndex(table.name, shard, reader, isClosed, releaseReader);
+  }
+
+}

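To illustrate the contract documented above (release() must be called on the ReleaseReader even if warming fails), a custom warmup could be sketched roughly as follows. The class name and the omitted warming logic are hypothetical; DefaultBlurIndexWarmup below follows the same release-in-finally pattern.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.blur.manager.indexserver.BlurIndexWarmup;
import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.lucene.index.IndexReader;

// Hypothetical subclass; shows the release-in-finally pattern required by the contract.
public class ExampleBlurIndexWarmup extends BlurIndexWarmup {

  @Override
  public void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader,
      AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
    try {
      if (isClosed.get()) {
        return; // shard has been migrated to another node, nothing to warm
      }
      // ... touch the fields/terms that should be pre-loaded here ...
    } finally {
      // Always release the handle on the reader, even if warming throws.
      releaseReader.release();
    }
  }
}
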
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java
new file mode 100644
index 0000000..e41fdef
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/BlurServerShutDown.java
@@ -0,0 +1,72 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+
+public class BlurServerShutDown implements Watcher {
+
+  private static final Log LOG = LogFactory.getLog(BlurServerShutDown.class);
+
+  public interface BlurShutdown {
+    void shutdown();
+  }
+
+  private BlurShutdown shutdown;
+  private ZooKeeper zooKeeper;
+
+  public BlurServerShutDown() {
+    Runtime runtime = Runtime.getRuntime();
+    runtime.addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          LOG.info("Closing zookeeper.");
+          zooKeeper.close();
+        } catch (InterruptedException e) {
+          LOG.error("Unknown error while closing zookeeper.", e);
+        }
+      }
+    }));
+  }
+
+  public void register(final BlurShutdown shutdown, ZooKeeper zooKeeper) {
+    this.shutdown = shutdown;
+    this.zooKeeper = zooKeeper;
+    zooKeeper.register(new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        KeeperState state = event.getState();
+        if (state == KeeperState.Expired) {
+          LOG.fatal("Zookeeper session has [" + state + "], server process shutting down.");
+          shutdown.shutdown();
+        }
+      }
+    });
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    register(shutdown, zooKeeper);
+  }
+}

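A hedged wiring sketch (not part of the commit) showing how a shutdown callback might be registered; the connect string, session timeout, and shutdown body are assumptions.

import org.apache.blur.manager.indexserver.BlurServerShutDown;
import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
import org.apache.zookeeper.ZooKeeper;

public class ShutdownWiringExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical connect string and session timeout.
    ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 30000, null);
    BlurServerShutDown shutDown = new BlurServerShutDown();
    shutDown.register(new BlurShutdown() {
      @Override
      public void shutdown() {
        // Release server resources here; invoked when the ZooKeeper session expires.
        System.exit(1);
      }
    }, zooKeeper);
  }
}
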
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/CloseableExecutorService.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/CloseableExecutorService.java
new file mode 100644
index 0000000..f5900db
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/CloseableExecutorService.java
@@ -0,0 +1,36 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+public class CloseableExecutorService implements Closeable {
+
+  private ExecutorService executor;
+
+  public CloseableExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  @Override
+  public void close() throws IOException {
+    executor.shutdownNow();
+  }
+
+}

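This wrapper exists so that an ExecutorService can be managed like any other Closeable resource. A small sketch of the intended use, mirroring the Guava Closer registration in LocalIndexServer further down, might look like this.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.blur.manager.indexserver.CloseableExecutorService;

import com.google.common.io.Closer;

public class CloserExample {
  public static void main(String[] args) throws Exception {
    Closer closer = Closer.create();
    ExecutorService searchExecutor = Executors.newCachedThreadPool();
    // Wrapping the executor makes it Closeable, so the Closer can shut it
    // down along with the other registered resources.
    closer.register(new CloseableExecutorService(searchExecutor));
    try {
      // ... submit search work to searchExecutor ...
    } finally {
      closer.close(); // calls shutdownNow() on the wrapped executor
    }
  }
}
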
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
new file mode 100644
index 0000000..5f18dc0
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DefaultBlurIndexWarmup.java
@@ -0,0 +1,52 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
+import org.apache.blur.manager.writer.FieldBasedWarmer;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexReaderContext;
+
+public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
+
+  private static final Log LOG = LogFactory.getLog(DefaultBlurIndexWarmup.class);
+
+  @Override
+  public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader reader,
+      AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
+    LOG.info("Running warmup for reader [{0}]", reader);
+    try {
+      FieldBasedWarmer warmer = new FieldBasedWarmer(table);
+      for (IndexReaderContext context : reader.getContext().leaves()) {
+        AtomicReaderContext atomicReaderContext = (AtomicReaderContext) context;
+        AtomicReader atomicReader = atomicReaderContext.reader();
+        warmer.warm(atomicReader);
+      }
+    } finally {
+      releaseReader.release();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
new file mode 100644
index 0000000..c86eeae
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -0,0 +1,742 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.metrics.MetricsConstants.BLUR;
+import static org.apache.blur.metrics.MetricsConstants.INDEX_COUNT;
+import static org.apache.blur.metrics.MetricsConstants.INDEX_MEMORY_USAGE;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.SEGMENT_COUNT;
+import static org.apache.blur.metrics.MetricsConstants.TABLE_COUNT;
+import static org.apache.blur.utils.BlurConstants.SHARD_PREFIX;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
+import org.apache.blur.manager.BlurFilterCache;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.BlurIndexCloser;
+import org.apache.blur.manager.writer.BlurIndexReader;
+import org.apache.blur.manager.writer.BlurIndexRefresher;
+import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.SharedMergeScheduler;
+import org.apache.blur.metrics.AtomicLongGauge;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.blockcache.BlockDirectory;
+import org.apache.blur.store.blockcache.Cache;
+import org.apache.blur.store.hdfs.BlurLockFactory;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.zookeeper.WatchChildren;
+import org.apache.blur.zookeeper.WatchChildren.OnChange;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.similarities.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.MetricName;
+
+public class DistributedIndexServer extends AbstractIndexServer {
+
+  private static final Log LOG = LogFactory.getLog(DistributedIndexServer.class);
+  private static final long _delay = TimeUnit.SECONDS.toMillis(5);
+
+  private Map<String, BlurAnalyzer> _tableAnalyzers = new ConcurrentHashMap<String, BlurAnalyzer>();
+  private Map<String, TableDescriptor> _tableDescriptors = new ConcurrentHashMap<String, TableDescriptor>();
+  private Map<String, Similarity> _tableSimilarity = new ConcurrentHashMap<String, Similarity>();
+  private Map<String, DistributedLayoutManager> _layoutManagers = new ConcurrentHashMap<String, DistributedLayoutManager>();
+  private Map<String, Set<String>> _layoutCache = new ConcurrentHashMap<String, Set<String>>();
+  private ConcurrentHashMap<String, Map<String, BlurIndex>> _indexes = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
+  private final ShardStateManager _shardStateManager = new ShardStateManager();
+
+  // set externally
+  private ClusterStatus _clusterStatus;
+  private Configuration _configuration;
+  private String _nodeName;
+  private int _shardOpenerThreadCount;
+  private Cache _cache;
+  private ZooKeeper _zookeeper;
+  private String _cluster;
+
+  // set internally
+  private Timer _timerCacheFlush;
+  private ExecutorService _openerService;
+  private Timer _timerTableWarmer;
+  private BlurFilterCache _filterCache;
+  private AtomicBoolean _running = new AtomicBoolean();
+  private long _safeModeDelay;
+  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
+  private DirectoryReferenceFileGC _gc;
+  private WatchChildren _watchOnlineShards;
+
+  private SharedMergeScheduler _mergeScheduler;
+  private IndexInputCloser _closer = null;
+  private ExecutorService _searchExecutor = null;
+
+  private AtomicLong _tableCount = new AtomicLong();
+  private AtomicLong _indexCount = new AtomicLong();
+  private AtomicLong _segmentCount = new AtomicLong();
+  private AtomicLong _indexMemoryUsage = new AtomicLong();
+  private BlurIndexRefresher _refresher;
+  private BlurIndexCloser _indexCloser;
+
+  public static interface ReleaseReader {
+    void release() throws IOException;
+  }
+
+  public void init() throws KeeperException, InterruptedException, IOException {
+    MetricName tableCount = new MetricName(ORG_APACHE_BLUR, BLUR, TABLE_COUNT, _cluster);
+    MetricName indexCount = new MetricName(ORG_APACHE_BLUR, BLUR, INDEX_COUNT, _cluster);
+    MetricName segmentCount = new MetricName(ORG_APACHE_BLUR, BLUR, SEGMENT_COUNT, _cluster);
+    MetricName indexMemoryUsage = new MetricName(ORG_APACHE_BLUR, BLUR, INDEX_MEMORY_USAGE, _cluster);
+    
+    Metrics.newGauge(tableCount, new AtomicLongGauge(_tableCount));
+    Metrics.newGauge(indexCount, new AtomicLongGauge(_indexCount));
+    Metrics.newGauge(segmentCount, new AtomicLongGauge(_segmentCount));
+    Metrics.newGauge(indexMemoryUsage, new AtomicLongGauge(_indexMemoryUsage));
+
+    BlurUtil.setupZookeeper(_zookeeper, _cluster);
+    _openerService = Executors.newThreadPool("shard-opener", _shardOpenerThreadCount);
+    _gc = new DirectoryReferenceFileGC();
+    _gc.init();
+
+    // @TODO allow for configuration of these
+    _mergeScheduler = new SharedMergeScheduler();
+    _searchExecutor = Executors.newThreadPool("internal-search", 16);
+    _closer = new IndexInputCloser();
+    _closer.init();
+    _refresher = new BlurIndexRefresher();
+    _refresher.init();
+    _indexCloser = new BlurIndexCloser();
+    _indexCloser.init();
+    setupFlushCacheTimer();
+    
+    registerMyselfAsMemberOfCluster();
+    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster);
+    String safemodePath = ZookeeperPathConstants.getSafemodePath(_cluster);
+    SafeMode safeMode = new SafeMode(_zookeeper, safemodePath, onlineShardsPath, TimeUnit.MILLISECONDS, _safeModeDelay, TimeUnit.SECONDS, 60);
+    safeMode.registerNode(getNodeName(), BlurUtil.getVersion().getBytes());
+
+    _running.set(true);
+    setupTableWarmer();
+    watchForShardServerChanges();
+  }
+
+  private void watchForShardServerChanges() {
+    ZookeeperPathConstants.getOnlineShardsPath(_cluster);
+    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(_cluster))
+        .watch(new OnChange() {
+          private List<String> _prevOnlineShards = new ArrayList<String>();
+
+          @Override
+          public void action(List<String> onlineShards) {
+            List<String> oldOnlineShards = _prevOnlineShards;
+            _prevOnlineShards = onlineShards;
+            _layoutManagers.clear();
+            _layoutCache.clear();
+            LOG.info("Online shard servers changed, clearing layout managers and cache.");
+            if (oldOnlineShards == null) {
+              oldOnlineShards = new ArrayList<String>();
+            }
+            for (String oldOnlineShard : oldOnlineShards) {
+              if (!onlineShards.contains(oldOnlineShard)) {
+                LOG.info("Node went offline [{0}]", oldOnlineShard);
+              }
+            }
+            for (String onlineShard : onlineShards) {
+              if (!oldOnlineShards.contains(onlineShard)) {
+                LOG.info("Node came online [{0}]", onlineShard);
+              }
+            }
+          }
+        });
+  }
+
+  private void setupTableWarmer() {
+    _timerTableWarmer = new Timer("Table-Warmer", true);
+    _timerTableWarmer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          warmup();
+        } catch (Throwable t) {
+          if (_running.get()) {
+            LOG.error("Unknown error", t);
+          } else {
+            LOG.debug("Unknown error", t);
+          }
+        }
+      }
+
+      private void warmup() {
+        if (_running.get()) {
+          List<String> tableList = _clusterStatus.getTableList(false, _cluster);
+          _tableCount.set(tableList.size());
+          long indexCount = 0;
+          AtomicLong segmentCount = new AtomicLong();
+          AtomicLong indexMemoryUsage = new AtomicLong();
+          for (String table : tableList) {
+            try {
+              Map<String, BlurIndex> indexes = getIndexes(table);
+              int count = indexes.size();
+              indexCount += count;
+              updateMetrics(indexes, segmentCount, indexMemoryUsage);
+              LOG.debug("Table [{0}] has [{1}] number of shards online in this node.", table, count);
+            } catch (IOException e) {
+              LOG.error("Unknown error trying to warm table [{0}]", e, table);
+            }
+          }
+          _indexCount.set(indexCount);
+          _segmentCount.set(segmentCount.get());
+          _indexMemoryUsage.set(indexMemoryUsage.get());
+        }
+      }
+
+      private void updateMetrics(Map<String, BlurIndex> indexes, AtomicLong segmentCount, AtomicLong indexMemoryUsage)
+          throws IOException {
+        // @TODO not sure how to do this yet
+        // for (BlurIndex index : indexes.values()) {
+        // IndexReader reader = index.getIndexReader();
+        // try {
+        // IndexReader[] readers = reader.getSequentialSubReaders();
+        // if (readers != null) {
+        // segmentCount.addAndGet(readers.length);
+        // }
+        // indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
+        // } finally {
+        // reader.decRef();
+        // }
+        // }
+      }
+    }, _delay, _delay);
+  }
+
+  private void registerMyselfAsMemberOfCluster() {
+    String nodeName = getNodeName();
+    String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(_cluster) + "/" + nodeName;
+    try {
+      if (_zookeeper.exists(registeredShardsPath, false) == null) {
+        _zookeeper.create(registeredShardsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Map<String, ShardState> getShardState(String table) {
+    return _shardStateManager.getShardState(table);
+  }
+
+  private void setupFlushCacheTimer() {
+    _timerCacheFlush = new Timer("Flush-IndexServer-Caches", true);
+    _timerCacheFlush.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          cleanup();
+        } catch (Throwable t) {
+          LOG.error("Unknown error", t);
+        }
+      }
+
+      private void cleanup() {
+        clearMapOfOldTables(_tableAnalyzers);
+        clearMapOfOldTables(_tableDescriptors);
+        clearMapOfOldTables(_layoutManagers);
+        clearMapOfOldTables(_layoutCache);
+        clearMapOfOldTables(_tableSimilarity);
+        Map<String, Map<String, BlurIndex>> oldIndexesThatNeedToBeClosed = clearMapOfOldTables(_indexes);
+        for (String table : oldIndexesThatNeedToBeClosed.keySet()) {
+          Map<String, BlurIndex> indexes = oldIndexesThatNeedToBeClosed.get(table);
+          if (indexes == null) {
+            continue;
+          }
+          for (String shard : indexes.keySet()) {
+            BlurIndex index = indexes.get(shard);
+            if (index == null) {
+              continue;
+            }
+            close(index, table, shard);
+          }
+        }
+        for (String table : _indexes.keySet()) {
+          Map<String, BlurIndex> shardMap = _indexes.get(table);
+          if (shardMap != null) {
+            Set<String> shards = new HashSet<String>(shardMap.keySet());
+            Set<String> shardsToServe = getShardsToServe(table);
+            shards.removeAll(shardsToServe);
+            if (!shards.isEmpty()) {
+              LOG.info("Need to close indexes for table [{0}] indexes [{1}]", table, shards);
+            }
+            for (String shard : shards) {
+              LOG.info("Closing index for table [{0}] shard [{1}]", table, shard);
+              BlurIndex index = shardMap.remove(shard);
+              close(index, table, shard);
+            }
+          }
+        }
+      }
+    }, _delay, _delay);
+  }
+
+  protected void close(BlurIndex index, String table, String shard) {
+    LOG.info("Closing index [{0}] from table [{1}] shard [{2}]", index, table, shard);
+    try {
+      _filterCache.closing(table, shard, index);
+      _shardStateManager.closing(table, shard);
+      index.close();
+      _shardStateManager.closed(table, shard);
+    } catch (Throwable e) {
+      LOG.error("Error while closing index [{0}] from table [{1}] shard [{2}]", e, index, table, shard);
+      _shardStateManager.closingError(table, shard);
+    }
+  }
+
+  protected <T> Map<String, T> clearMapOfOldTables(Map<String, T> map) {
+    List<String> tables = new ArrayList<String>(map.keySet());
+    Map<String, T> removed = new HashMap<String, T>();
+    for (String table : tables) {
+      if (!_clusterStatus.exists(true, _cluster, table)) {
+        removed.put(table, map.remove(table));
+      }
+    }
+    for (String table : tables) {
+      if (!_clusterStatus.isEnabled(true, _cluster, table)) {
+        removed.put(table, map.remove(table));
+      }
+    }
+    return removed;
+  }
+
+  @Override
+  public void close() {
+    if (_running.get()) {
+      
+      _shardStateManager.close();
+      _running.set(false);
+      closeAllIndexes();
+      _refresher.close();
+      _indexCloser.close();
+      _watchOnlineShards.close();
+      _timerCacheFlush.purge();
+      _timerCacheFlush.cancel();
+
+      _timerTableWarmer.purge();
+      _timerTableWarmer.cancel();
+      _closer.close();
+      _gc.close();
+      _openerService.shutdownNow();
+    }
+  }
+
+  private void closeAllIndexes() {
+    for (Entry<String, Map<String, BlurIndex>> tableToShards : _indexes.entrySet()) {
+      for (Entry<String, BlurIndex> shard : tableToShards.getValue().entrySet()) {
+        BlurIndex index = shard.getValue();
+        try {
+          index.close();
+          LOG.info("Closed [{0}] [{1}] [{2}]", tableToShards.getKey(), shard.getKey(), index);
+        } catch (IOException e) {
+          LOG.error("Error during closing of [{0}] [{1}] [{2}]", e, tableToShards.getKey(), shard.getKey(), index);
+        }
+      }
+    }
+  }
+
+  @Override
+  public BlurAnalyzer getAnalyzer(String table) {
+    checkTable(table);
+    BlurAnalyzer blurAnalyzer = _tableAnalyzers.get(table);
+    if (blurAnalyzer == null) {
+      TableDescriptor descriptor = getTableDescriptor(table);
+      blurAnalyzer = new BlurAnalyzer(descriptor.analyzerDefinition);
+      _tableAnalyzers.put(table, blurAnalyzer);
+    }
+    return blurAnalyzer;
+  }
+
+  @Override
+  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
+    return new TreeSet<String>(getShardsToServe(table));
+  }
+
+  @Override
+  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
+    checkTable(table);
+
+    Set<String> shardsToServe = getShardsToServe(table);
+    synchronized (_indexes) {
+      if (!_indexes.containsKey(table)) {
+        _indexes.putIfAbsent(table, new ConcurrentHashMap<String, BlurIndex>());
+      }
+    }
+    Map<String, BlurIndex> tableIndexes = _indexes.get(table);
+    Set<String> shardsBeingServed = new HashSet<String>(tableIndexes.keySet());
+    if (shardsBeingServed.containsAll(shardsToServe)) {
+      Map<String, BlurIndex> result = new HashMap<String, BlurIndex>(tableIndexes);
+      shardsBeingServed.removeAll(shardsToServe);
+      for (String shardNotToServe : shardsBeingServed) {
+        result.remove(shardNotToServe);
+      }
+      return result;
+    } else {
+      return openMissingShards(table, shardsToServe, tableIndexes);
+    }
+  }
+
+  private BlurIndex openShard(String table, String shard) throws IOException {
+    LOG.info("Opening shard [{0}] for table [{1}]", shard, table);
+    Path tablePath = new Path(getTableDescriptor(table).tableUri);
+    Path hdfsDirPath = new Path(tablePath, shard);
+
+    BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurConstants.getPid());
+
+    Directory directory = new HdfsDirectory(_configuration, hdfsDirPath);
+    directory.setLockFactory(lockFactory);
+
+    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
+    TableContext tableContext = TableContext.create(descriptor);
+    ShardContext shardContext = ShardContext.create(tableContext, shard);
+
+    Directory dir;
+    boolean blockCacheEnabled = _clusterStatus.isBlockCacheEnabled(_cluster, table);
+    if (blockCacheEnabled) {
+      Set<String> blockCacheFileTypes = _clusterStatus.getBlockCacheFileTypes(_cluster, table);
+      dir = new BlockDirectory(table + "_" + shard, directory, _cache, blockCacheFileTypes);
+    } else {
+      dir = directory;
+    }
+
+    BlurIndex index;
+    if (_clusterStatus.isReadOnly(true, _cluster, table)) {
+      BlurIndexReader reader = new BlurIndexReader(shardContext, directory, _refresher, _indexCloser);
+      index = reader;
+    } else {
+      BlurNRTIndex writer = new BlurNRTIndex(shardContext, _mergeScheduler, _closer, dir, _gc, _searchExecutor);
+      index = writer;
+    }
+    _filterCache.opening(table, shard, index);
+    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
+    return warmUp(index, tableDescriptor, shard);
+  }
+
+  private BlurIndex warmUp(BlurIndex index, TableDescriptor table, String shard) throws IOException {
+    final IndexSearcherClosable searcher = index.getIndexReader();
+    IndexReader reader = searcher.getIndexReader();
+    _warmup.warmBlurIndex(table, shard, reader, index.isClosed(), new ReleaseReader() {
+      @Override
+      public void release() throws IOException {
+        // this will allow for closing of index
+        searcher.close();
+      }
+    });
+    return index;
+  }
+
+  private synchronized Map<String, BlurIndex> openMissingShards(final String table, Set<String> shardsToServe,
+      final Map<String, BlurIndex> tableIndexes) {
+    Map<String, Future<BlurIndex>> opening = new HashMap<String, Future<BlurIndex>>();
+    for (String s : shardsToServe) {
+      final String shard = s;
+      BlurIndex blurIndex = tableIndexes.get(shard);
+      if (blurIndex == null) {
+        LOG.info("Opening missing shard [{0}] from table [{1}]", shard, table);
+        Future<BlurIndex> submit = _openerService.submit(new Callable<BlurIndex>() {
+          @Override
+          public BlurIndex call() throws Exception {
+            _shardStateManager.opening(table, shard);
+            try {
+              BlurIndex openShard = openShard(table, shard);
+              _shardStateManager.open(table, shard);
+              return openShard;
+            } catch (Exception e) {
+              _shardStateManager.openingError(table, shard);
+              throw e;
+            } catch (Throwable t) {
+              _shardStateManager.openingError(table, shard);
+              throw new RuntimeException(t);
+            }
+          }
+        });
+        opening.put(shard, submit);
+      }
+    }
+
+    for (Entry<String, Future<BlurIndex>> entry : opening.entrySet()) {
+      String shard = entry.getKey();
+      Future<BlurIndex> future = entry.getValue();
+      try {
+        BlurIndex blurIndex = future.get();
+        tableIndexes.put(shard, blurIndex);
+      } catch (Exception e) {
+        e.printStackTrace();
+        LOG.error("Unknown error while opening shard [{0}] for table [{1}].", e.getCause(), shard, table);
+      }
+    }
+
+    Map<String, BlurIndex> result = new HashMap<String, BlurIndex>();
+    for (String shard : shardsToServe) {
+      BlurIndex blurIndex = tableIndexes.get(shard);
+      if (blurIndex == null) {
+        LOG.error("Missing shard [{0}] for table [{1}].", shard, table);
+      } else {
+        result.put(shard, blurIndex);
+      }
+    }
+    return result;
+  }
+
+  private Set<String> getShardsToServe(String table) {
+    TABLE_STATUS tableStatus = getTableStatus(table);
+    if (tableStatus == TABLE_STATUS.DISABLED) {
+      return new HashSet<String>();
+    }
+    DistributedLayoutManager layoutManager = _layoutManagers.get(table);
+    if (layoutManager == null) {
+      return setupLayoutManager(table);
+    } else {
+      return _layoutCache.get(table);
+    }
+  }
+
+  private synchronized Set<String> setupLayoutManager(String table) {
+    DistributedLayoutManager layoutManager = new DistributedLayoutManager();
+
+    String cluster = _clusterStatus.getCluster(false, table);
+    if (cluster == null) {
+      throw new RuntimeException("Table [" + table + "] is not found.");
+    }
+
+    List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
+    List<String> offlineShardServers = new ArrayList<String>(_clusterStatus.getOfflineShardServers(false, cluster));
+    List<String> shardList = getShardList(table);
+
+    layoutManager.setNodes(shardServerList);
+    layoutManager.setNodesOffline(offlineShardServers);
+    layoutManager.setShards(shardList);
+    layoutManager.init();
+
+    Map<String, String> layout = layoutManager.getLayout();
+    String nodeName = getNodeName();
+    Set<String> shardsToServeCache = new TreeSet<String>();
+    for (Entry<String, String> entry : layout.entrySet()) {
+      if (entry.getValue().equals(nodeName)) {
+        shardsToServeCache.add(entry.getKey());
+      }
+    }
+    _layoutCache.put(table, shardsToServeCache);
+    _layoutManagers.put(table, layoutManager);
+    return shardsToServeCache;
+  }
+
+  @Override
+  public String getNodeName() {
+    return _nodeName;
+  }
+
+  @Override
+  public int getShardCount(String table) {
+    checkTable(table);
+    TableDescriptor descriptor = getTableDescriptor(table);
+    return descriptor.shardCount;
+  }
+
+  @Override
+  public List<String> getShardList(String table) {
+    checkTable(table);
+    List<String> result = new ArrayList<String>();
+    try {
+      TableDescriptor descriptor = getTableDescriptor(table);
+      Path tablePath = new Path(descriptor.tableUri);
+      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
+      if (!fileSystem.exists(tablePath)) {
+        LOG.error("Table [{0}] is missing, defined location [{1}]", table, tablePath.toUri());
+        throw new RuntimeException("Table [" + table + "] is missing, defined location [" + tablePath.toUri() + "]");
+      }
+      FileStatus[] listStatus = fileSystem.listStatus(tablePath);
+      for (FileStatus status : listStatus) {
+        if (status.isDir()) {
+          String name = status.getPath().getName();
+          if (name.startsWith(SHARD_PREFIX)) {
+            result.add(name);
+          }
+        }
+      }
+      return result;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Similarity getSimilarity(String table) {
+    checkTable(table);
+    Similarity similarity = _tableSimilarity.get(table);
+    if (similarity == null) {
+      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
+      String similarityClass = tableDescriptor.similarityClass;
+      if (similarityClass == null) {
+        similarity = new FairSimilarity();
+      } else {
+        similarity = getInstance(similarityClass, Similarity.class);
+      }
+      _tableSimilarity.put(table, similarity);
+    }
+    return similarity;
+  }
+
+  @Override
+  public long getTableSize(String table) throws IOException {
+    checkTable(table);
+    Path tablePath = new Path(getTableUri(table));
+    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
+    ContentSummary contentSummary = fileSystem.getContentSummary(tablePath);
+    return contentSummary.getLength();
+  }
+
+  @Override
+  public TABLE_STATUS getTableStatus(String table) {
+    checkTable(table);
+    boolean enabled = _clusterStatus.isEnabled(true, _cluster, table);
+    if (enabled) {
+      return TABLE_STATUS.ENABLED;
+    }
+    return TABLE_STATUS.DISABLED;
+  }
+
+  private void checkTable(String table) {
+    if (_clusterStatus.exists(true, _cluster, table)) {
+      return;
+    }
+    throw new RuntimeException("Table [" + table + "] does not exist.");
+  }
+
+  @Override
+  public String getTableUri(String table) {
+    checkTable(table);
+    TableDescriptor descriptor = getTableDescriptor(table);
+    return descriptor.tableUri;
+  }
+
+  private TableDescriptor getTableDescriptor(String table) {
+    TableDescriptor tableDescriptor = _tableDescriptors.get(table);
+    if (tableDescriptor == null) {
+      tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
+      _tableDescriptors.put(table, tableDescriptor);
+    }
+    return tableDescriptor;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> T getInstance(String className, Class<T> c) {
+    try {
+      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+      Object object = clazz.newInstance();
+      if (object instanceof Configurable) {
+        Configurable configurable = (Configurable) object;
+        configurable.setConf(_configuration);
+      }
+      return (T) object;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void setClusterStatus(ClusterStatus clusterStatus) {
+    _clusterStatus = clusterStatus;
+  }
+
+  public void setConfiguration(Configuration configuration) {
+    _configuration = configuration;
+  }
+
+  public void setNodeName(String nodeName) {
+    _nodeName = nodeName;
+  }
+
+  public void setShardOpenerThreadCount(int shardOpenerThreadCount) {
+    _shardOpenerThreadCount = shardOpenerThreadCount;
+  }
+
+  public void setCache(Cache cache) {
+    _cache = cache;
+  }
+
+  public void setZookeeper(ZooKeeper zookeeper) {
+    _zookeeper = zookeeper;
+  }
+
+  public void setFilterCache(BlurFilterCache filterCache) {
+    _filterCache = filterCache;
+  }
+
+  public void setSafeModeDelay(long safeModeDelay) {
+    _safeModeDelay = safeModeDelay;
+  }
+
+  public void setWarmup(BlurIndexWarmup warmup) {
+    _warmup = warmup;
+  }
+
+  public void setClusterName(String cluster) {
+    _cluster = cluster;
+  }
+}

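As a hedged sketch (not part of the commit), the fields marked "set externally" above suggest a setter-then-init() wiring roughly like the following; every instance passed in, the thread count, and the safe-mode delay are assumptions.

import org.apache.blur.manager.BlurFilterCache;
import org.apache.blur.manager.clusterstatus.ClusterStatus;
import org.apache.blur.manager.indexserver.DistributedIndexServer;
import org.apache.blur.store.blockcache.Cache;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.ZooKeeper;

public class IndexServerWiringSketch {

  // All parameters are assumed to be constructed elsewhere; this only shows
  // the setters defined in the class above followed by init().
  public static DistributedIndexServer create(ClusterStatus clusterStatus, Configuration configuration,
      ZooKeeper zooKeeper, Cache cache, BlurFilterCache filterCache, String nodeName, String cluster)
      throws Exception {
    DistributedIndexServer indexServer = new DistributedIndexServer();
    indexServer.setClusterStatus(clusterStatus);
    indexServer.setConfiguration(configuration);
    indexServer.setNodeName(nodeName);
    indexServer.setShardOpenerThreadCount(16); // hypothetical thread count
    indexServer.setCache(cache);
    indexServer.setZookeeper(zooKeeper);
    indexServer.setFilterCache(filterCache);
    indexServer.setSafeModeDelay(60000);       // hypothetical delay in milliseconds
    indexServer.setClusterName(cluster);
    indexServer.init();                        // registers the node and starts the timers
    return indexServer;
  }
}
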
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java
new file mode 100644
index 0000000..293f957
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutManager.java
@@ -0,0 +1,245 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+public class DistributedLayoutManager {
+
+  private static final SortedSet<String> EMPTY_SORTED_SET = new TreeSet<String>();
+
+  private SortedSet<String> nodes = EMPTY_SORTED_SET;
+  private Set<String> nodesOffline = EMPTY_SORTED_SET;
+  private SortedSet<String> shards = EMPTY_SORTED_SET;
+  private List<String> nodeList = new ArrayList<String>();
+  private Map<String, String> cache = new TreeMap<String, String>();
+
+  public DistributedLayoutManager init() {
+    if (nodesOffline.equals(nodes) || nodes.isEmpty()) {
+      cache = getLockedMap(new TreeMap<String, String>());
+      return this;
+    }
+    Map<String, String> mappings = new TreeMap<String, String>();
+    SortedSet<String> moveBecauseOfDownNodes = new TreeSet<String>();
+    int nodeListSize = nodeList.size();
+    int nodeCount = getStartingPoint();
+    for (String shard : shards) {
+      String node = nodeList.get(nodeCount);
+      mappings.put(shard, node);
+      if (nodesOffline.contains(node)) {
+        moveBecauseOfDownNodes.add(shard);
+      }
+      nodeCount++;
+      if (nodeCount >= nodeListSize) {
+        nodeCount = 0;
+      }
+    }
+    for (String shard : moveBecauseOfDownNodes) {
+      String node = nodeList.get(nodeCount);
+      while (isOffline(node)) {
+        nodeCount++;
+        if (nodeCount >= nodeListSize) {
+          nodeCount = 0;
+        }
+        node = nodeList.get(nodeCount);
+      }
+      mappings.put(shard, node);
+      nodeCount++;
+      if (nodeCount >= nodeListSize) {
+        nodeCount = 0;
+      }
+    }
+    cache = getLockedMap(mappings);
+    return this;
+  }
+
+  private int getStartingPoint() {
+    int size = nodes.size();
+    int hash = 37;
+    for (String node : nodes) {
+      hash += node.hashCode() * 17;
+    }
+    return Math.abs(hash % size);
+  }
+
+  public Map<String, String> getLayout() {
+    return cache;
+  }
+
+  private boolean isOffline(String node) {
+    return nodesOffline.contains(node);
+  }
+
+  public Collection<String> getNodes() {
+    return new TreeSet<String>(nodes);
+  }
+
+  public void setNodes(Collection<String> v) {
+    this.nodes = new TreeSet<String>(v);
+    this.nodeList = new ArrayList<String>(nodes);
+  }
+
+  public Collection<String> getShards() {
+    return new TreeSet<String>(shards);
+  }
+
+  public void setShards(Collection<String> v) {
+    this.shards = new TreeSet<String>(v);
+  }
+
+  public Collection<String> getNodesOffline() {
+    return new TreeSet<String>(nodesOffline);
+  }
+
+  public void setNodesOffline(Collection<String> v) {
+    this.nodesOffline = new HashSet<String>(v);
+  }
+
+  private Map<String, String> getLockedMap(final Map<String, String> map) {
+    final Set<Entry<String, String>> entrySet = wrapEntries(map.entrySet());
+    return new Map<String, String>() {
+
+      @Override
+      public boolean containsKey(Object key) {
+        return map.containsKey(key);
+      }
+
+      @Override
+      public boolean containsValue(Object value) {
+        return map.containsValue(value);
+      }
+
+      @Override
+      public Set<java.util.Map.Entry<String, String>> entrySet() {
+        return entrySet;
+      }
+
+      @Override
+      public String get(Object key) {
+        return map.get(key);
+      }
+
+      @Override
+      public boolean isEmpty() {
+        return map.isEmpty();
+      }
+
+      @Override
+      public Set<String> keySet() {
+        return new TreeSet<String>(map.keySet());
+      }
+
+      @Override
+      public int size() {
+        return map.size();
+      }
+
+      @Override
+      public Collection<String> values() {
+        return new TreeSet<String>(map.values());
+      }
+
+      @Override
+      public void clear() {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String put(String key, String value) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public void putAll(Map<? extends String, ? extends String> m) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String remove(Object key) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String toString() {
+        return map.toString();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        return map.equals(obj);
+      }
+
+      @Override
+      public int hashCode() {
+        return map.hashCode();
+      }
+    };
+  }
+
+  private Set<Entry<String, String>> wrapEntries(Set<Entry<String, String>> entrySet) {
+    Set<Entry<String, String>> result = new HashSet<Entry<String, String>>();
+    for (Entry<String, String> entry : entrySet) {
+      result.add(wrapEntry(entry));
+    }
+    return result;
+  }
+
+  private Entry<String, String> wrapEntry(final Entry<String, String> entry) {
+    return new Entry<String, String>() {
+
+      @Override
+      public String setValue(String value) {
+        throw new RuntimeException("read only");
+      }
+
+      @Override
+      public String getValue() {
+        return entry.getValue();
+      }
+
+      @Override
+      public String getKey() {
+        return entry.getKey();
+      }
+
+      @Override
+      public String toString() {
+        return entry.toString();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        return entry.equals(obj);
+      }
+
+      @Override
+      public int hashCode() {
+        return entry.hashCode();
+      }
+    };
+  }
+
+}

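A small sketch (not part of the commit) of the layout computation, mirroring setupLayoutManager(...) in DistributedIndexServer above; the node and shard names are hypothetical.

import java.util.Arrays;
import java.util.Map;

import org.apache.blur.manager.indexserver.DistributedLayoutManager;

public class LayoutExample {
  public static void main(String[] args) {
    DistributedLayoutManager layoutManager = new DistributedLayoutManager();
    // Hypothetical nodes and shards.
    layoutManager.setNodes(Arrays.asList("node1", "node2", "node3"));
    layoutManager.setNodesOffline(Arrays.asList("node3"));
    layoutManager.setShards(Arrays.asList("shard-00000000", "shard-00000001", "shard-00000002"));
    layoutManager.init();

    // Read-only map of shard -> node; shards assigned to offline nodes are
    // re-routed to the remaining online nodes.
    Map<String, String> layout = layoutManager.getLayout();
    for (Map.Entry<String, String> entry : layout.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}
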
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
new file mode 100644
index 0000000..5872c11
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
@@ -0,0 +1,242 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
+import org.apache.blur.lucene.store.refcounter.IndexInputCloser;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.BlurNRTIndex;
+import org.apache.blur.manager.writer.SharedMergeScheduler;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.util.CharArraySet;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.similarities.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MMapDirectory;
+
+import com.google.common.io.Closer;
+
+public class LocalIndexServer extends AbstractIndexServer {
+
+  private final static Log LOG = LogFactory.getLog(LocalIndexServer.class);
+
+  private final Map<String, Map<String, BlurIndex>> _readersMap = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
+  private final SharedMergeScheduler _mergeScheduler;
+  private final IndexInputCloser _indexInputCloser;
+  private final DirectoryReferenceFileGC _gc;
+  private final ExecutorService _searchExecutor;
+  private final TableContext _tableContext;
+  private final Closer _closer;
+
+  public LocalIndexServer(TableDescriptor tableDescriptor) throws IOException {
+    _closer = Closer.create();
+    _tableContext = TableContext.create(tableDescriptor);
+    _mergeScheduler = new SharedMergeScheduler();
+    _indexInputCloser = new IndexInputCloser();
+    _indexInputCloser.init();
+    _gc = new DirectoryReferenceFileGC();
+    _gc.init();
+    _searchExecutor = Executors.newCachedThreadPool();
+    _closer.register(_mergeScheduler);
+    _closer.register(_indexInputCloser);
+    _closer.register(_gc);
+    _closer.register(new CloseableExecutorService(_searchExecutor));
+
+    getIndexes(_tableContext.getTable());
+  }
+
+  @Override
+  public void close() {
+    try {
+      _closer.close();
+    } catch (IOException e) {
+      LOG.error("Unknown error", e);
+    }
+    for (String table : _readersMap.keySet()) {
+      close(_readersMap.get(table));
+    }
+  }
+
+  @Override
+  public BlurAnalyzer getAnalyzer(String table) {
+    return new BlurAnalyzer(new StandardAnalyzer(LUCENE_VERSION, new CharArraySet(LUCENE_VERSION,
+        new HashSet<String>(), false)));
+  }
+
+  @Override
+  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
+    Map<String, BlurIndex> tableMap = _readersMap.get(table);
+    Set<String> shardsSet;
+    if (tableMap == null) {
+      shardsSet = getIndexes(table).keySet();
+    } else {
+      shardsSet = tableMap.keySet();
+    }
+    return new TreeSet<String>(shardsSet);
+  }
+
+  @Override
+  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
+    Map<String, BlurIndex> tableMap = _readersMap.get(table);
+    if (tableMap == null) {
+      tableMap = openFromDisk();
+      _readersMap.put(table, tableMap);
+    }
+    return tableMap;
+  }
+
+  @Override
+  public Similarity getSimilarity(String table) {
+    return new FairSimilarity();
+  }
+
+  private void close(Map<String, BlurIndex> map) {
+    for (BlurIndex index : map.values()) {
+      try {
+        index.close();
+      } catch (Exception e) {
+        LOG.error("Error while trying to close index.", e);
+      }
+    }
+  }
+
+  private Map<String, BlurIndex> openFromDisk() throws IOException {
+    String table = _tableContext.getDescriptor().getName();
+    Path tablePath = _tableContext.getTablePath();
+    File tableFile = new File(tablePath.toUri());
+    if (tableFile.isDirectory()) {
+      Map<String, BlurIndex> shards = new ConcurrentHashMap<String, BlurIndex>();
+      int shardCount = _tableContext.getDescriptor().getShardCount();
+      for (int i = 0; i < shardCount; i++) {
+        String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i);
+        File file = new File(tableFile, shardName);
+        file.mkdirs();
+        MMapDirectory directory = new MMapDirectory(file);
+        if (!DirectoryReader.indexExists(directory)) {
+          new IndexWriter(directory, new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer())).close();
+        }
+        shards.put(shardName, openIndex(table, shardName, directory));
+      }
+      return shards;
+    }
+    throw new IOException("Table [" + table + "] not found.");
+  }
+
+  private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException, IOException {
+    ShardContext shardContext = ShardContext.create(_tableContext, shard);
+    BlurNRTIndex index = new BlurNRTIndex(shardContext, _mergeScheduler, _indexInputCloser, dir, _gc, _searchExecutor);
+    return index;
+  }
+
+  @Override
+  public TABLE_STATUS getTableStatus(String table) {
+    return TABLE_STATUS.ENABLED;
+  }
+
+  @Override
+  public List<String> getShardList(String table) {
+    try {
+      List<String> result = new ArrayList<String>();
+      Path tablePath = _tableContext.getTablePath();
+      File tableFile = new File(tablePath.toUri());
+      if (tableFile.isDirectory()) {
+        for (File f : tableFile.listFiles()) {
+          if (f.isDirectory()) {
+            result.add(f.getName());
+          }
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getNodeName() {
+    return "localhost";
+  }
+
+  @Override
+  public String getTableUri(String table) {
+    return _tableContext.getTablePath().toUri().toString();
+  }
+
+  @Override
+  public int getShardCount(String table) {
+    return _tableContext.getDescriptor().getShardCount();
+  }
+
+  @Override
+  public long getTableSize(String table) throws IOException {
+    try {
+      File file = new File(new URI(getTableUri(table)));
+      return getFolderSize(file);
+    } catch (URISyntaxException e) {
+      throw new IOException("bad URI", e);
+    }
+  }
+
+  private long getFolderSize(File file) {
+    long size = 0;
+    if (file.isDirectory()) {
+      for (File sub : file.listFiles()) {
+        size += getFolderSize(sub);
+      }
+    } else {
+      size += file.length();
+    }
+    return size;
+  }
+
+  @Override
+  public Map<String, ShardState> getShardState(String table) {
+    throw new RuntimeException("Not supported yet.");
+  }
+}
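
LocalIndexServer lays each shard out as a subdirectory of the table path (named via BlurConstants.SHARD_PREFIX), backs it with an MMapDirectory, and writes an empty Lucene index the first time a shard directory is opened. A minimal usage sketch follows; the Thrift setters on TableDescriptor (setName, setTableUri, setShardCount) and the local filesystem path are assumptions, not taken from this diff.

// Sketch only: standing LocalIndexServer up against a local table directory.
// The TableDescriptor setter names and the path below are assumed/illustrative.
import java.io.File;
import java.util.Map;

import org.apache.blur.manager.indexserver.LocalIndexServer;
import org.apache.blur.manager.writer.BlurIndex;
import org.apache.blur.thrift.generated.TableDescriptor;

public class LocalIndexServerSketch {
  public static void main(String[] args) throws Exception {
    // openFromDisk requires the table directory to already exist.
    File tableDir = new File("/tmp/blur-tables/test-table");
    tableDir.mkdirs();

    TableDescriptor descriptor = new TableDescriptor();
    descriptor.setName("test-table");                      // assumed Thrift setter
    descriptor.setTableUri(tableDir.toURI().toString());   // assumed Thrift setter
    descriptor.setShardCount(2);                           // assumed Thrift setter

    LocalIndexServer server = new LocalIndexServer(descriptor);
    try {
      // Shard folders (shard-00000000, shard-00000001) are created on first use,
      // each backed by an MMapDirectory holding an (initially empty) index.
      for (String shard : server.getShardListCurrentServerOnly("test-table")) {
        System.out.println("shard [" + shard + "]");
      }
      Map<String, BlurIndex> indexes = server.getIndexes("test-table");
      System.out.println("open indexes: " + indexes.size());
    } finally {
      server.close();
    }
  }
}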

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
new file mode 100644
index 0000000..155722f
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
@@ -0,0 +1,193 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This class controls the startup of the cluster. Essentially, the first node
+ * online waits until no additional nodes are starting up. The period of
+ * inactivity required is the wait time passed in through the constructor. If a
+ * new node comes online while the leader is waiting, the wait starts over. Once
+ * the wait period has been exhausted, the cluster is considered settled and can
+ * come online.
+ * 
+ */
+public class SafeMode {
+
+  private static final Log LOG = LogFactory.getLog(SafeMode.class);
+  private static final String STARTUP = "STARTUP";
+  private static final String SETUP = "SETUP";
+
+  private final ZooKeeper zooKeeper;
+  private final String lockPath;
+  private final long waitTime;
+  private final Object lock = new Object();
+  private final Watcher watcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent event) {
+      synchronized (lock) {
+        lock.notify();
+      }
+    }
+  };
+  private final Map<String, String> lockMap = new HashMap<String, String>();
+  private final String nodePath;
+  private final long duplicateNodeTimeout;
+
+  public SafeMode(ZooKeeper zooKeeper, String lockPath, String nodePath, TimeUnit waitTimeUnit, long waitTime,
+      TimeUnit duplicateNodeTimeoutTimeUnit, long duplicateNodeTimeout) {
+    this.zooKeeper = zooKeeper;
+    this.lockPath = lockPath;
+    this.waitTime = waitTimeUnit.toMillis(waitTime);
+    this.duplicateNodeTimeout = duplicateNodeTimeoutTimeUnit.toNanos(duplicateNodeTimeout);
+    this.nodePath = nodePath;
+    ZkUtils.mkNodesStr(zooKeeper, nodePath);
+    ZkUtils.mkNodesStr(zooKeeper, lockPath);
+  }
+
+  public void registerNode(String node, byte[] data) throws KeeperException, InterruptedException {
+    lock(SETUP);
+    register(node, data);
+    if (isLeader(node)) {
+      // Create barrier for cluster
+      lock(STARTUP);
+
+      // Allow other nodes to register
+      unlock(SETUP);
+      waitForClusterToSettle();
+      unlock(STARTUP);
+    } else {
+      // Allow other nodes to register
+      unlock(SETUP);
+
+      // Block waiting on cluster to settle
+      lock(STARTUP);
+      unlock(STARTUP);
+    }
+  }
+
+  private void waitForClusterToSettle() throws InterruptedException, KeeperException {
+    long startingWaitTime = System.currentTimeMillis();
+    List<String> prev = null;
+    while (true) {
+      synchronized (lock) {
+        List<String> children = new ArrayList<String>(zooKeeper.getChildren(nodePath, watcher));
+        Collections.sort(children);
+        if (children.equals(prev)) {
+          LOG.info("Clustered has settled.");
+          return;
+        } else {
+          prev = children;
+          LOG.info("Waiting for cluster to settle, current size [" + children.size() + "] total time waited so far ["
+              + (System.currentTimeMillis() - startingWaitTime) + " ms] waiting another [" + waitTime + " ms].");
+          lock.wait(waitTime);
+        }
+      }
+    }
+  }
+
+  private boolean isLeader(String node) throws KeeperException, InterruptedException {
+    List<String> children = zooKeeper.getChildren(nodePath, false);
+    if (children.size() == 1) {
+      String n = children.get(0);
+      if (!n.equals(node)) {
+        throw new RuntimeException("We got a problem here!  Only one node register [" + n + "] and I'm not it [" + node
+            + "]");
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private void unlock(String name) throws InterruptedException, KeeperException {
+    if (!lockMap.containsKey(name)) {
+      throw new RuntimeException("Lock [" + name + "] has not be created.");
+    }
+    String lockPath = lockMap.get(name);
+    LOG.debug("Unlocking on path [" + lockPath + "] with name [" + name + "]");
+    zooKeeper.delete(lockPath, -1);
+  }
+
+  private void register(String node, byte[] data) throws KeeperException, InterruptedException {
+    String p = nodePath + "/" + node;
+    long start = System.nanoTime();
+    while (zooKeeper.exists(p, false) != null) {
+      if (start + duplicateNodeTimeout < System.nanoTime()) {
+        throw new RuntimeException("Node [" + node + "] cannot be registered, check to make sure a "
+            + "process has not already been started or that server" + " names have not been duplicated.");
+      }
+      LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", node, p);
+      String tmpPath = p + "_" + UUID.randomUUID();
+      zooKeeper.create(tmpPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+      Thread.sleep(1000);
+      zooKeeper.delete(tmpPath, -1);
+    }
+    zooKeeper.create(p, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+  }
+
+  private void lock(String name) throws KeeperException, InterruptedException {
+    if (lockMap.containsKey(name)) {
+      throw new RuntimeException("Lock [" + name + "] already created.");
+    }
+    String newPath = zooKeeper.create(lockPath + "/" + name + "_", null, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.EPHEMERAL_SEQUENTIAL);
+    lockMap.put(name, newPath);
+    while (true) {
+      synchronized (lock) {
+        List<String> children = getOnlyThisLocksChildren(name, zooKeeper.getChildren(lockPath, watcher));
+        Collections.sort(children);
+        String firstElement = children.get(0);
+        if ((lockPath + "/" + firstElement).equals(newPath)) {
+          // yay!, we got the lock
+          LOG.debug("Lock on path [" + lockPath + "] with name [" + name + "]");
+          return;
+        } else {
+          LOG.debug("Waiting for lock on path [" + lockPath + "] with name [" + name + "]");
+          lock.wait();
+        }
+      }
+    }
+  }
+
+  private List<String> getOnlyThisLocksChildren(String name, List<String> children) {
+    List<String> result = new ArrayList<String>();
+    for (String c : children) {
+      if (c.startsWith(name + "_")) {
+        result.add(c);
+      }
+    }
+    return result;
+  }
+
+}
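
SafeMode serializes startup with two ZooKeeper locks: a SETUP lock held around registration of the node's ephemeral znode, and a STARTUP barrier that the leader holds until the membership under nodePath stops changing for the configured wait time. The sketch below shows how a node might register; the connection string and the lock/node paths are placeholders (a real server would derive them from ZookeeperPathConstants), and the surrounding class is illustrative.

// Sketch only: registering one node under the safe-mode barrier. Paths, the
// connection string, and the node name are placeholders, not taken from this diff.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.blur.manager.indexserver.SafeMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SafeModeSketch {
  public static void main(String[] args) throws Exception {
    final CountDownLatch connected = new CountDownLatch(1);
    ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 30000, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        connected.countDown();
      }
    });
    connected.await(10, TimeUnit.SECONDS);

    // Wait for 30 seconds of quiet membership before declaring the cluster settled;
    // treat a duplicate node name as fatal after 60 seconds.
    SafeMode safeMode = new SafeMode(zooKeeper, "/blur/safemode-locks", "/blur/online-nodes",
        TimeUnit.SECONDS, 30, TimeUnit.SECONDS, 60);
    safeMode.registerNode("node1:40020", null);

    // registerNode returns only once the cluster has settled; normal startup continues here.
    zooKeeper.close();
  }
}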

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java
new file mode 100644
index 0000000..b507736
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java
@@ -0,0 +1,186 @@
+package org.apache.blur.manager.indexserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thrift.generated.ShardState;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This class holds the current state of any given shard within the shard
+ * server.
+ * 
+ */
+public class ShardStateManager implements Closeable {
+
+  private static final long _5_SECONDS = TimeUnit.SECONDS.toMillis(5);
+  private static final long _60_SECONDS = TimeUnit.SECONDS.toMillis(60);
+  private final Map<Key, Value> stateMap = new ConcurrentHashMap<Key, Value>();
+  private final Timer timer;
+
+  public ShardStateManager() {
+    timer = new Timer("ShardStateManager-cleanup", true);
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        Collection<Key> toBeDeleted = null;
+        for (Entry<Key, Value> e : stateMap.entrySet()) {
+          if (shouldBeRemoved(e)) {
+            if (toBeDeleted == null) {
+              toBeDeleted = new HashSet<ShardStateManager.Key>();
+            }
+            toBeDeleted.add(e.getKey());
+          }
+        }
+        if (toBeDeleted != null) {
+          for (Key k : toBeDeleted) {
+            stateMap.remove(k);
+          }
+        }
+      }
+
+      private boolean shouldBeRemoved(Entry<Key, Value> e) {
+        if (e.getValue().timeToBeRemoved < System.currentTimeMillis()) {
+          return true;
+        }
+        return false;
+      }
+    }, _5_SECONDS, _5_SECONDS);
+  }
+
+  public void opening(String table, String shard) {
+    setState(table, shard, ShardState.OPENING);
+  }
+
+  public void open(String table, String shard) {
+    setState(table, shard, ShardState.OPEN);
+  }
+
+  public void openingError(String table, String shard) {
+    setState(table, shard, ShardState.OPENING_ERROR);
+  }
+
+  public void closing(String table, String shard) {
+    setState(table, shard, ShardState.CLOSING);
+  }
+
+  public void closed(String table, String shard) {
+    setState(table, shard, ShardState.CLOSED);
+  }
+
+  public void closingError(String table, String shard) {
+    setState(table, shard, ShardState.CLOSING_ERROR);
+  }
+
+  public Map<String, ShardState> getShardState(String table) {
+    Map<String, ShardState> result = new HashMap<String, ShardState>();
+    List<Entry<Key, Value>> entryList = new ArrayList<Entry<Key, Value>>(stateMap.entrySet());
+    for (Entry<Key, Value> entry : entryList) {
+      Key key = entry.getKey();
+      if (key.table.equals(table)) {
+        result.put(key.shard, entry.getValue().shardState);
+      }
+    }
+    return result;
+  }
+
+  private void setState(String table, String shard, ShardState state) {
+    switch (state) {
+    case CLOSED:
+    case CLOSING_ERROR:
+      stateMap.put(new Key(table, shard), new Value(state, System.currentTimeMillis() + _60_SECONDS));
+      return;
+    default:
+      stateMap.put(new Key(table, shard), new Value(state));
+      return;
+    }
+  }
+
+  private static class Value {
+    final ShardState shardState;
+    final long timeToBeRemoved;
+
+    Value(ShardState shardState) {
+      this(shardState, Long.MAX_VALUE);
+    }
+
+    Value(ShardState shardState, long timeToBeRemoved) {
+      this.shardState = shardState;
+      this.timeToBeRemoved = timeToBeRemoved;
+    }
+  }
+
+  private static class Key {
+    final String table;
+    final String shard;
+
+    Key(String table, String shard) {
+      this.table = table;
+      this.shard = shard;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((shard == null) ? 0 : shard.hashCode());
+      result = prime * result + ((table == null) ? 0 : table.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      Key other = (Key) obj;
+      if (shard == null) {
+        if (other.shard != null)
+          return false;
+      } else if (!shard.equals(other.shard))
+        return false;
+      if (table == null) {
+        if (other.table != null)
+          return false;
+      } else if (!table.equals(other.table))
+        return false;
+      return true;
+    }
+  }
+
+  @Override
+  public void close() {
+    timer.cancel();
+    timer.purge();
+  }
+
+}
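
ShardStateManager keys state by (table, shard); terminal CLOSED and CLOSING_ERROR entries carry a removal deadline 60 seconds out, and a daemon Timer sweeps expired entries every 5 seconds. A minimal lifecycle sketch using only the methods defined above; the table and shard names are illustrative.

// Sketch only: the calls a shard server would make around opening and closing a shard.
import java.util.Map;

import org.apache.blur.manager.indexserver.ShardStateManager;
import org.apache.blur.thrift.generated.ShardState;

public class ShardStateManagerSketch {
  public static void main(String[] args) {
    ShardStateManager shardStateManager = new ShardStateManager();
    try {
      // Typical open lifecycle for one shard of a table.
      shardStateManager.opening("test-table", "shard-00000000");
      shardStateManager.open("test-table", "shard-00000000");

      Map<String, ShardState> state = shardStateManager.getShardState("test-table");
      System.out.println(state); // roughly {shard-00000000=OPEN}

      // CLOSED entries are retained for about 60 seconds, then removed by the
      // cleanup timer that fires every 5 seconds.
      shardStateManager.closing("test-table", "shard-00000000");
      shardStateManager.closed("test-table", "shard-00000000");
    } finally {
      shardStateManager.close();
    }
  }
}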

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java
new file mode 100644
index 0000000..39f2f92
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultComparator.java
@@ -0,0 +1,35 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Comparator;
+
+import org.apache.blur.thrift.generated.BlurResult;
+
+
+public class BlurResultComparator implements Comparator<BlurResult> {
+
+  @Override
+  public int compare(BlurResult o1, BlurResult o2) {
+    int compare = Double.compare(o2.score, o1.score);
+    if (compare == 0) {
+      return o2.locationId.compareTo(o1.locationId);
+    }
+    return compare;
+  }
+
+}
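
BlurResultComparator orders results by descending score and breaks ties by descending locationId, so the strongest hit sorts first. A small sorting sketch follows; it assumes the Thrift-generated BlurResult exposes score and locationId as public fields (consistent with the direct field access in the comparator), and the locationId values are illustrative.

// Sketch only: sorting a list of results with the comparator above.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.blur.manager.results.BlurResultComparator;
import org.apache.blur.thrift.generated.BlurResult;

public class BlurResultComparatorSketch {
  public static void main(String[] args) {
    List<BlurResult> results = new ArrayList<BlurResult>();
    results.add(result(0.25, "shard-00000001/12"));
    results.add(result(0.75, "shard-00000000/7"));
    results.add(result(0.75, "shard-00000000/9"));

    Collections.sort(results, new BlurResultComparator());
    // Highest score first; equal scores fall back to the larger locationId.
    for (BlurResult r : results) {
      System.out.println(r.score + " " + r.locationId);
    }
  }

  private static BlurResult result(double score, String locationId) {
    BlurResult result = new BlurResult();
    result.score = score;           // Thrift-generated fields are accessed directly above
    result.locationId = locationId; // locationId format is illustrative
    return result;
  }
}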

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
new file mode 100644
index 0000000..517920a
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterable.java
@@ -0,0 +1,32 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.util.Map;
+
+import org.apache.blur.thrift.generated.BlurResult;
+
+public interface BlurResultIterable extends Iterable<BlurResult>, Closeable {
+
+  void skipTo(long skipTo);
+
+  long getTotalResults();
+
+  Map<String, Long> getShardInfo();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
new file mode 100644
index 0000000..9cf7dd7
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/results/BlurResultIterableClient.java
@@ -0,0 +1,171 @@
+package org.apache.blur.manager.results;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResult;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.Blur.Client;
+
+public class BlurResultIterableClient implements BlurResultIterable {
+
+  private static final Log LOG = LogFactory.getLog(BlurResultIterableClient.class);
+
+  private final Map<String, Long> _shardInfo = new TreeMap<String, Long>();
+  private final Client _client;
+  private final String _table;
+  private final BlurQuery _originalQuery;
+  private final Connection _connection;
+  private final int _remoteFetchCount;
+  private final AtomicLongArray _facetCounts;
+
+  private BlurResults _results;
+  private int _batch = 0;
+  private long _totalResults;
+  private long _skipTo;
+  private boolean _alreadyProcessed;
+
+  public BlurResultIterableClient(Connection connection, Blur.Client client, String table, BlurQuery query,
+      AtomicLongArray facetCounts, int remoteFetchCount) {
+    _connection = connection;
+    _client = client;
+    _table = table;
+    _facetCounts = facetCounts;
+    _originalQuery = query;
+    _remoteFetchCount = remoteFetchCount;
+    performSearch();
+  }
+
+  public Client getClient() {
+    return _client;
+  }
+
+  private void performSearch() {
+    try {
+      long cursor = _remoteFetchCount * _batch;
+      BlurQuery blurQuery = new BlurQuery(_originalQuery.simpleQuery, _originalQuery.expertQuery,
+          _originalQuery.facets, null, false, _originalQuery.useCacheIfPresent, cursor, _remoteFetchCount,
+          _originalQuery.minimumNumberOfResults, _originalQuery.maxQueryTime, _originalQuery.uuid,
+          _originalQuery.userContext, _originalQuery.cacheResult, _originalQuery.startTime,
+          _originalQuery.modifyFileCaches);
+      _results = makeLazy(_client.query(_table, blurQuery));
+      addFacets();
+      _totalResults = _results.totalResults;
+      _shardInfo.putAll(_results.shardInfo);
+      _batch++;
+    } catch (Exception e) {
+      LOG.error("Error during for [{0}]", e, _originalQuery);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private BlurResults makeLazy(BlurResults results) {
+    List<BlurResult> list = results.results;
+    for (int i = 0; i < list.size(); i++) {
+      BlurResult blurResult = list.get(i);
+      if (blurResult != null) {
+        list.set(i, new LazyBlurResult(blurResult, _client));
+      }
+    }
+    return results;
+  }
+
+  private void addFacets() {
+    if (!_alreadyProcessed) {
+      List<Long> counts = _results.facetCounts;
+      if (counts != null) {
+        int size = counts.size();
+        for (int i = 0; i < size; i++) {
+          _facetCounts.addAndGet(i, counts.get(i));
+        }
+      }
+      _alreadyProcessed = true;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getShardInfo() {
+    return _shardInfo;
+  }
+
+  @Override
+  public long getTotalResults() {
+    return _totalResults;
+  }
+
+  @Override
+  public void skipTo(long skipTo) {
+    this._skipTo = skipTo;
+  }
+
+  @Override
+  public Iterator<BlurResult> iterator() {
+    SearchIterator iterator = new SearchIterator();
+    long start = 0;
+    while (iterator.hasNext() && start < _skipTo) {
+      iterator.next();
+      start++;
+    }
+    return iterator;
+  }
+
+  public class SearchIterator implements Iterator<BlurResult> {
+
+    private int position = 0;
+    private int relposition = 0;
+
+    @Override
+    public boolean hasNext() {
+      if (position < _originalQuery.minimumNumberOfResults && position < _totalResults) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public BlurResult next() {
+      if (relposition >= _results.results.size()) {
+        performSearch();
+        relposition = 0;
+      }
+      position++;
+      return _results.results.get(relposition++);
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove is not supported on search results");
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    BlurClientManager.returnClient(_connection, _client);
+  }
+}
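
BlurResultIterableClient pages through a remote query: each call to performSearch re-issues the query with a cursor advanced by remoteFetchCount, facet counts are folded into the shared AtomicLongArray once, and the iterator transparently fetches the next page when it runs off the end of the current one. A consumption sketch follows; how the Connection and Blur.Client are obtained is assumed to happen elsewhere (only BlurClientManager.returnClient appears in this file), and the table name and fetch size are illustrative.

// Sketch only: consuming a paged result set. 'connection', 'client' and 'query'
// are assumed to be built elsewhere and passed in; the fetch size of 100 and the
// table name are illustrative.
import java.util.concurrent.atomic.AtomicLongArray;

import org.apache.blur.manager.results.BlurResultIterableClient;
import org.apache.blur.thrift.Connection;
import org.apache.blur.thrift.generated.Blur;
import org.apache.blur.thrift.generated.BlurQuery;
import org.apache.blur.thrift.generated.BlurResult;

public class BlurResultIterableClientSketch {

  public static void printHits(Connection connection, Blur.Client client, BlurQuery query)
      throws Exception {
    // Size the facet counters to the number of facets on the query (zero if none).
    AtomicLongArray facetCounts = new AtomicLongArray(query.facets == null ? 0 : query.facets.size());
    BlurResultIterableClient iterable =
        new BlurResultIterableClient(connection, client, "test-table", query, facetCounts, 100);
    try {
      System.out.println("total hits [" + iterable.getTotalResults() + "]");
      iterable.skipTo(0); // skipTo(n) fast-forwards the iterator past the first n hits
      for (BlurResult result : iterable) {
        // Walking past the first page of 100 triggers another remote query automatically.
        System.out.println(result.score + " " + result.locationId);
      }
    } finally {
      iterable.close(); // returns the client to BlurClientManager
    }
  }
}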
