incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/3] git commit: Refactoring the DistributedIndexServer class to prepare for reworking the layout strategies.
Date Tue, 08 Oct 2013 15:03:29 GMT
Refactoring the DistributedIndexServer class to prepare for reworking the layout strategies.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4727825d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4727825d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4727825d

Branch: refs/heads/apache-blur-0.2
Commit: 4727825d8807ac16dd0e597aaf4a5040c78ff1c4
Parents: 71b0e85
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 8 10:12:29 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 8 10:12:29 2013 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  16 +-
 .../org/apache/blur/manager/IndexServer.java    |   5 +-
 .../AbstractDistributedIndexServer.java         | 131 +++++
 .../indexserver/DistributedIndexServer.java     | 480 +++++++------------
 .../indexserver/DistributedLayoutFactory.java   |  26 +
 .../blur/thrift/ThriftBlurShardServer.java      |  29 +-
 6 files changed, 365 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4727825d/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 728fdc8..aa4dfe6 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -185,7 +185,11 @@ public class IndexManager {
       _statusManager.close();
       _executor.shutdownNow();
       _mutateExecutor.shutdownNow();
-      _indexServer.close();
+      try {
+        _indexServer.close();
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to close the index server", e);
+      }
     }
   }
 
@@ -402,7 +406,7 @@ public class IndexManager {
           preFilter, getScoreType(simpleQuery.scoreType), context);
       Query facetedQuery = getFacetedQuery(blurQuery, userQuery, facetedCounts, fieldManager,
context, postFilter,
           preFilter);
-      call = new SimpleQueryParallelCall(running, table, status, _indexServer, facetedQuery,
blurQuery.selector,
+      call = new SimpleQueryParallelCall(running, table, status, facetedQuery, blurQuery.selector,
           _queriesInternalMeter, shardServerContext, runSlow, _fetchCount, _maxHeapPerRowFetch,
context.getSimilarity());
       MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
       return ForkJoin.execute(_executor, blurIndexes.entrySet(), call, new Cancel() {
@@ -1056,7 +1060,6 @@ public class IndexManager {
 
     private final String _table;
     private final QueryStatus _status;
-    private final IndexServer _indexServer;
     private final Query _query;
     private final Selector _selector;
     private final AtomicBoolean _running;
@@ -1067,13 +1070,12 @@ public class IndexManager {
     private final int _maxHeapPerRowFetch;
     private final Similarity _similarity;
 
-    public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status,
IndexServer indexServer,
-        Query query, Selector selector, Meter queriesInternalMeter, ShardServerContext shardServerContext,
-        boolean runSlow, int fetchCount, int maxHeapPerRowFetch, Similarity similarity) {
+    public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status,
Query query,
+        Selector selector, Meter queriesInternalMeter, ShardServerContext shardServerContext,
boolean runSlow,
+        int fetchCount, int maxHeapPerRowFetch, Similarity similarity) {
       _running = running;
       _table = table;
       _status = status;
-      _indexServer = indexServer;
       _query = query;
       _selector = selector;
       _queriesInternalMeter = queriesInternalMeter;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4727825d/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
index 4d04a89..ab5aad7 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexServer.java
@@ -16,6 +16,7 @@ package org.apache.blur.manager;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,7 @@ import org.apache.blur.thrift.generated.ShardState;
  * The {@link IndexServer} interface provides the internal API to interact with
  * the indexes being served in the shard server instance.
  */
-public interface IndexServer {
+public interface IndexServer extends Closeable {
 
   // Server state
 
@@ -106,7 +107,7 @@ public interface IndexServer {
   /**
    * Closes the index server.
    */
-  void close();
+  void close() throws IOException;
 
   /**
    * Get the shard state. Provides access to the as is state of the shards in

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4727825d/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractDistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractDistributedIndexServer.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractDistributedIndexServer.java
new file mode 100644
index 0000000..aba7ca2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/AbstractDistributedIndexServer.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.indexserver;
+
+import static org.apache.blur.metrics.MetricsConstants.BLUR;
+import static org.apache.blur.metrics.MetricsConstants.INDEX_COUNT;
+import static org.apache.blur.metrics.MetricsConstants.INDEX_MEMORY_USAGE;
+import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
+import static org.apache.blur.metrics.MetricsConstants.SEGMENT_COUNT;
+import static org.apache.blur.metrics.MetricsConstants.TABLE_COUNT;
+import static org.apache.blur.utils.BlurConstants.SHARD_PREFIX;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
+import org.apache.blur.metrics.AtomicLongGauge;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.MetricName;
+
+public abstract class AbstractDistributedIndexServer extends AbstractIndexServer {
+
+  private static final Log LOG = LogFactory.getLog(AbstractDistributedIndexServer.class);
+
+  protected final ClusterStatus _clusterStatus;
+  protected final Configuration _configuration;
+  protected final String _nodeName;
+  protected final String _cluster;
+
+  protected final AtomicLong _tableCount = new AtomicLong();
+  protected final AtomicLong _indexCount = new AtomicLong();
+  protected final AtomicLong _segmentCount = new AtomicLong();
+  protected final AtomicLong _indexMemoryUsage = new AtomicLong();
+
+  public AbstractDistributedIndexServer(ClusterStatus clusterStatus, Configuration configuration,
String nodeName,
+      String cluster) {
+    _clusterStatus = clusterStatus;
+    _configuration = configuration;
+    _nodeName = nodeName;
+    _cluster = cluster;
+    MetricName tableCount = new MetricName(ORG_APACHE_BLUR, BLUR, TABLE_COUNT, _cluster);
+    MetricName indexCount = new MetricName(ORG_APACHE_BLUR, BLUR, INDEX_COUNT, _cluster);
+    MetricName segmentCount = new MetricName(ORG_APACHE_BLUR, BLUR, SEGMENT_COUNT, _cluster);
+    MetricName indexMemoryUsage = new MetricName(ORG_APACHE_BLUR, BLUR, INDEX_MEMORY_USAGE,
_cluster);
+
+    Metrics.newGauge(tableCount, new AtomicLongGauge(_tableCount));
+    Metrics.newGauge(indexCount, new AtomicLongGauge(_indexCount));
+    Metrics.newGauge(segmentCount, new AtomicLongGauge(_segmentCount));
+    Metrics.newGauge(indexMemoryUsage, new AtomicLongGauge(_indexMemoryUsage));
+  }
+
+  @Override
+  public final String getNodeName() {
+    return _nodeName;
+  }
+
+  @Override
+  public final long getTableSize(String table) throws IOException {
+    checkTable(table);
+    String tableUri = getTableContext(table).getTablePath().toUri().toString();
+    Path tablePath = new Path(tableUri);
+    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
+    ContentSummary contentSummary = fileSystem.getContentSummary(tablePath);
+    return contentSummary.getLength();
+  }
+
+  @Override
+  public final List<String> getShardList(String table) {
+    checkTable(table);
+    List<String> result = new ArrayList<String>();
+    try {
+      TableContext tableContext = getTableContext(table);
+      TableDescriptor descriptor = tableContext.getDescriptor();
+      Path tablePath = new Path(descriptor.tableUri);
+      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
+      if (!fileSystem.exists(tablePath)) {
+        LOG.error("Table [{0}] is missing, defined location [{1}]", table, tablePath.toUri());
+        throw new RuntimeException("Table [" + table + "] is missing, defined location ["
+ tablePath.toUri() + "]");
+      }
+      FileStatus[] listStatus = fileSystem.listStatus(tablePath);
+      for (FileStatus status : listStatus) {
+        if (status.isDir()) {
+          String name = status.getPath().getName();
+          if (name.startsWith(SHARD_PREFIX)) {
+            result.add(name);
+          }
+        }
+      }
+      return result;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected final TableContext getTableContext(final String table) {
+    return TableContext.create(_clusterStatus.getTableDescriptor(true, _clusterStatus.getCluster(true,
table), table));
+  }
+
+  protected final void checkTable(String table) {
+    if (_clusterStatus.exists(true, _cluster, table)) {
+      return;
+    }
+    throw new RuntimeException("Table [" + table + "] does not exist.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4727825d/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 7c75234..49908e4 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -16,14 +16,6 @@ package org.apache.blur.manager.indexserver;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.apache.blur.metrics.MetricsConstants.BLUR;
-import static org.apache.blur.metrics.MetricsConstants.INDEX_COUNT;
-import static org.apache.blur.metrics.MetricsConstants.INDEX_MEMORY_USAGE;
-import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
-import static org.apache.blur.metrics.MetricsConstants.SEGMENT_COUNT;
-import static org.apache.blur.metrics.MetricsConstants.TABLE_COUNT;
-import static org.apache.blur.utils.BlurConstants.SHARD_PREFIX;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,6 +30,7 @@ import java.util.TimerTask;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -58,7 +51,6 @@ import org.apache.blur.manager.writer.BlurIndexReader;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
-import org.apache.blur.metrics.AtomicLongGauge;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
@@ -71,82 +63,74 @@ import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.zookeeper.WatchChildren;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.Directory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.MetricName;
-
-public class DistributedIndexServer extends AbstractIndexServer {
+public class DistributedIndexServer extends AbstractDistributedIndexServer {
 
   private static final Log LOG = LogFactory.getLog(DistributedIndexServer.class);
   private static final long _delay = TimeUnit.SECONDS.toMillis(10);
   private static final AtomicLong _pauseWarmup = new AtomicLong();
 
-  private Map<String, TableDescriptor> _tableDescriptors = new ConcurrentHashMap<String,
TableDescriptor>();
-  private Map<String, Similarity> _tableSimilarity = new ConcurrentHashMap<String,
Similarity>();
-  private Map<String, DistributedLayout> _layoutManagers = new ConcurrentHashMap<String,
DistributedLayout>();
-  private Map<String, Set<String>> _layoutCache = new ConcurrentHashMap<String,
Set<String>>();
-  private ConcurrentHashMap<String, Map<String, BlurIndex>> _indexes = new ConcurrentHashMap<String,
Map<String, BlurIndex>>();
-  private final ShardStateManager _shardStateManager = new ShardStateManager();
-
-  // set externally
-  private ClusterStatus _clusterStatus;
-  private Configuration _configuration;
-  private String _nodeName;
-  private int _shardOpenerThreadCount;
-  private BlockCacheDirectoryFactory _blockCacheDirectoryFactory;
-  private ZooKeeper _zookeeper;
-  private String _cluster;
+  static class LayoutEntry {
 
-  // set internally
-  private Timer _timerCacheFlush;
-  private ExecutorService _openerService;
-  private Timer _timerTableWarmer;
-  private BlurFilterCache _filterCache;
-  private AtomicBoolean _running = new AtomicBoolean();
-  private long _safeModeDelay;
-  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup(1000000);
-  private DirectoryReferenceFileGC _gc;
-  private WatchChildren _watchOnlineShards;
-
-  private SharedMergeScheduler _mergeScheduler;
-  private IndexInputCloser _closer = null;
-  private ExecutorService _searchExecutor = null;
-
-  private AtomicLong _tableCount = new AtomicLong();
-  private AtomicLong _indexCount = new AtomicLong();
-  private AtomicLong _segmentCount = new AtomicLong();
-  private AtomicLong _indexMemoryUsage = new AtomicLong();
-  private BlurIndexRefresher _refresher;
-  private BlurIndexCloser _indexCloser;
-  private int _internalSearchThreads;
-  private ExecutorService _warmupExecutor;
-  private int _warmupThreads;
+    LayoutEntry(DistributedLayout distributedLayout, Set<String> shards) {
+      _distributedLayout = distributedLayout;
+      _shards = shards;
+    }
 
-  public static interface ReleaseReader {
-    void release() throws IOException;
+    final DistributedLayout _distributedLayout;
+    final Set<String> _shards;
   }
 
-  public void init() throws KeeperException, InterruptedException, IOException {
-    MetricName tableCount = new MetricName(ORG_APACHE_BLUR, BLUR, TABLE_COUNT, _cluster);
-    MetricName indexCount = new MetricName(ORG_APACHE_BLUR, BLUR, INDEX_COUNT, _cluster);
-    MetricName segmentCount = new MetricName(ORG_APACHE_BLUR, BLUR, SEGMENT_COUNT, _cluster);
-    MetricName indexMemoryUsage = new MetricName(ORG_APACHE_BLUR, BLUR, INDEX_MEMORY_USAGE,
_cluster);
+  // set externally
+  private final int _shardOpenerThreadCount;
+  private final BlockCacheDirectoryFactory _blockCacheDirectoryFactory;
+  private final ZooKeeper _zookeeper;
+  private final int _internalSearchThreads;
+  private final int _warmupThreads;
+  private final long _safeModeDelay;
+  private final BlurIndexWarmup _warmup;
+  private final BlurFilterCache _filterCache;
+  private final DistributedLayoutFactory _distributedLayoutFactory;
 
-    Metrics.newGauge(tableCount, new AtomicLongGauge(_tableCount));
-    Metrics.newGauge(indexCount, new AtomicLongGauge(_indexCount));
-    Metrics.newGauge(segmentCount, new AtomicLongGauge(_segmentCount));
-    Metrics.newGauge(indexMemoryUsage, new AtomicLongGauge(_indexMemoryUsage));
+  // set internally
+  private final AtomicBoolean _running = new AtomicBoolean();
+  private final Timer _timerTableWarmer;
+  private final Timer _timerCacheFlush;
+  private final ExecutorService _openerService;
+  private final DirectoryReferenceFileGC _gc;
+  private final WatchChildren _watchOnlineShards;
+  private final SharedMergeScheduler _mergeScheduler;
+  private final IndexInputCloser _closer;
+  private final ExecutorService _searchExecutor;
+  private final BlurIndexRefresher _refresher;
+  private final BlurIndexCloser _indexCloser;
+  private final ExecutorService _warmupExecutor;
+  private final ConcurrentMap<String, LayoutEntry> _layout = new ConcurrentHashMap<String,
LayoutEntry>();
+  private final ConcurrentMap<String, Map<String, BlurIndex>> _indexes = new
ConcurrentHashMap<String, Map<String, BlurIndex>>();
+  private final ShardStateManager _shardStateManager = new ShardStateManager();
+
+  public DistributedIndexServer(Configuration configuration, ZooKeeper zookeeper, ClusterStatus
clusterStatus,
+      BlurIndexWarmup warmup, BlurFilterCache filterCache, BlockCacheDirectoryFactory blockCacheDirectoryFactory,
+      DistributedLayoutFactory distributedLayoutFactory, String cluster, String nodeName,
long safeModeDelay,
+      int shardOpenerThreadCount, int internalSearchThreads, int warmupThreads) throws KeeperException,
+      InterruptedException {
+    super(clusterStatus, configuration, nodeName, cluster);
+    _shardOpenerThreadCount = shardOpenerThreadCount;
+    _zookeeper = zookeeper;
+    _filterCache = filterCache;
+    _safeModeDelay = safeModeDelay;
+    _warmup = warmup == null ? new DefaultBlurIndexWarmup(1000000) : warmup;
+    _internalSearchThreads = internalSearchThreads;
+    _warmupThreads = warmupThreads;
+    _blockCacheDirectoryFactory = blockCacheDirectoryFactory;
+    _distributedLayoutFactory = distributedLayoutFactory == null ? getDefaultLayoutFactory()
: distributedLayoutFactory;
 
     BlurUtil.setupZookeeper(_zookeeper, _cluster);
     _openerService = Executors.newThreadPool("shard-opener", _shardOpenerThreadCount);
@@ -163,7 +147,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _refresher.init();
     _indexCloser = new BlurIndexCloser();
     _indexCloser.init();
-    setupFlushCacheTimer();
+    _timerCacheFlush = setupFlushCacheTimer();
 
     registerMyselfAsMemberOfCluster();
     String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster);
@@ -173,43 +157,127 @@ public class DistributedIndexServer extends AbstractIndexServer {
     safeMode.registerNode(getNodeName(), BlurUtil.getVersion().getBytes());
 
     _running.set(true);
-    setupTableWarmer();
-    watchForShardServerChanges();
+    _timerTableWarmer = setupTableWarmer();
+    _watchOnlineShards = watchForShardServerChanges();
   }
 
-  private void watchForShardServerChanges() {
-    ZookeeperPathConstants.getOnlineShardsPath(_cluster);
-    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(_cluster))
-        .watch(new OnChange() {
-          private List<String> _prevOnlineShards = new ArrayList<String>();
+  private DistributedLayoutFactory getDefaultLayoutFactory() {
+    return new DistributedLayoutFactory() {
 
-          @Override
-          public void action(List<String> onlineShards) {
-            List<String> oldOnlineShards = _prevOnlineShards;
-            _prevOnlineShards = onlineShards;
-            _layoutManagers.clear();
-            _layoutCache.clear();
-            LOG.info("Online shard servers changed, clearing layout managers and cache.");
-            if (oldOnlineShards == null) {
-              oldOnlineShards = new ArrayList<String>();
-            }
-            for (String oldOnlineShard : oldOnlineShards) {
-              if (!onlineShards.contains(oldOnlineShard)) {
-                LOG.info("Node went offline [{0}]", oldOnlineShard);
-              }
-            }
-            for (String onlineShard : onlineShards) {
-              if (!oldOnlineShards.contains(onlineShard)) {
-                LOG.info("Node came online [{0}]", onlineShard);
-              }
-            }
+      @Override
+      public DistributedLayout createDistributedLayout(String table, List<String> shardList,
+          List<String> shardServerList, List<String> offlineShardServers) {
+        DistributedLayoutManager layoutManager = new DistributedLayoutManager();
+        layoutManager.setNodes(shardServerList);
+        layoutManager.setNodesOffline(offlineShardServers);
+        layoutManager.setShards(shardList);
+        layoutManager.init();
+        return layoutManager;
+      }
+    };
+  }
+
+  public static interface ReleaseReader {
+    void release() throws IOException;
+  }
+
+  public static AtomicLong getPauseWarmup() {
+    return _pauseWarmup;
+  }
+
+  @Override
+  public Map<String, ShardState> getShardState(String table) {
+    return _shardStateManager.getShardState(table);
+  }
+
+  @Override
+  public void close() {
+    if (_running.get()) {
+      _shardStateManager.close();
+      _running.set(false);
+      closeAllIndexes();
+      _refresher.close();
+      _indexCloser.close();
+      _watchOnlineShards.close();
+      _timerCacheFlush.purge();
+      _timerCacheFlush.cancel();
+
+      _timerTableWarmer.purge();
+      _timerTableWarmer.cancel();
+      _closer.close();
+      _gc.close();
+      _openerService.shutdownNow();
+      _searchExecutor.shutdownNow();
+      _warmupExecutor.shutdownNow();
+    }
+  }
+
+  @Override
+  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException
{
+    return new TreeSet<String>(getShardsToServe(table));
+  }
+
+  @Override
+  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
+    checkTable(table);
+
+    Set<String> shardsToServe = getShardsToServe(table);
+    synchronized (_indexes) {
+      if (!_indexes.containsKey(table)) {
+        _indexes.putIfAbsent(table, new ConcurrentHashMap<String, BlurIndex>());
+      }
+    }
+    Map<String, BlurIndex> tableIndexes = _indexes.get(table);
+    Set<String> shardsBeingServed = new HashSet<String>(tableIndexes.keySet());
+    if (shardsBeingServed.containsAll(shardsToServe)) {
+      Map<String, BlurIndex> result = new HashMap<String, BlurIndex>(tableIndexes);
+      shardsBeingServed.removeAll(shardsToServe);
+      for (String shardNotToServe : shardsBeingServed) {
+        result.remove(shardNotToServe);
+      }
+      return result;
+    } else {
+      return openMissingShards(table, shardsToServe, tableIndexes);
+    }
+  }
+
+  private boolean isEnabled(String table) {
+    checkTable(table);
+    return _clusterStatus.isEnabled(true, _cluster, table);
+  }
+
+  private WatchChildren watchForShardServerChanges() {
+    WatchChildren watchOnlineShards = new WatchChildren(_zookeeper,
+        ZookeeperPathConstants.getOnlineShardsPath(_cluster)).watch(new OnChange() {
+      private List<String> _prevOnlineShards = new ArrayList<String>();
+
+      @Override
+      public void action(List<String> onlineShards) {
+        List<String> oldOnlineShards = _prevOnlineShards;
+        _prevOnlineShards = onlineShards;
+        _layout.clear();
+        LOG.info("Online shard servers changed, clearing layout managers and cache.");
+        if (oldOnlineShards == null) {
+          oldOnlineShards = new ArrayList<String>();
+        }
+        for (String oldOnlineShard : oldOnlineShards) {
+          if (!onlineShards.contains(oldOnlineShard)) {
+            LOG.info("Node went offline [{0}]", oldOnlineShard);
           }
-        });
+        }
+        for (String onlineShard : onlineShards) {
+          if (!oldOnlineShards.contains(onlineShard)) {
+            LOG.info("Node came online [{0}]", onlineShard);
+          }
+        }
+      }
+    });
+    return watchOnlineShards;
   }
 
-  private void setupTableWarmer() {
-    _timerTableWarmer = new Timer("Table-Warmer", true);
-    _timerTableWarmer.schedule(new TimerTask() {
+  private Timer setupTableWarmer() {
+    Timer timerTableWarmer = new Timer("Table-Warmer", true);
+    timerTableWarmer.schedule(new TimerTask() {
       @Override
       public void run() {
         try {
@@ -255,6 +323,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
         }
       }
     }, _delay, _delay);
+    return timerTableWarmer;
   }
 
   private void registerMyselfAsMemberOfCluster() {
@@ -271,14 +340,9 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }
   }
 
-  @Override
-  public Map<String, ShardState> getShardState(String table) {
-    return _shardStateManager.getShardState(table);
-  }
-
-  private void setupFlushCacheTimer() {
-    _timerCacheFlush = new Timer("Flush-IndexServer-Caches", true);
-    _timerCacheFlush.schedule(new TimerTask() {
+  private Timer setupFlushCacheTimer() {
+    Timer timerCacheFlush = new Timer("Flush-IndexServer-Caches", true);
+    timerCacheFlush.schedule(new TimerTask() {
       @Override
       public void run() {
         try {
@@ -289,10 +353,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
       }
 
       private void cleanup() {
-        clearMapOfOldTables(_tableDescriptors);
-        clearMapOfOldTables(_layoutManagers);
-        clearMapOfOldTables(_layoutCache);
-        clearMapOfOldTables(_tableSimilarity);
+        clearMapOfOldTables(_layout);
         boolean closed = false;
         Map<String, Map<String, BlurIndex>> oldIndexesThatNeedToBeClosed = clearMapOfOldTables(_indexes);
         for (String table : oldIndexesThatNeedToBeClosed.keySet()) {
@@ -331,6 +392,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
         }
       }
     }, _delay, _delay);
+    return timerCacheFlush;
   }
 
   protected void close(BlurIndex index, String table, String shard) {
@@ -362,28 +424,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
     return removed;
   }
 
-  @Override
-  public void close() {
-    if (_running.get()) {
-      _shardStateManager.close();
-      _running.set(false);
-      closeAllIndexes();
-      _refresher.close();
-      _indexCloser.close();
-      _watchOnlineShards.close();
-      _timerCacheFlush.purge();
-      _timerCacheFlush.cancel();
-
-      _timerTableWarmer.purge();
-      _timerTableWarmer.cancel();
-      _closer.close();
-      _gc.close();
-      _openerService.shutdownNow();
-      _searchExecutor.shutdownNow();
-      _warmupExecutor.shutdownNow();
-    }
-  }
-
   private void closeAllIndexes() {
     for (Entry<String, Map<String, BlurIndex>> tableToShards : _indexes.entrySet())
{
       for (Entry<String, BlurIndex> shard : tableToShards.getValue().entrySet()) {
@@ -398,38 +438,10 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }
   }
 
-  @Override
-  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException
{
-    return new TreeSet<String>(getShardsToServe(table));
-  }
-
-  @Override
-  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
-    checkTable(table);
-
-    Set<String> shardsToServe = getShardsToServe(table);
-    synchronized (_indexes) {
-      if (!_indexes.containsKey(table)) {
-        _indexes.putIfAbsent(table, new ConcurrentHashMap<String, BlurIndex>());
-      }
-    }
-    Map<String, BlurIndex> tableIndexes = _indexes.get(table);
-    Set<String> shardsBeingServed = new HashSet<String>(tableIndexes.keySet());
-    if (shardsBeingServed.containsAll(shardsToServe)) {
-      Map<String, BlurIndex> result = new HashMap<String, BlurIndex>(tableIndexes);
-      shardsBeingServed.removeAll(shardsToServe);
-      for (String shardNotToServe : shardsBeingServed) {
-        result.remove(shardNotToServe);
-      }
-      return result;
-    } else {
-      return openMissingShards(table, shardsToServe, tableIndexes);
-    }
-  }
-
   private BlurIndex openShard(String table, String shard) throws IOException {
     LOG.info("Opening shard [{0}] for table [{1}]", shard, table);
-    Path tablePath = new Path(getTableDescriptor(table).tableUri);
+    TableContext tableContext = getTableContext(table);
+    Path tablePath = tableContext.getTablePath();
     Path hdfsDirPath = new Path(tablePath, shard);
 
     BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName,
BlurUtil.getPid());
@@ -437,8 +449,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
     Directory directory = new HdfsDirectory(_configuration, hdfsDirPath);
     directory.setLockFactory(lockFactory);
 
-    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
-    TableContext tableContext = TableContext.create(descriptor);
     ShardContext shardContext = ShardContext.create(tableContext, shard);
 
     Directory dir;
@@ -483,7 +493,6 @@ public class DistributedIndexServer extends AbstractIndexServer {
         }
       }
     });
-
   }
 
   private synchronized Map<String, BlurIndex> openMissingShards(final String table,
Set<String> shardsToServe,
@@ -546,31 +555,26 @@ public class DistributedIndexServer extends AbstractIndexServer {
     if (!isEnabled(table)) {
       return new HashSet<String>();
     }
-    DistributedLayout layoutManager = _layoutManagers.get(table);
-    if (layoutManager == null) {
+    LayoutEntry layoutEntry = _layout.get(table);
+    if (layoutEntry == null) {
       return setupLayoutManager(table);
     } else {
-      return _layoutCache.get(table);
+      return layoutEntry._shards;
     }
   }
 
   private synchronized Set<String> setupLayoutManager(String table) {
-    DistributedLayoutManager layoutManager = new DistributedLayoutManager();
-
     String cluster = _clusterStatus.getCluster(false, table);
     if (cluster == null) {
       throw new RuntimeException("Table [" + table + "] is not found.");
     }
-
     List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
     List<String> offlineShardServers = new ArrayList<String>(_clusterStatus.getOfflineShardServers(false,
cluster));
     List<String> shardList = getShardList(table);
 
-    layoutManager.setNodes(shardServerList);
-    layoutManager.setNodesOffline(offlineShardServers);
-    layoutManager.setShards(shardList);
-    layoutManager.init();
-
+    DistributedLayout layoutManager = _distributedLayoutFactory.createDistributedLayout(table,
shardList,
+        shardServerList, offlineShardServers);
+    
     Map<String, String> layout = layoutManager.getLayout();
     String nodeName = getNodeName();
     Set<String> shardsToServeCache = new TreeSet<String>();
@@ -579,127 +583,7 @@ public class DistributedIndexServer extends AbstractIndexServer {
         shardsToServeCache.add(entry.getKey());
       }
     }
-    _layoutCache.put(table, shardsToServeCache);
-    _layoutManagers.put(table, layoutManager);
+    _layout.put(table, new LayoutEntry(layoutManager, shardsToServeCache));
     return shardsToServeCache;
   }
-
-  @Override
-  public String getNodeName() {
-    return _nodeName;
-  }
-
-  @Override
-  public List<String> getShardList(String table) {
-    checkTable(table);
-    List<String> result = new ArrayList<String>();
-    try {
-      TableDescriptor descriptor = getTableDescriptor(table);
-      Path tablePath = new Path(descriptor.tableUri);
-      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
-      if (!fileSystem.exists(tablePath)) {
-        LOG.error("Table [{0}] is missing, defined location [{1}]", table, tablePath.toUri());
-        throw new RuntimeException("Table [" + table + "] is missing, defined location ["
+ tablePath.toUri() + "]");
-      }
-      FileStatus[] listStatus = fileSystem.listStatus(tablePath);
-      for (FileStatus status : listStatus) {
-        if (status.isDir()) {
-          String name = status.getPath().getName();
-          if (name.startsWith(SHARD_PREFIX)) {
-            result.add(name);
-          }
-        }
-      }
-      return result;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private TableContext getTableContext(final String table) {
-    return TableContext.create(_clusterStatus.getTableDescriptor(true, _clusterStatus.getCluster(true,
table), table));
-  }
-
-  @Override
-  public long getTableSize(String table) throws IOException {
-    checkTable(table);
-    String tableUri = getTableContext(table).getTablePath().toUri().toString();
-    Path tablePath = new Path(tableUri);
-    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
-    ContentSummary contentSummary = fileSystem.getContentSummary(tablePath);
-    return contentSummary.getLength();
-  }
-
-  public boolean isEnabled(String table) {
-    checkTable(table);
-    return _clusterStatus.isEnabled(true, _cluster, table);
-  }
-
-  private void checkTable(String table) {
-    if (_clusterStatus.exists(true, _cluster, table)) {
-      return;
-    }
-    throw new RuntimeException("Table [" + table + "] does not exist.");
-  }
-
-  private TableDescriptor getTableDescriptor(String table) {
-    TableDescriptor tableDescriptor = _tableDescriptors.get(table);
-    if (tableDescriptor == null) {
-      tableDescriptor = _clusterStatus.getTableDescriptor(true, _cluster, table);
-      _tableDescriptors.put(table, tableDescriptor);
-    }
-    return tableDescriptor;
-  }
-
-  public void setClusterStatus(ClusterStatus clusterStatus) {
-    _clusterStatus = clusterStatus;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    _configuration = configuration;
-  }
-
-  public void setNodeName(String nodeName) {
-    _nodeName = nodeName;
-  }
-
-  public void setShardOpenerThreadCount(int shardOpenerThreadCount) {
-    _shardOpenerThreadCount = shardOpenerThreadCount;
-  }
-
-  public void setZookeeper(ZooKeeper zookeeper) {
-    _zookeeper = zookeeper;
-  }
-
-  public void setFilterCache(BlurFilterCache filterCache) {
-    _filterCache = filterCache;
-  }
-
-  public void setSafeModeDelay(long safeModeDelay) {
-    _safeModeDelay = safeModeDelay;
-  }
-
-  public void setWarmup(BlurIndexWarmup warmup) {
-    _warmup = warmup;
-  }
-
-  public void setClusterName(String cluster) {
-    _cluster = cluster;
-  }
-
-  public void setInternalSearchThreads(int internalSearchThreads) {
-    _internalSearchThreads = internalSearchThreads;
-  }
-
-  public void setWarmupThreads(int warmupThreads) {
-    _warmupThreads = warmupThreads;
-  }
-
-  public static AtomicLong getPauseWarmup() {
-    return _pauseWarmup;
-  }
-  
-  public void setBlockCacheDirectoryFactory(BlockCacheDirectoryFactory blockCacheDirectoryFactory)
{
-    _blockCacheDirectoryFactory = blockCacheDirectoryFactory;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4727825d/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutFactory.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutFactory.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutFactory.java
new file mode 100644
index 0000000..cb211e2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedLayoutFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.indexserver;
+
+import java.util.List;
+
+public interface DistributedLayoutFactory {
+
+  DistributedLayout createDistributedLayout(String table, List<String> shardList, List<String>
shardServerList,
+      List<String> offlineShardServers);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4727825d/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 9febe8b..4f6bc43 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -62,12 +62,14 @@ import org.apache.blur.manager.BlurFilterCache;
 import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.DefaultBlurFilterCache;
 import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
 import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
 import org.apache.blur.manager.indexserver.DistributedIndexServer;
+import org.apache.blur.manager.indexserver.DistributedLayoutFactory;
 import org.apache.blur.manager.writer.BlurIndexRefresher;
 import org.apache.blur.metrics.JSONReporter;
 import org.apache.blur.metrics.ReporterSetup;
@@ -220,21 +222,18 @@ public class ThriftBlurShardServer extends ThriftServer {
 
     BlurFilterCache filterCache = getFilterCache(configuration);
     BlurIndexWarmup indexWarmup = getIndexWarmup(configuration);
-
-    final DistributedIndexServer indexServer = new DistributedIndexServer();
-    indexServer.setBlockCacheDirectoryFactory(blockCacheDirectoryFactory);
-    indexServer.setClusterStatus(clusterStatus);
-    indexServer.setClusterName(configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER));
-    indexServer.setConfiguration(config);
-    indexServer.setNodeName(nodeName);
-    indexServer.setShardOpenerThreadCount(configuration.getInt(BLUR_SHARD_OPENER_THREAD_COUNT,
16));
-    indexServer.setWarmupThreads(configuration.getInt(BLUR_SHARD_WARMUP_THREAD_COUNT, 16));
-    indexServer.setInternalSearchThreads(configuration.getInt(BLUR_SHARD_INTERNAL_SEARCH_THREAD_COUNT,
16));
-    indexServer.setZookeeper(zooKeeper);
-    indexServer.setFilterCache(filterCache);
-    indexServer.setSafeModeDelay(configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000));
-    indexServer.setWarmup(indexWarmup);
-    indexServer.init();
+    //@todo add in read from config
+    DistributedLayoutFactory distributedLayoutFactory = null;
+
+    String cluster = configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER);
+    long safeModeDelay = configuration.getLong(BLUR_SHARD_SAFEMODEDELAY, 60000);
+    int shardOpenerThreadCount = configuration.getInt(BLUR_SHARD_OPENER_THREAD_COUNT, 16);
+    int internalSearchThreads = configuration.getInt(BLUR_SHARD_WARMUP_THREAD_COUNT, 16);
+    int warmupThreads = configuration.getInt(BLUR_SHARD_WARMUP_THREAD_COUNT, 16);
+    
+    final DistributedIndexServer indexServer = new DistributedIndexServer(config, zooKeeper,
clusterStatus,
+        indexWarmup, filterCache, blockCacheDirectoryFactory, distributedLayoutFactory, cluster,
nodeName, safeModeDelay, shardOpenerThreadCount,
+        internalSearchThreads, warmupThreads);
 
     final IndexManager indexManager = new IndexManager();
     indexManager.setIndexServer(indexServer);


Mime
View raw message