incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperClusterStatus.java
deleted file mode 100644
index ea9ead0..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ /dev/null
@@ -1,823 +0,0 @@
-package com.nearinfinity.blur.manager.clusterstatus;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.lucene.search.Similarity;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import com.nearinfinity.blur.analysis.BlurAnalyzer;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.lucene.search.FairSimilarity;
-import com.nearinfinity.blur.thrift.generated.AnalyzerDefinition;
-import com.nearinfinity.blur.thrift.generated.ColumnPreCache;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-import com.nearinfinity.blur.utils.BlurUtil;
-import com.nearinfinity.blur.zookeeper.WatchChildren;
-import com.nearinfinity.blur.zookeeper.WatchChildren.OnChange;
-import com.nearinfinity.blur.zookeeper.WatchNodeData;
-import com.nearinfinity.blur.zookeeper.WatchNodeExistance;
-import com.nearinfinity.blur.zookeeper.ZkUtils;
-
-public class ZookeeperClusterStatus extends ClusterStatus {
-
-  private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatus.class);
-
-  private ZooKeeper _zk;
-  private AtomicBoolean _running = new AtomicBoolean();
-  private ConcurrentMap<String, Long> _safeModeMap = new ConcurrentHashMap<String, Long>();
-  private ConcurrentMap<String, List<String>> _onlineShardsNodes = new ConcurrentHashMap<String, List<String>>();
-  private ConcurrentMap<String, Set<String>> _tablesPerCluster = new ConcurrentHashMap<String, Set<String>>();
-  private AtomicReference<Set<String>> _clusters = new AtomicReference<Set<String>>(new HashSet<String>());
-  private ConcurrentMap<String, Boolean> _enabled = new ConcurrentHashMap<String, Boolean>();
-  private ConcurrentMap<String, Boolean> _readOnly = new ConcurrentHashMap<String, Boolean>();
-
-  private WatchChildren _clusterWatcher;
-  private ConcurrentMap<String, WatchChildren> _onlineShardsNodesWatchers = new ConcurrentHashMap<String, WatchChildren>();
-  private ConcurrentMap<String, WatchChildren> _tableWatchers = new ConcurrentHashMap<String, WatchChildren>();
-  private ConcurrentMap<String, WatchNodeExistance> _safeModeWatchers = new ConcurrentHashMap<String, WatchNodeExistance>();
-  private ConcurrentMap<String, WatchNodeData> _safeModeDataWatchers = new ConcurrentHashMap<String, WatchNodeData>();
-  private ConcurrentMap<String, WatchNodeExistance> _enabledWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
-  private ConcurrentMap<String, WatchNodeExistance> _readOnlyWatchNodeExistance = new ConcurrentHashMap<String, WatchNodeExistance>();
-
-  public ZookeeperClusterStatus(ZooKeeper zooKeeper) {
-    _zk = zooKeeper;
-    _running.set(true);
-    watchForClusters();
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  class Clusters extends OnChange {
-    @Override
-    public void action(List<String> clusters) {
-      _clusters.set(new HashSet<String>(clusters));
-      for (String cluster : clusters) {
-        if (!_tableWatchers.containsKey(cluster)) {
-          String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
-          ZkUtils.waitUntilExists(_zk, tablesPath);
-          WatchChildren clusterWatcher = new WatchChildren(_zk, tablesPath).watch(new Tables(cluster));
-          _tableWatchers.put(cluster, clusterWatcher);
-          String safemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
-          ZkUtils.waitUntilExists(_zk, safemodePath);
-          WatchNodeExistance watchNodeExistance = new WatchNodeExistance(_zk, safemodePath).watch(new SafeExistance(cluster));
-          _safeModeWatchers.put(cluster, watchNodeExistance);
-        }
-      }
-
-      List<String> clustersToCloseAndRemove = new ArrayList<String>(clusters);
-      clustersToCloseAndRemove.removeAll(_tableWatchers.keySet());
-      for (String cluster : clustersToCloseAndRemove) {
-        WatchChildren watcher = _tableWatchers.remove(cluster);
-        if (watcher == null) {
-          LOG.error("Error watcher is null [" + cluster + "] ");
-        } else {
-          watcher.close();
-        }
-      }
-    }
-  }
-
-  class SafeExistance extends WatchNodeExistance.OnChange {
-
-    private String cluster;
-
-    public SafeExistance(String cluster) {
-      this.cluster = cluster;
-    }
-
-    @Override
-    public void action(Stat stat) {
-      if (stat != null) {
-        WatchNodeData watchNodeData = new WatchNodeData(_zk, ZookeeperPathConstants.getSafemodePath(cluster));
-        watchNodeData.watch(new WatchNodeData.OnChange() {
-          @Override
-          public void action(byte[] data) {
-            if (data == null) {
-              LOG.debug("Safe mode value for cluster [" + cluster + "] is not set.");
-              _safeModeMap.put(cluster, Long.MIN_VALUE);
-            } else {
-              String value = new String(data);
-              LOG.debug("Safe mode value for cluster [" + cluster + "] is [" + value + "].");
-              _safeModeMap.put(cluster, Long.parseLong(value));
-            }
-          }
-        });
-        WatchNodeData nodeData = _safeModeDataWatchers.put(cluster, watchNodeData);
-        if (nodeData != null) {
-          nodeData.close();
-        }
-      }
-    }
-  }
-
-  class Tables extends OnChange {
-    private String cluster;
-
-    public Tables(String cluster) {
-      this.cluster = cluster;
-    }
-
-    @Override
-    public void action(List<String> tables) {
-      Set<String> newSet = new HashSet<String>(tables);
-      Set<String> oldSet = _tablesPerCluster.put(cluster, newSet);
-      Set<String> newTables = getNewTables(newSet, oldSet);
-      for (String table : newTables) {
-        final String clusterTableKey = getClusterTableKey(cluster, table);
-
-        WatchNodeExistance readOnlyWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableReadOnlyPath(cluster, table));
-        readOnlyWatcher.watch(new WatchNodeExistance.OnChange() {
-          @Override
-          public void action(Stat stat) {
-            if (stat == null) {
-              _readOnly.put(clusterTableKey, Boolean.FALSE);
-            } else {
-              _readOnly.put(clusterTableKey, Boolean.TRUE);
-            }
-          }
-        });
-        if (_readOnlyWatchNodeExistance.putIfAbsent(clusterTableKey, readOnlyWatcher) != null) {
-          readOnlyWatcher.close();
-        }
-
-        WatchNodeExistance enabledWatcher = new WatchNodeExistance(_zk, ZookeeperPathConstants.getTableEnabledPath(cluster, table));
-        enabledWatcher.watch(new WatchNodeExistance.OnChange() {
-          @Override
-          public void action(Stat stat) {
-            if (stat == null) {
-              _enabled.put(clusterTableKey, Boolean.FALSE);
-            } else {
-              _enabled.put(clusterTableKey, Boolean.TRUE);
-            }
-          }
-        });
-        if (_enabledWatchNodeExistance.putIfAbsent(clusterTableKey, enabledWatcher) != null) {
-          enabledWatcher.close();
-        }
-      }
-    }
-
-    private Set<String> getNewTables(Set<String> newSet, Set<String> oldSet) {
-      Set<String> newTables = new HashSet<String>(newSet);
-      if (oldSet != null) {
-        newTables.removeAll(oldSet);
-      }
-      return newTables;
-    }
-  }
-
-  private void watchForClusters() {
-    _clusterWatcher = new WatchChildren(_zk, ZookeeperPathConstants.getClustersPath()).watch(new Clusters());
-  }
-
-  public ZookeeperClusterStatus(String connectionStr) throws IOException {
-    this(new ZooKeeper(connectionStr, 30000, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-
-      }
-    }));
-  }
-
-  private String getClusterTableKey(String cluster, String table) {
-    return cluster + "." + table;
-  }
-
-  @Override
-  public List<String> getClusterList(boolean useCache) {
-    if (useCache) {
-      return new ArrayList<String>(_clusters.get());
-    }
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getClustersPath(), false);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getClusterList [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  private void checkIfOpen() {
-    if (_running.get()) {
-      return;
-    }
-    throw new RuntimeException("not open");
-  }
-
-  @Override
-  public List<String> getControllerServerList() {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getControllerServerList [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public List<String> getOnlineShardServers(boolean useCache, String cluster) {
-    if (useCache) {
-      List<String> shards = _onlineShardsNodes.get(cluster);
-      if (shards != null) {
-        return shards;
-      } else {
-        watchForOnlineShardNodes(cluster);
-      }
-    }
-
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getClustersPath() + "/" + cluster + "/online/shard-nodes", false);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getOnlineShardServers took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  private void watchForOnlineShardNodes(final String cluster) {
-    WatchChildren watch = new WatchChildren(_zk, ZookeeperPathConstants.getOnlineShardsPath(cluster)).watch(new OnChange() {
-      @Override
-      public void action(List<String> children) {
-        _onlineShardsNodes.put(cluster, children);
-      }
-    });
-    if (_onlineShardsNodesWatchers.putIfAbsent(cluster, watch) != null) {
-      // There was already a watch created. Close the extra watcher.
-      watch.close();
-    }
-  }
-
-  @Override
-  public List<String> getShardServerList(String cluster) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getClustersPath() + "/" + cluster + "/shard-nodes", false);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getShardServerList took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public boolean exists(boolean useCache, String cluster, String table) {
-    if (useCache) {
-      Set<String> tables = _tablesPerCluster.get(cluster);
-      if (tables != null) {
-        if (tables.contains(table)) {
-          return true;
-        }
-      }
-    }
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
-        return false;
-      }
-      return true;
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace exists took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public boolean isEnabled(boolean useCache, String cluster, String table) {
-    if (useCache) {
-      Boolean e = _enabled.get(getClusterTableKey(cluster, table));
-      if (e != null) {
-        return e;
-      }
-    }
-    long s = System.nanoTime();
-    String tablePathIsEnabled = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
-    try {
-      checkIfOpen();
-      if (_zk.exists(tablePathIsEnabled, false) == null) {
-        return false;
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isEnabled took [" + (e - s) / 1000000.0 + " ms]");
-    }
-    return true;
-  }
-
-  private Map<String, TableDescriptor> _tableDescriptorCache = new ConcurrentHashMap<String, TableDescriptor>();
-
-  @Override
-  public TableDescriptor getTableDescriptor(boolean useCache, String cluster, String table) {
-    if (useCache) {
-      TableDescriptor tableDescriptor = _tableDescriptorCache.get(table);
-      updateReadOnlyAndEnabled(useCache, tableDescriptor, cluster, table);
-      if (tableDescriptor != null) {
-        return tableDescriptor;
-      }
-    }
-    long s = System.nanoTime();
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    try {
-      checkIfOpen();
-      tableDescriptor.shardCount = Integer.parseInt(new String(getData(ZookeeperPathConstants.getTableShardCountPath(cluster, table))));
-      tableDescriptor.tableUri = new String(getData(ZookeeperPathConstants.getTableUriPath(cluster, table)));
-      tableDescriptor.compressionClass = new String(getData(ZookeeperPathConstants.getTableCompressionCodecPath(cluster, table)));
-      tableDescriptor.compressionBlockSize = Integer.parseInt(new String(getData(ZookeeperPathConstants.getTableCompressionBlockSizePath(cluster, table))));
-      tableDescriptor.analyzerDefinition = fromBytes(getData(ZookeeperPathConstants.getTablePath(cluster, table)), AnalyzerDefinition.class);
-      tableDescriptor.blockCaching = isBlockCacheEnabled(cluster, table);
-      tableDescriptor.blockCachingFileTypes = getBlockCacheFileTypes(cluster, table);
-      tableDescriptor.name = table;
-      tableDescriptor.columnPreCache = fromBytes(getData(ZookeeperPathConstants.getTableColumnsToPreCache(cluster, table)), ColumnPreCache.class);
-      byte[] data = getData(ZookeeperPathConstants.getTableSimilarityPath(cluster, table));
-      if (data != null) {
-        tableDescriptor.similarityClass = new String(data);
-      }
-      updateReadOnlyAndEnabled(useCache, tableDescriptor, cluster, table);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getTableDescriptor took [" + (e - s) / 1000000.0 + " ms]");
-    }
-    tableDescriptor.cluster = cluster;
-    _tableDescriptorCache.put(table, tableDescriptor);
-    return tableDescriptor;
-  }
-
-  private void updateReadOnlyAndEnabled(boolean useCache, TableDescriptor tableDescriptor, String cluster, String table) {
-    if (tableDescriptor != null) {
-      tableDescriptor.setReadOnly(isReadOnly(useCache, cluster, table));
-      tableDescriptor.setIsEnabled(isEnabled(useCache, cluster, table));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T extends TBase<?, ?>> T fromBytes(byte[] data, Class<T> clazz) {
-    try {
-      if (data == null) {
-        return null;
-      }
-      TBase<?, ?> base = clazz.newInstance();
-      TMemoryInputTransport trans = new TMemoryInputTransport(data);
-      TJSONProtocol protocol = new TJSONProtocol(trans);
-      base.read(protocol);
-      trans.close();
-      return (T) base;
-    } catch (InstantiationException e) {
-      throw new RuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(e);
-    } catch (TException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private byte[] getData(String path) throws KeeperException, InterruptedException {
-    Stat stat = _zk.exists(path, false);
-    if (stat == null) {
-      return null;
-    }
-    return _zk.getData(path, false, stat);
-  }
-
-  @Override
-  public List<String> getTableList(boolean useCache, String cluster) {
-    if (useCache) {
-      Set<String> tables = _tablesPerCluster.get(cluster);
-      if (tables != null) {
-        return new ArrayList<String>(tables);
-      }
-    }
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      return _zk.getChildren(ZookeeperPathConstants.getTablesPath(cluster), false);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getTableList took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  public void close() {
-    if (_running.get()) {
-      _running.set(false);
-      close(_clusterWatcher);
-      close(_onlineShardsNodesWatchers);
-      close(_tableWatchers);
-      close(_safeModeWatchers);
-      close(_safeModeDataWatchers);
-      close(_enabledWatchNodeExistance);
-      close(_readOnlyWatchNodeExistance);
-    }
-  }
-
-  private void close(ConcurrentMap<String, ? extends Closeable> closableMap) {
-    Collection<? extends Closeable> values = closableMap.values();
-    for (Closeable closeable : values) {
-      close(closeable);
-    }
-  }
-
-  private void close(Closeable closeable) {
-    try {
-      closeable.close();
-    } catch (IOException e) {
-      LOG.error("Unknown error while trying to close [{0}]", closeable);
-    }
-  }
-
-  @Override
-  public String getCluster(boolean useCache, String table) {
-    if (useCache) {
-      for (Entry<String, Set<String>> entry : _tablesPerCluster.entrySet()) {
-        if (entry.getValue().contains(table)) {
-          return entry.getKey();
-        }
-      }
-    }
-    List<String> clusterList = getClusterList(useCache);
-    for (String cluster : clusterList) {
-      long s = System.nanoTime();
-      try {
-        checkIfOpen();
-        Stat stat = _zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false);
-        if (stat != null) {
-          // _tableToClusterCache.put(table, cluster);
-          return cluster;
-        }
-      } catch (KeeperException e) {
-        throw new RuntimeException(e);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      } finally {
-        long e = System.nanoTime();
-        LOG.debug("trace getCluster took [" + (e - s) / 1000000.0 + " ms]");
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public boolean isInSafeMode(boolean useCache, String cluster) {
-    if (useCache) {
-      Long safeModeTimestamp = _safeModeMap.get(cluster);
-      if (safeModeTimestamp != null && safeModeTimestamp != Long.MIN_VALUE) {
-        return safeModeTimestamp < System.currentTimeMillis() ? false : true;
-      }
-    }
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
-      Stat stat = _zk.exists(blurSafemodePath, false);
-      if (stat == null) {
-        return false;
-      }
-      byte[] data = _zk.getData(blurSafemodePath, false, stat);
-      if (data == null) {
-        return false;
-      }
-      long timestamp = Long.parseLong(new String(data));
-      long waitTime = timestamp - System.currentTimeMillis();
-      if (waitTime > 0) {
-        return true;
-      }
-      return false;
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isInSafeMode took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public int getShardCount(boolean useCache, String cluster, String table) {
-    if (useCache) {
-      TableDescriptor tableDescriptor = getTableDescriptor(true, cluster, table);
-      return tableDescriptor.shardCount;
-    }
-    long s = System.nanoTime();
-    try {
-      return Integer.parseInt(new String(getData(ZookeeperPathConstants.getTableShardCountPath(cluster, table))));
-    } catch (NumberFormatException e) {
-      throw new RuntimeException(e);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getShardCount took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public Set<String> getBlockCacheFileTypes(String cluster, String table) {
-    long s = System.nanoTime();
-    try {
-      byte[] data = getData(ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table));
-      if (data == null) {
-        return null;
-      }
-      String str = new String(data);
-      if (str.isEmpty()) {
-        return null;
-      }
-      Set<String> types = new HashSet<String>(Arrays.asList(str.split(",")));
-      if (types.isEmpty()) {
-        return null;
-      }
-      return types;
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace getBlockCacheFileTypes took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public boolean isBlockCacheEnabled(String cluster, String table) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table), false) == null) {
-        return false;
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isBlockCacheEnabled took [" + (e - s) / 1000000.0 + " ms]");
-    }
-    return true;
-  }
-
-  @Override
-  public boolean isReadOnly(boolean useCache, String cluster, String table) {
-    if (useCache) {
-      Boolean ro = _readOnly.get(getClusterTableKey(cluster, table));
-      if (ro != null) {
-        return ro;
-      }
-    }
-    long s = System.nanoTime();
-    String path = ZookeeperPathConstants.getTableReadOnlyPath(cluster, table);
-    try {
-      checkIfOpen();
-      if (_zk.exists(path, false) == null) {
-        return false;
-      }
-      return true;
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace isReadOnly took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public void createTable(TableDescriptor tableDescriptor) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      if (tableDescriptor.getCompressionClass() == null) {
-        tableDescriptor.setCompressionClass(DeflateCodec.class.getName());
-      }
-      if (tableDescriptor.getSimilarityClass() == null) {
-        tableDescriptor.setSimilarityClass(FairSimilarity.class.getName());
-      }
-      if (tableDescriptor.getAnalyzerDefinition() == null) {
-        tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
-      }
-      String table = BlurUtil.nullCheck(tableDescriptor.name, "tableDescriptor.name cannot be null.");
-      String cluster = BlurUtil.nullCheck(tableDescriptor.cluster, "tableDescriptor.cluster cannot be null.");
-      BlurAnalyzer analyzer = new BlurAnalyzer(BlurUtil.nullCheck(tableDescriptor.analyzerDefinition, "tableDescriptor.analyzerDefinition cannot be null."));
-      String uri = BlurUtil.nullCheck(tableDescriptor.tableUri, "tableDescriptor.tableUri cannot be null.");
-      int shardCount = BlurUtil.zeroCheck(tableDescriptor.shardCount, "tableDescriptor.shardCount cannot be less than 1");
-      CompressionCodec compressionCodec = BlurUtil.getInstance(tableDescriptor.compressionClass, CompressionCodec.class);
-      // @TODO check block size
-      int compressionBlockSize = tableDescriptor.compressionBlockSize;
-      Similarity similarity = BlurUtil.getInstance(tableDescriptor.similarityClass, Similarity.class);
-      boolean blockCaching = tableDescriptor.blockCaching;
-      Set<String> blockCachingFileTypes = tableDescriptor.blockCachingFileTypes;
-      String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
-      ColumnPreCache columnPreCache = tableDescriptor.columnPreCache;
-
-      if (_zk.exists(blurTablePath, false) != null) {
-        throw new IOException("Table [" + table + "] already exists.");
-      }
-      BlurUtil.setupFileSystem(uri, shardCount);
-      BlurUtil.createPath(_zk, blurTablePath, analyzer.toJSON().getBytes());
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableColumnsToPreCache(cluster, table), BlurUtil.read(columnPreCache));
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableUriPath(cluster, table), uri.getBytes());
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableShardCountPath(cluster, table), Integer.toString(shardCount).getBytes());
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableCompressionCodecPath(cluster, table), compressionCodec.getClass().getName().getBytes());
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableCompressionBlockSizePath(cluster, table), Integer.toString(compressionBlockSize).getBytes());
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableSimilarityPath(cluster, table), similarity.getClass().getName().getBytes());
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getLockPath(cluster, table), null);
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableFieldNamesPath(cluster, table), null);
-      if (tableDescriptor.readOnly) {
-        BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableReadOnlyPath(cluster, table), null);
-      }
-      if (blockCaching) {
-        BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableBlockCachingPath(cluster, table), null);
-      }
-      BlurUtil.createPath(_zk, ZookeeperPathConstants.getTableBlockCachingFileTypesPath(cluster, table), toBytes(blockCachingFileTypes));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace createTable took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public void disableTable(String cluster, String table) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
-        throw new IOException("Table [" + table + "] does not exist.");
-      }
-      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
-      if (_zk.exists(blurTableEnabledPath, false) == null) {
-        throw new IOException("Table [" + table + "] already disabled.");
-      }
-      _zk.delete(blurTableEnabledPath, -1);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace disableTable took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public void enableTable(String cluster, String table) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      if (_zk.exists(ZookeeperPathConstants.getTablePath(cluster, table), false) == null) {
-        throw new IOException("Table [" + table + "] does not exist.");
-      }
-      String blurTableEnabledPath = ZookeeperPathConstants.getTableEnabledPath(cluster, table);
-      if (_zk.exists(blurTableEnabledPath, false) != null) {
-        throw new IOException("Table [" + table + "] already enabled.");
-      }
-      _zk.create(blurTableEnabledPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace enableTable took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  @Override
-  public void removeTable(String cluster, String table, boolean deleteIndexFiles) {
-    long s = System.nanoTime();
-    try {
-      checkIfOpen();
-      String blurTablePath = ZookeeperPathConstants.getTablePath(cluster, table);
-      if (_zk.exists(blurTablePath, false) == null) {
-        throw new IOException("Table [" + table + "] does not exist.");
-      }
-      if (_zk.exists(ZookeeperPathConstants.getTableEnabledPath(cluster, table), false) != null) {
-        throw new IOException("Table [" + table + "] must be disabled before it can be removed.");
-      }
-      byte[] data = getData(ZookeeperPathConstants.getTableUriPath(cluster, table));
-      String uri = new String(data);
-      BlurUtil.removeAll(_zk, blurTablePath);
-      if (deleteIndexFiles) {
-        BlurUtil.removeIndexFiles(uri);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } finally {
-      long e = System.nanoTime();
-      LOG.debug("trace removeTable took [" + (e - s) / 1000000.0 + " ms]");
-    }
-  }
-
-  private static byte[] toBytes(Set<String> blockCachingFileTypes) {
-    if (blockCachingFileTypes == null || blockCachingFileTypes.isEmpty()) {
-      return null;
-    }
-    StringBuilder builder = new StringBuilder();
-    for (String type : blockCachingFileTypes) {
-      builder.append(type).append(',');
-    }
-    return builder.substring(0, builder.length() - 1).getBytes();
-  }
-
-  @Override
-  public boolean isOpen() {
-    return _running.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperPathConstants.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperPathConstants.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperPathConstants.java
deleted file mode 100644
index e9671b3..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/clusterstatus/ZookeeperPathConstants.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.nearinfinity.blur.manager.clusterstatus;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public class ZookeeperPathConstants {
-
-  public static String getBasePath() {
-    return "/blur";
-  }
-
-  public static String getClusterPath(String cluster) {
-    return getClustersPath() + "/" + cluster;
-  }
-
-  public static String getClustersPath() {
-    return getBasePath() + "/clusters";
-  }
-
-  public static String getOnlineControllersPath() {
-    return getBasePath() + "/online-controller-nodes";
-  }
-
-  public static String getTableEnabledPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/enabled";
-  }
-
-  public static String getTableUriPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/uri";
-  }
-
-  public static String getTableShardCountPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/shard-count";
-  }
-
-  public static String getOnlinePath(String cluster) {
-    return getClusterPath(cluster) + "/online";
-  }
-
-  public static String getOnlineShardsPath(String cluster) {
-    return getOnlinePath(cluster) + "/shard-nodes";
-  }
-
-  public static String getTablesPath(String cluster) {
-    return getClusterPath(cluster) + "/tables";
-  }
-
-  public static String getTablePath(String cluster, String table) {
-    return getTablesPath(cluster) + "/" + table;
-  }
-
-  public static String getSafemodePath(String cluster) {
-    return getClusterPath(cluster) + "/safemode";
-  }
-
-  public static String getRegisteredShardsPath(String cluster) {
-    return getClusterPath(cluster) + "/shard-nodes";
-  }
-
-  public static String getTableCompressionCodecPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/compression-codec";
-  }
-
-  public static String getTableCompressionBlockSizePath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/compression-blocksize";
-  }
-
-  public static String getLockPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/locks";
-  }
-
-  public static String getTableBlockCachingFileTypesPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/blockcachingfiletypes";
-  }
-
-  public static String getTableBlockCachingPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/blockcaching";
-  }
-
-  public static String getTableSimilarityPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/similarity";
-  }
-
-  public static String getTableFieldNamesPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/fieldnames";
-  }
-
-  public static String getTableFieldNamesPath(String cluster, String table, String fieldName) {
-    return getTableFieldNamesPath(cluster, table) + "/" + fieldName;
-  }
-
-  public static String getTableReadOnlyPath(String cluster, String table) {
-    return getTablePath(cluster, table) + "/readonly";
-  }
-
-  public static String getTableColumnsToPreCache(String cluster, String table) {
-    return getTablePath(cluster, table) + "/precache";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/AbstractIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/AbstractIndexServer.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/AbstractIndexServer.java
deleted file mode 100644
index f411528..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/AbstractIndexServer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package com.nearinfinity.blur.manager.indexserver;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.TermDocs;
-
-import com.nearinfinity.blur.manager.IndexServer;
-import com.nearinfinity.blur.manager.writer.BlurIndex;
-import com.nearinfinity.blur.utils.BlurConstants;
-
-public abstract class AbstractIndexServer implements IndexServer {
-
-  private Map<String, IndexCounts> _recordsTableCounts = new ConcurrentHashMap<String, IndexCounts>();
-  private Map<String, IndexCounts> _rowTableCounts = new ConcurrentHashMap<String, IndexCounts>();
-
-  private static class IndexCounts {
-    Map<String, IndexCount> counts = new ConcurrentHashMap<String, IndexCount>();
-  }
-
-  private static class IndexCount {
-    long version;
-    long count = -1;
-  }
-
-  public long getRecordCount(String table) throws IOException {
-    IndexCounts indexCounts;
-    synchronized (_recordsTableCounts) {
-      indexCounts = _recordsTableCounts.get(table);
-      if (indexCounts == null) {
-        indexCounts = new IndexCounts();
-        _recordsTableCounts.put(table, indexCounts);
-      }
-    }
-    synchronized (indexCounts) {
-      long recordCount = 0;
-      Map<String, BlurIndex> indexes = getIndexes(table);
-      for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
-        IndexReader indexReader = null;
-        try {
-          String shard = index.getKey();
-          IndexCount indexCount = indexCounts.counts.get(shard);
-          if (indexCount == null) {
-            indexCount = new IndexCount();
-            indexCounts.counts.put(shard, indexCount);
-          }
-          indexReader = index.getValue().getIndexReader();
-          if (!isValid(indexCount, indexReader)) {
-            indexCount.count = indexReader.numDocs();
-            indexCount.version = indexReader.getVersion();
-          }
-          recordCount += indexCount.count;
-        } finally {
-          if (indexReader != null) {
-            indexReader.decRef();
-          }
-        }
-      }
-      return recordCount;
-    }
-  }
-
-  private boolean isValid(IndexCount indexCount, IndexReader indexReader) {
-    if (indexCount.version == indexReader.getVersion() && indexCount.count != -1l) {
-      return true;
-    }
-    return false;
-  }
-
-  public long getRowCount(String table) throws IOException {
-    IndexCounts indexCounts;
-    synchronized (_rowTableCounts) {
-      indexCounts = _rowTableCounts.get(table);
-      if (indexCounts == null) {
-        indexCounts = new IndexCounts();
-        _rowTableCounts.put(table, indexCounts);
-      }
-    }
-    synchronized (indexCounts) {
-      long rowCount = 0;
-      Map<String, BlurIndex> indexes = getIndexes(table);
-      for (Map.Entry<String, BlurIndex> index : indexes.entrySet()) {
-        IndexReader indexReader = null;
-        try {
-          String shard = index.getKey();
-          IndexCount indexCount = indexCounts.counts.get(shard);
-          if (indexCount == null) {
-            indexCount = new IndexCount();
-            indexCounts.counts.put(shard, indexCount);
-          }
-          indexReader = index.getValue().getIndexReader();
-          if (!isValid(indexCount, indexReader)) {
-            indexCount.count = getRowCount(indexReader);
-            indexCount.version = indexReader.getVersion();
-          }
-          rowCount += indexCount.count;
-        } finally {
-          if (indexReader != null) {
-            indexReader.decRef();
-          }
-        }
-      }
-      return rowCount;
-    }
-  }
-
-  private long getRowCount(IndexReader indexReader) throws IOException {
-    long rowCount = 0;
-    TermDocs termDocs = indexReader.termDocs(BlurConstants.PRIME_DOC_TERM);
-    while (termDocs.next()) {
-      if (!indexReader.isDeleted(termDocs.doc())) {
-        rowCount++;
-      }
-    }
-    termDocs.close();
-    return rowCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurIndexWarmup.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurIndexWarmup.java
deleted file mode 100644
index 1aed7cb..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurIndexWarmup.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.nearinfinity.blur.manager.indexserver;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.lucene.index.IndexReader;
-
-import com.nearinfinity.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-
-public abstract class BlurIndexWarmup {
-
-  /**
-   * Once the reader has be warmed up, release() must be called on the
-   * ReleaseReader even if an exception occurs.
-   * 
-   * @param table
-   *          the table name.
-   * @param shard
-   *          the shard name.
-   * @param reader
-   *          thread reader inself.
-   * @param isClosed
-   *          to check if the shard has been migrated to another node.
-   * @param releaseReader
-   *          to release the handle on the reader.
-   * @throws IOException
-   * 
-   * @deprecated
-   */
-  public void warmBlurIndex(String table, String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
-
-  }
-
-  /**
-   * Once the reader has be warmed up, release() must be called on the
-   * ReleaseReader even if an exception occurs.
-   * 
-   * @param table
-   *          the table descriptor.
-   * @param shard
-   *          the shard name.
-   * @param reader
-   *          thread reader inself.
-   * @param isClosed
-   *          to check if the shard has been migrated to another node.
-   * @param releaseReader
-   *          to release the handle on the reader.
-   * @throws IOException
-   */
-  public void warmBlurIndex(TableDescriptor table, String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
-    warmBlurIndex(table.name, shard, reader, isClosed, releaseReader);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurServerShutDown.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurServerShutDown.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurServerShutDown.java
deleted file mode 100644
index 9d12527..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/BlurServerShutDown.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.nearinfinity.blur.manager.indexserver;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class BlurServerShutDown implements Watcher {
-
-  private static final Log LOG = LogFactory.getLog(BlurServerShutDown.class);
-
-  public interface BlurShutdown {
-    void shutdown();
-  }
-
-  private BlurShutdown shutdown;
-  private ZooKeeper zooKeeper;
-
-  public BlurServerShutDown() {
-    Runtime runtime = Runtime.getRuntime();
-    runtime.addShutdownHook(new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          LOG.info("Closing zookeeper.");
-          zooKeeper.close();
-        } catch (InterruptedException e) {
-          LOG.error("Unknown error while closing zookeeper.", e);
-        }
-      }
-    }));
-  }
-
-  public void register(final BlurShutdown shutdown, ZooKeeper zooKeeper) {
-    this.shutdown = shutdown;
-    this.zooKeeper = zooKeeper;
-    zooKeeper.register(new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        KeeperState state = event.getState();
-        if (state == KeeperState.Expired) {
-          LOG.fatal("Zookeeper session has [" + state + "] server process shutting down.");
-          shutdown.shutdown();
-        }
-      }
-    });
-  }
-
-  @Override
-  public void process(WatchedEvent event) {
-    register(shutdown, zooKeeper);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DefaultBlurIndexWarmup.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DefaultBlurIndexWarmup.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DefaultBlurIndexWarmup.java
deleted file mode 100644
index 2642184..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DefaultBlurIndexWarmup.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.nearinfinity.blur.manager.indexserver;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.FieldInfos;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.WarmUpByFieldBounds;
-import org.apache.lucene.index.WarmUpByFieldBoundsStatus;
-import org.apache.lucene.util.ReaderUtil;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.manager.indexserver.DistributedIndexServer.ReleaseReader;
-import com.nearinfinity.blur.thrift.generated.ColumnPreCache;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-import com.nearinfinity.blur.utils.BlurConstants;
-
-public class DefaultBlurIndexWarmup extends BlurIndexWarmup {
-
-  private static final Log LOG = LogFactory.getLog(DefaultBlurIndexWarmup.class);
-
-  @Override
-  public void warmBlurIndex(final TableDescriptor table, final String shard, IndexReader reader, AtomicBoolean isClosed, ReleaseReader releaseReader) throws IOException {
-    try {
-      ColumnPreCache columnPreCache = table.columnPreCache;
-      List<String> preCacheCols = null;
-      if (columnPreCache != null) {
-        preCacheCols = columnPreCache.preCacheCols;
-      }
-      if (preCacheCols == null) {
-        LOG.info("No pre cache defined, precache all fields.");
-        FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-        preCacheCols = new ArrayList<String>();
-        for (FieldInfo fieldInfo : fieldInfos) {
-          if (fieldInfo.isIndexed) {
-            preCacheCols.add(fieldInfo.name);
-          }
-        }
-        preCacheCols.remove(BlurConstants.ROW_ID);
-        preCacheCols.remove(BlurConstants.RECORD_ID);
-        preCacheCols.remove(BlurConstants.PRIME_DOC);
-        preCacheCols.remove(BlurConstants.SUPER);
-      }
-
-      WarmUpByFieldBounds warmUpByFieldBounds = new WarmUpByFieldBounds();
-      WarmUpByFieldBoundsStatus status = new WarmUpByFieldBoundsStatus() {
-        @Override
-        public void complete(String name, Term start, Term end, long startPosition, long endPosition, long totalBytesRead, long nanoTime, AtomicBoolean isClosed) {
-          double bytesPerNano = totalBytesRead / (double) nanoTime;
-          double mBytesPerNano = bytesPerNano / 1024 / 1024;
-          double mBytesPerSecond = mBytesPerNano * 1000000000.0;
-          if (totalBytesRead > 0) {
-            LOG.info("Precached field [{0}] in table [{1}] shard [{2}] file [{3}], [{4}] bytes cached at [{5} MB/s]", start.field(), table.name, shard, name, totalBytesRead,
-                mBytesPerSecond);
-          }
-        }
-      };
-      if (preCacheCols != null) {
-        for (String field : preCacheCols) {
-          warmUpByFieldBounds.warmUpByField(isClosed, new Term(field), reader, status);
-        }
-      }
-    } finally {
-      releaseReader.release();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java b/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java
deleted file mode 100644
index 72fbe66..0000000
--- a/src/blur-core/src/main/java/com/nearinfinity/blur/manager/indexserver/DistributedIndexServer.java
+++ /dev/null
@@ -1,843 +0,0 @@
-package com.nearinfinity.blur.manager.indexserver;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static com.nearinfinity.blur.utils.BlurConstants.SHARD_PREFIX;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.FieldInfos;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TermPositions;
-import org.apache.lucene.search.Similarity;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.ReaderUtil;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import com.nearinfinity.blur.analysis.BlurAnalyzer;
-import com.nearinfinity.blur.concurrent.Executors;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.lucene.search.FairSimilarity;
-import com.nearinfinity.blur.manager.BlurFilterCache;
-import com.nearinfinity.blur.manager.clusterstatus.ClusterStatus;
-import com.nearinfinity.blur.manager.clusterstatus.ZookeeperPathConstants;
-import com.nearinfinity.blur.manager.writer.BlurIndex;
-import com.nearinfinity.blur.manager.writer.BlurIndexCloser;
-import com.nearinfinity.blur.manager.writer.BlurIndexReader;
-import com.nearinfinity.blur.manager.writer.BlurIndexRefresher;
-import com.nearinfinity.blur.manager.writer.BlurNRTIndex;
-import com.nearinfinity.blur.manager.writer.DirectoryReferenceFileGC;
-import com.nearinfinity.blur.metrics.BlurMetrics;
-import com.nearinfinity.blur.store.blockcache.BlockDirectory;
-import com.nearinfinity.blur.store.blockcache.Cache;
-import com.nearinfinity.blur.store.compressed.CompressedFieldDataDirectory;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.store.lock.BlurLockFactory;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-import com.nearinfinity.blur.zookeeper.WatchChildren;
-import com.nearinfinity.blur.zookeeper.WatchChildren.OnChange;
-
-public class DistributedIndexServer extends AbstractIndexServer {
-
-  private static final String LOGS = "logs";
-  private static final Log LOG = LogFactory.getLog(DistributedIndexServer.class);
-  private static final long _delay = TimeUnit.SECONDS.toMillis(5);
-
-  private Map<String, BlurAnalyzer> _tableAnalyzers = new ConcurrentHashMap<String, BlurAnalyzer>();
-  private Map<String, TableDescriptor> _tableDescriptors = new ConcurrentHashMap<String, TableDescriptor>();
-  private Map<String, Similarity> _tableSimilarity = new ConcurrentHashMap<String, Similarity>();
-  private Map<String, DistributedLayoutManager> _layoutManagers = new ConcurrentHashMap<String, DistributedLayoutManager>();
-  private Map<String, Set<String>> _layoutCache = new ConcurrentHashMap<String, Set<String>>();
-  private ConcurrentHashMap<String, Map<String, BlurIndex>> _indexes = new ConcurrentHashMap<String, Map<String, BlurIndex>>();
-
-  // set externally
-  private ClusterStatus _clusterStatus;
-  private Configuration _configuration;
-  private String _nodeName;
-  private int _shardOpenerThreadCount;
-  private BlurIndexRefresher _refresher;
-  private Cache _cache;
-  private BlurMetrics _blurMetrics;
-  private ZooKeeper _zookeeper;
-
-  // set internally
-  private Timer _timerCacheFlush;
-  private ExecutorService _openerService;
-  private BlurIndexCloser _closer;
-  private Timer _timerTableWarmer;
-  private BlurFilterCache _filterCache;
-  private AtomicBoolean _running = new AtomicBoolean();
-  private long _safeModeDelay;
-  private BlurIndexWarmup _warmup = new DefaultBlurIndexWarmup();
-  private IndexDeletionPolicy _indexDeletionPolicy;
-  private String cluster = BlurConstants.BLUR_CLUSTER;
-  private DirectoryReferenceFileGC _gc;
-  private long _timeBetweenCommits = TimeUnit.SECONDS.toMillis(60);
-  private long _timeBetweenRefreshs = TimeUnit.MILLISECONDS.toMillis(500);
-  private WatchChildren _watchOnlineShards;
-
-  public static interface ReleaseReader {
-    void release() throws IOException;
-  }
-
-  public void init() throws KeeperException, InterruptedException, IOException {
-    BlurUtil.setupZookeeper(_zookeeper, cluster);
-    _openerService = Executors.newThreadPool("shard-opener", _shardOpenerThreadCount);
-    _closer = new BlurIndexCloser();
-    _closer.init();
-    _gc = new DirectoryReferenceFileGC();
-    _gc.init();
-    setupFlushCacheTimer();
-    String lockPath = BlurUtil.lockForSafeMode(_zookeeper, getNodeName(), cluster);
-    try {
-      registerMyself();
-      setupSafeMode();
-    } finally {
-      BlurUtil.unlockForSafeMode(_zookeeper, lockPath);
-    }
-    waitInSafeModeIfNeeded();
-    _running.set(true);
-    setupTableWarmer();
-    watchForShardServerChanges();
-  }
-
-  private void watchForShardServerChanges() {
-    ZookeeperPathConstants.getOnlineShardsPath(cluster);
-    _watchOnlineShards = new WatchChildren(_zookeeper, ZookeeperPathConstants.getOnlineShardsPath(cluster)).watch(new OnChange() {
-      private List<String> _prevOnlineShards = new ArrayList<String>();
-
-      @Override
-      public void action(List<String> onlineShards) {
-        List<String> oldOnlineShards = _prevOnlineShards;
-        _prevOnlineShards = onlineShards;
-        _layoutManagers.clear();
-        _layoutCache.clear();
-        LOG.info("Online shard servers changed, clearing layout managers and cache.");
-        if (oldOnlineShards == null) {
-          oldOnlineShards = new ArrayList<String>();
-        }
-        for (String oldOnlineShard : oldOnlineShards) {
-          if (!onlineShards.contains(oldOnlineShard)) {
-            LOG.info("Node went offline [{0}]", oldOnlineShard);
-          }
-        }
-        for (String onlineShard : onlineShards) {
-          if (!oldOnlineShards.contains(onlineShard)) {
-            LOG.info("Node came online [{0}]", onlineShard);
-          }
-        }
-      }
-    });
-  }
-
-  private void waitInSafeModeIfNeeded() throws KeeperException, InterruptedException {
-    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
-    Stat stat = _zookeeper.exists(blurSafemodePath, false);
-    if (stat == null) {
-      throw new RuntimeException("Safemode path missing [" + blurSafemodePath + "]");
-    }
-    byte[] data = _zookeeper.getData(blurSafemodePath, false, stat);
-    if (data == null) {
-      throw new RuntimeException("Safemode data missing [" + blurSafemodePath + "]");
-    }
-    long timestamp = Long.parseLong(new String(data));
-    long waitTime = timestamp - System.currentTimeMillis();
-    if (waitTime > 0) {
-      LOG.info("Waiting in safe mode for [{0}] seconds", waitTime / 1000.0);
-      Thread.sleep(waitTime);
-    }
-  }
-
-  private void setupSafeMode() throws KeeperException, InterruptedException {
-    String shardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster);
-    List<String> children = _zookeeper.getChildren(shardsPath, false);
-    if (children.size() == 0) {
-      throw new RuntimeException("No shards registered!");
-    }
-    if (children.size() != 1) {
-      return;
-    }
-    LOG.info("First node online, setting up safe mode.");
-    long timestamp = System.currentTimeMillis() + _safeModeDelay;
-    String blurSafemodePath = ZookeeperPathConstants.getSafemodePath(cluster);
-    Stat stat = _zookeeper.exists(blurSafemodePath, false);
-    if (stat == null) {
-      _zookeeper.create(blurSafemodePath, Long.toString(timestamp).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    } else {
-      _zookeeper.setData(blurSafemodePath, Long.toString(timestamp).getBytes(), -1);
-    }
-    setupAnyMissingPaths();
-  }
-
-  private void setupAnyMissingPaths() throws KeeperException, InterruptedException {
-    String tablesPath = ZookeeperPathConstants.getTablesPath(cluster);
-    List<String> tables = _zookeeper.getChildren(tablesPath, false);
-    for (String table : tables) {
-      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getLockPath(cluster, table));
-      BlurUtil.createIfMissing(_zookeeper, ZookeeperPathConstants.getTableFieldNamesPath(cluster, table));
-    }
-  }
-
-  private void setupTableWarmer() {
-    _timerTableWarmer = new Timer("Table-Warmer", true);
-    _timerTableWarmer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          warmup();
-        } catch (Throwable t) {
-          if (_running.get()) {
-            LOG.error("Unknown error", t);
-          } else {
-            LOG.debug("Unknown error", t);
-          }
-        }
-      }
-
-      private void warmup() {
-        if (_running.get()) {
-          List<String> tableList = _clusterStatus.getTableList(false, cluster);
-          _blurMetrics.tableCount.set(tableList.size());
-          long indexCount = 0;
-          AtomicLong segmentCount = new AtomicLong();
-          AtomicLong indexMemoryUsage = new AtomicLong();
-          for (String table : tableList) {
-            try {
-              Map<String, BlurIndex> indexes = getIndexes(table);
-              int count = indexes.size();
-              indexCount += count;
-              updateMetrics(_blurMetrics, indexes, segmentCount, indexMemoryUsage);
-              LOG.debug("Table [{0}] has [{1}] number of shards online in this node.", table, count);
-            } catch (IOException e) {
-              LOG.error("Unknown error trying to warm table [{0}]", e, table);
-            }
-          }
-          _blurMetrics.indexCount.set(indexCount);
-          _blurMetrics.segmentCount.set(segmentCount.get());
-          _blurMetrics.indexMemoryUsage.set(indexMemoryUsage.get());
-        }
-      }
-
-      private void updateMetrics(BlurMetrics blurMetrics, Map<String, BlurIndex> indexes, AtomicLong segmentCount, AtomicLong indexMemoryUsage) throws IOException {
-        for (BlurIndex index : indexes.values()) {
-          IndexReader reader = index.getIndexReader();
-          try {
-            IndexReader[] readers = reader.getSequentialSubReaders();
-            if (readers != null) {
-              segmentCount.addAndGet(readers.length);
-            }
-            indexMemoryUsage.addAndGet(BlurUtil.getMemoryUsage(reader));
-          } finally {
-            reader.decRef();
-          }
-        }
-      }
-    }, _delay, _delay);
-  }
-
-  private void registerMyself() {
-    String nodeName = getNodeName();
-    String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(cluster) + "/" + nodeName;
-    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(cluster) + "/" + nodeName;
-    try {
-      if (_zookeeper.exists(registeredShardsPath, false) == null) {
-        _zookeeper.create(registeredShardsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      }
-      while (_zookeeper.exists(onlineShardsPath, false) != null) {
-        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", nodeName, onlineShardsPath);
-        Thread.sleep(3000);
-      }
-      String version = BlurUtil.getVersion();
-      _zookeeper.create(onlineShardsPath, version.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void setupFlushCacheTimer() {
-    _timerCacheFlush = new Timer("Flush-IndexServer-Caches", true);
-    _timerCacheFlush.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          cleanup();
-        } catch (Throwable t) {
-          LOG.error("Unknown error", t);
-        }
-      }
-
-      private void cleanup() {
-        clearMapOfOldTables(_tableAnalyzers);
-        clearMapOfOldTables(_tableDescriptors);
-        clearMapOfOldTables(_layoutManagers);
-        clearMapOfOldTables(_layoutCache);
-        clearMapOfOldTables(_tableSimilarity);
-        Map<String, Map<String, BlurIndex>> oldIndexesThatNeedToBeClosed = clearMapOfOldTables(_indexes);
-        for (String table : oldIndexesThatNeedToBeClosed.keySet()) {
-          Map<String, BlurIndex> indexes = oldIndexesThatNeedToBeClosed.get(table);
-          if (indexes == null) {
-            continue;
-          }
-          for (String shard : indexes.keySet()) {
-            BlurIndex index = indexes.get(shard);
-            if (index == null) {
-              continue;
-            }
-            close(index, table, shard);
-          }
-        }
-        for (String table : _indexes.keySet()) {
-          Map<String, BlurIndex> shardMap = _indexes.get(table);
-          if (shardMap != null) {
-            Set<String> shards = new HashSet<String>(shardMap.keySet());
-            Set<String> shardsToServe = getShardsToServe(table);
-            shards.removeAll(shardsToServe);
-            if (!shards.isEmpty()) {
-              LOG.info("Need to close indexes for table [{0}] indexes [{1}]", table, shards);
-            }
-            for (String shard : shards) {
-              LOG.info("Closing index for table [{0}] shard [{1}]", table, shard);
-              BlurIndex index = shardMap.remove(shard);
-              close(index, table, shard);
-            }
-          }
-        }
-      }
-    }, _delay, _delay);
-  }
-
-  protected void close(BlurIndex index, String table, String shard) {
-    LOG.info("Closing index [{0}] from table [{1}] shard [{2}]", index, table, shard);
-    try {
-      _filterCache.closing(table, shard, index);
-      index.close();
-    } catch (Throwable e) {
-      LOG.error("Error while closing index [{0}] from table [{1}] shard [{2}]", e, index, table, shard);
-    }
-  }
-
-  protected <T> Map<String, T> clearMapOfOldTables(Map<String, T> map) {
-    List<String> tables = new ArrayList<String>(map.keySet());
-    Map<String, T> removed = new HashMap<String, T>();
-    for (String table : tables) {
-      if (!_clusterStatus.exists(true, cluster, table)) {
-        removed.put(table, map.remove(table));
-      }
-    }
-    for (String table : tables) {
-      if (!_clusterStatus.isEnabled(true, cluster, table)) {
-        removed.put(table, map.remove(table));
-      }
-    }
-    return removed;
-  }
-
-  @Override
-  public void close() {
-    if (_running.get()) {
-      _running.set(false);
-      closeAllIndexes();
-      _watchOnlineShards.close();
-      _timerCacheFlush.purge();
-      _timerCacheFlush.cancel();
-
-      _timerTableWarmer.purge();
-      _timerTableWarmer.cancel();
-      _closer.close();
-      _gc.close();
-      _openerService.shutdownNow();
-    }
-  }
-
-  private void closeAllIndexes() {
-    for (Entry<String, Map<String, BlurIndex>> tableToShards : _indexes.entrySet()) {
-      for (Entry<String, BlurIndex> shard : tableToShards.getValue().entrySet()) {
-        BlurIndex index = shard.getValue();
-        try {
-          index.close();
-          LOG.info("Closed [{0}] [{1}] [{2}]", tableToShards.getKey(), shard.getKey(), index);
-        } catch (IOException e) {
-          LOG.info("Error during closing of [{0}] [{1}] [{2}]", tableToShards.getKey(), shard.getKey(), index);
-        }
-      }
-    }
-  }
-
-  @Override
-  public BlurAnalyzer getAnalyzer(String table) {
-    checkTable(table);
-    BlurAnalyzer blurAnalyzer = _tableAnalyzers.get(table);
-    if (blurAnalyzer == null) {
-      TableDescriptor descriptor = getTableDescriptor(table);
-      blurAnalyzer = new BlurAnalyzer(descriptor.analyzerDefinition);
-      _tableAnalyzers.put(table, blurAnalyzer);
-    }
-    return blurAnalyzer;
-  }
-
-  @Override
-  public int getCompressionBlockSize(String table) {
-    checkTable(table);
-    TableDescriptor descriptor = getTableDescriptor(table);
-    return descriptor.compressionBlockSize;
-  }
-
-  @Override
-  public CompressionCodec getCompressionCodec(String table) {
-    checkTable(table);
-    TableDescriptor descriptor = getTableDescriptor(table);
-    return getInstance(descriptor.compressionClass, CompressionCodec.class);
-  }
-
-  @Override
-  public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
-    return new TreeSet<String>(getShardsToServe(table));
-  }
-
-  @Override
-  public Map<String, BlurIndex> getIndexes(String table) throws IOException {
-    checkTable(table);
-
-    Set<String> shardsToServe = getShardsToServe(table);
-    synchronized (_indexes) {
-      if (!_indexes.containsKey(table)) {
-        _indexes.putIfAbsent(table, new ConcurrentHashMap<String, BlurIndex>());
-      }
-    }
-    Map<String, BlurIndex> tableIndexes = _indexes.get(table);
-    Set<String> shardsBeingServed = new HashSet<String>(tableIndexes.keySet());
-    if (shardsBeingServed.containsAll(shardsToServe)) {
-      Map<String, BlurIndex> result = new HashMap<String, BlurIndex>(tableIndexes);
-      shardsBeingServed.removeAll(shardsToServe);
-      for (String shardNotToServe : shardsBeingServed) {
-        result.remove(shardNotToServe);
-      }
-      return result;
-    } else {
-      return openMissingShards(table, shardsToServe, tableIndexes);
-    }
-  }
-
-  private BlurIndex openShard(String table, String shard) throws IOException {
-    LOG.info("Opening shard [{0}] for table [{1}]", shard, table);
-    Path tablePath = new Path(getTableDescriptor(table).tableUri);
-    Path walTablePath = new Path(tablePath, LOGS);
-    Path hdfsDirPath = new Path(tablePath, shard);
-
-    BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsDirPath, _nodeName, BlurConstants.getPid());
-
-    Directory directory = new HdfsDirectory(hdfsDirPath);
-    directory.setLockFactory(lockFactory);
-
-    TableDescriptor descriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
-    String compressionClass = descriptor.compressionClass;
-    int compressionBlockSize = descriptor.compressionBlockSize;
-    if (compressionClass != null) {
-      CompressionCodec compressionCodec;
-      try {
-        compressionCodec = BlurUtil.getInstance(compressionClass, CompressionCodec.class);
-        directory = new CompressedFieldDataDirectory(directory, compressionCodec, compressionBlockSize);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-
-    Directory dir;
-    boolean blockCacheEnabled = _clusterStatus.isBlockCacheEnabled(cluster, table);
-    if (blockCacheEnabled) {
-      Set<String> blockCacheFileTypes = _clusterStatus.getBlockCacheFileTypes(cluster, table);
-      dir = new BlockDirectory(table + "_" + shard, directory, _cache, blockCacheFileTypes);
-    } else {
-      dir = directory;
-    }
-
-    BlurIndex index;
-    if (_clusterStatus.isReadOnly(true, cluster, table)) {
-      BlurIndexReader reader = new BlurIndexReader();
-      reader.setCloser(_closer);
-      reader.setAnalyzer(getAnalyzer(table));
-      reader.setDirectory(dir);
-      reader.setRefresher(_refresher);
-      reader.setShard(shard);
-      reader.setTable(table);
-      reader.setIndexDeletionPolicy(_indexDeletionPolicy);
-      reader.setSimilarity(getSimilarity(table));
-      reader.init();
-      index = reader;
-    } else {
-      BlurNRTIndex writer = new BlurNRTIndex();
-      writer.setAnalyzer(getAnalyzer(table));
-      writer.setDirectory(dir);
-      writer.setShard(shard);
-      writer.setTable(table);
-      writer.setSimilarity(getSimilarity(table));
-      writer.setTimeBetweenCommits(_timeBetweenCommits);
-      writer.setTimeBetweenRefreshs(_timeBetweenRefreshs);
-      writer.setWalPath(walTablePath);
-      writer.setConfiguration(_configuration);
-      writer.setIndexDeletionPolicy(_indexDeletionPolicy);
-      writer.setCloser(_closer);
-      writer.setGc(_gc);
-      writer.init();
-      index = writer;
-    }
-    _filterCache.opening(table, shard, index);
-    TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
-    return warmUp(index, tableDescriptor, shard);
-  }
-
-  private BlurIndex warmUp(BlurIndex index, TableDescriptor table, String shard) throws IOException {
-    final IndexReader reader = index.getIndexReader();
-    warmUpAllSegments(reader);
-    _warmup.warmBlurIndex(table, shard, reader, index.isClosed(), new ReleaseReader() {
-      @Override
-      public void release() throws IOException {
-        // this will allow for closing of index
-        reader.decRef();
-      }
-    });
-
-    return index;
-  }
-
-  private void warmUpAllSegments(IndexReader reader) throws IOException {
-    IndexReader[] indexReaders = reader.getSequentialSubReaders();
-    if (indexReaders != null) {
-      for (IndexReader r : indexReaders) {
-        warmUpAllSegments(r);
-      }
-    }
-    int maxDoc = reader.maxDoc();
-    int numDocs = reader.numDocs();
-    FieldInfos fieldInfos = ReaderUtil.getMergedFieldInfos(reader);
-    Collection<String> fieldNames = new ArrayList<String>();
-    for (FieldInfo fieldInfo : fieldInfos) {
-      if (fieldInfo.isIndexed) {
-        fieldNames.add(fieldInfo.name);
-      }
-    }
-    int primeDocCount = reader.docFreq(BlurConstants.PRIME_DOC_TERM);
-    TermDocs termDocs = reader.termDocs(BlurConstants.PRIME_DOC_TERM);
-    termDocs.next();
-    termDocs.close();
-
-    TermPositions termPositions = reader.termPositions(BlurConstants.PRIME_DOC_TERM);
-    if (termPositions.next()) {
-      if (termPositions.freq() > 0) {
-        termPositions.nextPosition();
-      }
-    }
-    termPositions.close();
-    LOG.info("Warmup of indexreader [" + reader + "] complete, maxDocs [" + maxDoc + "], numDocs [" + numDocs + "], primeDocumentCount [" + primeDocCount + "], fieldCount ["
-        + fieldNames.size() + "]");
-  }
-
-  private synchronized Map<String, BlurIndex> openMissingShards(final String table, Set<String> shardsToServe, final Map<String, BlurIndex> tableIndexes) {
-    Map<String, Future<BlurIndex>> opening = new HashMap<String, Future<BlurIndex>>();
-    for (String s : shardsToServe) {
-      final String shard = s;
-      BlurIndex blurIndex = tableIndexes.get(shard);
-      if (blurIndex == null) {
-        LOG.info("Opening missing shard [{0}] from table [{1}]", shard, table);
-        Future<BlurIndex> submit = _openerService.submit(new Callable<BlurIndex>() {
-          @Override
-          public BlurIndex call() throws Exception {
-            return openShard(table, shard);
-          }
-        });
-        opening.put(shard, submit);
-      }
-    }
-
-    for (Entry<String, Future<BlurIndex>> entry : opening.entrySet()) {
-      String shard = entry.getKey();
-      Future<BlurIndex> future = entry.getValue();
-      try {
-        BlurIndex blurIndex = future.get();
-        tableIndexes.put(shard, blurIndex);
-      } catch (Exception e) {
-        e.printStackTrace();
-        LOG.error("Unknown error while opening shard [{0}] for table [{1}].", e.getCause(), shard, table);
-      }
-    }
-
-    Map<String, BlurIndex> result = new HashMap<String, BlurIndex>();
-    for (String shard : shardsToServe) {
-      BlurIndex blurIndex = tableIndexes.get(shard);
-      if (blurIndex == null) {
-        LOG.error("Missing shard [{0}] for table [{1}].", shard, table);
-      } else {
-        result.put(shard, blurIndex);
-      }
-    }
-    return result;
-  }
-
-  private Set<String> getShardsToServe(String table) {
-    TABLE_STATUS tableStatus = getTableStatus(table);
-    if (tableStatus == TABLE_STATUS.DISABLED) {
-      return new HashSet<String>();
-    }
-    DistributedLayoutManager layoutManager = _layoutManagers.get(table);
-    if (layoutManager == null) {
-      return setupLayoutManager(table);
-    } else {
-      return _layoutCache.get(table);
-    }
-  }
-
-  private synchronized Set<String> setupLayoutManager(String table) {
-    DistributedLayoutManager layoutManager = new DistributedLayoutManager();
-
-    String cluster = _clusterStatus.getCluster(false, table);
-    if (cluster == null) {
-      throw new RuntimeException("Table [" + table + "] is not found.");
-    }
-
-    List<String> shardServerList = _clusterStatus.getShardServerList(cluster);
-    List<String> offlineShardServers = new ArrayList<String>(_clusterStatus.getOfflineShardServers(false, cluster));
-    List<String> shardList = getShardList(table);
-
-    layoutManager.setNodes(shardServerList);
-    layoutManager.setNodesOffline(offlineShardServers);
-    layoutManager.setShards(shardList);
-    layoutManager.init();
-
-    Map<String, String> layout = layoutManager.getLayout();
-    String nodeName = getNodeName();
-    Set<String> shardsToServeCache = new TreeSet<String>();
-    for (Entry<String, String> entry : layout.entrySet()) {
-      if (entry.getValue().equals(nodeName)) {
-        shardsToServeCache.add(entry.getKey());
-      }
-    }
-    _layoutCache.put(table, shardsToServeCache);
-    _layoutManagers.put(table, layoutManager);
-    return shardsToServeCache;
-  }
-
-  @Override
-  public String getNodeName() {
-    return _nodeName;
-  }
-
-  @Override
-  public int getShardCount(String table) {
-    checkTable(table);
-    TableDescriptor descriptor = getTableDescriptor(table);
-    return descriptor.shardCount;
-  }
-
-  @Override
-  public List<String> getShardList(String table) {
-    checkTable(table);
-    List<String> result = new ArrayList<String>();
-    try {
-      TableDescriptor descriptor = getTableDescriptor(table);
-      Path tablePath = new Path(descriptor.tableUri);
-      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
-      if (!fileSystem.exists(tablePath)) {
-        LOG.error("Table [{0}] is missing, defined location [{1}]", table, tablePath.toUri());
-        throw new RuntimeException("Table [" + table + "] is missing, defined location [" + tablePath.toUri() + "]");
-      }
-      FileStatus[] listStatus = fileSystem.listStatus(tablePath);
-      for (FileStatus status : listStatus) {
-        if (status.isDir()) {
-          String name = status.getPath().getName();
-          if (name.startsWith(SHARD_PREFIX)) {
-            result.add(name);
-          }
-        }
-      }
-      return result;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Similarity getSimilarity(String table) {
-    checkTable(table);
-    Similarity similarity = _tableSimilarity.get(table);
-    if (similarity == null) {
-      TableDescriptor tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
-      String similarityClass = tableDescriptor.similarityClass;
-      if (similarityClass == null) {
-        similarity = new FairSimilarity();
-      } else {
-        similarity = getInstance(similarityClass, Similarity.class);
-      }
-      _tableSimilarity.put(table, similarity);
-    }
-    return similarity;
-  }
-
-  @Override
-  public long getTableSize(String table) throws IOException {
-    checkTable(table);
-    Path tablePath = new Path(getTableUri(table));
-    FileSystem fileSystem = FileSystem.get(tablePath.toUri(), _configuration);
-    ContentSummary contentSummary = fileSystem.getContentSummary(tablePath);
-    return contentSummary.getLength();
-  }
-
-  @Override
-  public TABLE_STATUS getTableStatus(String table) {
-    checkTable(table);
-    boolean enabled = _clusterStatus.isEnabled(true, cluster, table);
-    if (enabled) {
-      return TABLE_STATUS.ENABLED;
-    }
-    return TABLE_STATUS.DISABLED;
-  }
-
-  private void checkTable(String table) {
-    if (_clusterStatus.exists(true, cluster, table)) {
-      return;
-    }
-    throw new RuntimeException("Table [" + table + "] does not exist.");
-  }
-
-  @Override
-  public String getTableUri(String table) {
-    checkTable(table);
-    TableDescriptor descriptor = getTableDescriptor(table);
-    return descriptor.tableUri;
-  }
-
-  private TableDescriptor getTableDescriptor(String table) {
-    TableDescriptor tableDescriptor = _tableDescriptors.get(table);
-    if (tableDescriptor == null) {
-      tableDescriptor = _clusterStatus.getTableDescriptor(true, cluster, table);
-      _tableDescriptors.put(table, tableDescriptor);
-    }
-    return tableDescriptor;
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T> T getInstance(String className, Class<T> c) {
-    try {
-      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
-      Object object = clazz.newInstance();
-      if (object instanceof Configurable) {
-        Configurable configurable = (Configurable) object;
-        configurable.setConf(_configuration);
-      }
-      return (T) object;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void setClusterStatus(ClusterStatus clusterStatus) {
-    _clusterStatus = clusterStatus;
-  }
-
-  public void setConfiguration(Configuration configuration) {
-    _configuration = configuration;
-  }
-
-  public void setNodeName(String nodeName) {
-    _nodeName = nodeName;
-  }
-
-  public void setShardOpenerThreadCount(int shardOpenerThreadCount) {
-    _shardOpenerThreadCount = shardOpenerThreadCount;
-  }
-
-  public void setRefresher(BlurIndexRefresher refresher) {
-    _refresher = refresher;
-  }
-
-  public void setCache(Cache cache) {
-    _cache = cache;
-  }
-
-  public void setBlurMetrics(BlurMetrics blurMetrics) {
-    _blurMetrics = blurMetrics;
-  }
-
-  public void setCloser(BlurIndexCloser closer) {
-    _closer = closer;
-  }
-
-  public void setZookeeper(ZooKeeper zookeeper) {
-    _zookeeper = zookeeper;
-  }
-
-  public void setFilterCache(BlurFilterCache filterCache) {
-    _filterCache = filterCache;
-  }
-
-  public void setSafeModeDelay(long safeModeDelay) {
-    _safeModeDelay = safeModeDelay;
-  }
-
-  public void setWarmup(BlurIndexWarmup warmup) {
-    _warmup = warmup;
-  }
-
-  public void setIndexDeletionPolicy(IndexDeletionPolicy indexDeletionPolicy) {
-    _indexDeletionPolicy = indexDeletionPolicy;
-  }
-
-  public void setTimeBetweenCommits(long timeBetweenCommits) {
-    _timeBetweenCommits = timeBetweenCommits;
-  }
-
-  public void setTimeBetweenRefreshs(long timeBetweenRefreshs) {
-    _timeBetweenRefreshs = timeBetweenRefreshs;
-  }
-}


Mime
View raw message